Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/27 17:40:55 UTC

[GitHub] [flink] GJL commented on a change in pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

GJL commented on a change in pull request #12350:
URL: https://github.com/apache/flink/pull/12350#discussion_r431300217



##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 

Review comment:
       There is trailing white space on this line.
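    (As an aside, and purely for illustration: lines like this are easy to locate with e.g. `grep -nE '[[:blank:]]+$' test_resume_savepoint.sh`, which prints the numbers of all lines ending in spaces or tabs.)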

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {

Review comment:
       ```
   run_resume_savepoint_test() {
   ...
   }
   ```
   (no `function` keyword) is the preferred way in bash: https://www.tldp.org/LDP/abs/html/functions.html
   Since we are already mixing styles in Flink, this is not very important.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test
+    echo "Test (pid: $cmdpid) did not finish after $TEST_TIMEOUT_SECONDS seconds."
+    echo "Printing Flink logs and killing it:"
+    cat ${FLINK_DIR}/log/* 
+    kill "$cmdpid") & watchdog_pid=$!
+    echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
+    

Review comment:
       There is trailing white space on this line.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test

Review comment:
       What does the `;` at the end of this line do? The semicolon permits putting two or more commands on the same line. Since there is only one command here, it should not be needed.
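       For example, `sleep 1; echo done` needs the semicolon to separate two commands, whereas `sleep $TEST_TIMEOUT_SECONDS; # comment` behaves exactly like `sleep $TEST_TIMEOUT_SECONDS # comment`, because the trailing comment is not a command that needs separating.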

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test
+    echo "Test (pid: $cmdpid) did not finish after $TEST_TIMEOUT_SECONDS seconds."
+    echo "Printing Flink logs and killing it:"
+    cat ${FLINK_DIR}/log/* 

Review comment:
       There is trailing white space on this line.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test
+    echo "Test (pid: $cmdpid) did not finish after $TEST_TIMEOUT_SECONDS seconds."
+    echo "Printing Flink logs and killing it:"
+    cat ${FLINK_DIR}/log/* 
+    kill "$cmdpid") & watchdog_pid=$!
+    echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
+    
+    run_resume_savepoint_test

Review comment:
       Actually it would be nice to extract a reusable function that enables:
   
   `run_test_with_timeout 900 run_resume_savepoint_test`
   
   I think you can achieve this with `shift` and `$@`
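    For what it's worth, a minimal sketch of what such a helper could look like, reusing the watchdog and pid-file mechanism from the diff above (`run_test_with_timeout` is only the name proposed here, not an existing common.sh function):
    
    ```
    # Hypothetical helper sketched from the suggestion above. Runs "$@" and
    # kills it after $1 seconds; relies on the caller's on_exit handler
    # (kill_test_watchdog, as in the diff) to stop the watchdog on success.
    run_test_with_timeout() {
        local timeout_seconds=$1
        shift  # remaining arguments are the test command and its parameters
    
        (
            cmdpid=$BASHPID
            (sleep "$timeout_seconds" # set a timeout for the wrapped command
            echo "Test (pid: $cmdpid) did not finish after $timeout_seconds seconds."
            kill "$cmdpid") & watchdog_pid=$!
            # remember the watchdog pid so kill_test_watchdog can stop it later
            # (log printing from the original omitted for brevity)
            echo $watchdog_pid > "$TEST_DATA_DIR/job_watchdog.pid"
    
            "$@"  # run the actual test function with its arguments
        )
    }
    
    # usage, as proposed:
    run_test_with_timeout 900 run_resume_savepoint_test
    ```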

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 

Review comment:
       There is trailing white space on this line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org