You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/27 06:58:06 UTC

[GitHub] [flink] rmetzger opened a new pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

rmetzger opened a new pull request #12350:
URL: https://github.com/apache/flink/pull/12350


   ## What is the purpose of the change
   
   We observed 1 failure of test_resume_savepoint.sh so far. Due to the implementation of this test, we can not see the Flink logs.
   With this change, the logs will be made available if this error ever happens again.
   
   ## Brief change log
   
   - Copied the "watchdog" code from the HA tests
   - this PR also includes a hotfix to increase the timeout from 9 to 15 minutes for the HA tests. I want to find out if the HA tests are sometimes just slow. 9 minutes used to be the timeout because travis would have killed the e2e after 10 minutes anyways.
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] GJL commented on a change in pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
GJL commented on a change in pull request #12350:
URL: https://github.com/apache/flink/pull/12350#discussion_r431850898



##########
File path: flink-end-to-end-tests/test-scripts/common.sh
##########
@@ -799,3 +799,32 @@ function extract_job_id_from_job_submission_return() {
     echo "$JOB_ID"
 }
 
+
+#
+# NOTE: This function requires at least Bash version >= 4. Mac OS in 2020 still ships 3.x
+#
+
+run_test_with_timeout() {

Review comment:
       > I'm registering the function with the on_exit callback, thus it needs to be accessible outside of run_test_with_timeout.
   
   Sorry, missed that. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] GJL commented on a change in pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
GJL commented on a change in pull request #12350:
URL: https://github.com/apache/flink/pull/12350#discussion_r431301866



##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test

Review comment:
       What does the `;` at the end of this line do? The semicolon permits putting two or more commands on the same line. Since we have only line here it should not be needed. Same in line 139.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rmetzger closed pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
rmetzger closed pull request #12350:
URL: https://github.com/apache/flink/pull/12350


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] GJL removed a comment on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
GJL removed a comment on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-635358052


   > I'm registering the function with the on_exit callback, thus it needs to be accessible outside of run_test_with_timeout.
   
   Sorry, missed that. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rmetzger commented on a change in pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
rmetzger commented on a change in pull request #12350:
URL: https://github.com/apache/flink/pull/12350#discussion_r431877190



##########
File path: flink-end-to-end-tests/test-scripts/common.sh
##########
@@ -799,3 +799,32 @@ function extract_job_id_from_job_submission_return() {
     echo "$JOB_ID"
 }
 
+
+#
+# NOTE: This function requires at least Bash version >= 4. Mac OS in 2020 still ships 3.x
+#
+
+run_test_with_timeout() {

Review comment:
       Yes, I will merge it with `local`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-634485040


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2249",
       "triggerID" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5a37c0125f808a477da831a4a1a5acbaec92133e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2249) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rmetzger commented on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-635388414


   Thank you for your review. Merging!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] GJL commented on a change in pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
GJL commented on a change in pull request #12350:
URL: https://github.com/apache/flink/pull/12350#discussion_r431300217



##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 

Review comment:
       There is trailing white space on this line.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {

Review comment:
       ```
   run_resume_savepoint_test() {
   ...
   }
   ```
   (no `function` keyword) is the preferred way in bash: https://www.tldp.org/LDP/abs/html/functions.html
   Since we are already mixing styles in Flink, this is not very important.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test
+    echo "Test (pid: $cmdpid) did not finish after $TEST_TIMEOUT_SECONDS seconds."
+    echo "Printing Flink logs and killing it:"
+    cat ${FLINK_DIR}/log/* 
+    kill "$cmdpid") & watchdog_pid=$!
+    echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
+    

Review comment:
       There is trailing white space on this line.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test

Review comment:
       What does the `;` at the end of this line do? The semicolon permits putting two or more commands on the same line. Since we have only line here it should not be needed.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test
+    echo "Test (pid: $cmdpid) did not finish after $TEST_TIMEOUT_SECONDS seconds."
+    echo "Printing Flink logs and killing it:"
+    cat ${FLINK_DIR}/log/* 

Review comment:
       There is trailing white space on this line.

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 
+    cmdpid=$BASHPID; 
+    (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test
+    echo "Test (pid: $cmdpid) did not finish after $TEST_TIMEOUT_SECONDS seconds."
+    echo "Printing Flink logs and killing it:"
+    cat ${FLINK_DIR}/log/* 
+    kill "$cmdpid") & watchdog_pid=$!
+    echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
+    
+    run_resume_savepoint_test

Review comment:
       Actually it would be nice to extract a reusable function that enables:
   
   `run_test_with_timeout 900 run_resume_savepoint_test`
   
   I think you can achieve this with `shift` and `$@`

##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
 
 source "$(dirname "$0")"/common.sh
 
+TEST_TIMEOUT_SECONDS=900
+
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 STATE_BACKEND_TYPE=${3:-file}
 STATE_BACKEND_FILE_ASYNC=${4:-true}
 STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
 
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$ORIGINAL_DOP
-else
-  NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
-  set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allroundjob
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $ORIGINAL_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
-  | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
-  exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
-  echo "Savepoint $SAVEPOINT_PATH does not exist."
-  exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first and second execution,
-# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.parallelism $NEW_DOP \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1 \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
-# if state is errorneous and the state machine job produces alerting state transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+  if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+    NUM_SLOTS=$ORIGINAL_DOP
+  else
+    NUM_SLOTS=$NEW_DOP
+  fi
+
+  set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+  if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+    set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+  fi
+  set_config_key "metrics.fetcher.update-interval" "2000"
+
+  setup_flink_slf4j_metric_reporter
+
+  start_cluster
+
+  CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allroundjob
+  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $ORIGINAL_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # take a savepoint of the state machine job
+  SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+  wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+  # isolate the path without the scheme ("file:") and do the necessary checks
+  SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+  if [ -z "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint location was empty. This may mean that the stop-with-savepoint failed."
+    exit 1
+  elif [ ! -d "$SAVEPOINT_DIR" ]; then
+    echo "Savepoint $SAVEPOINT_PATH does not exist."
+    exit 1
+  fi
+
+  # Since it is not possible to differentiate reporter output between the first and second execution,
+  # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
+  OLD_NUM_METRICS=$(get_num_metric_samples)
+
+  # resume state machine job with savepoint
+  DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $NEW_DOP \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+  wait_job_running $DATASTREAM_JOB
+
+  wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is errorneous and the state machine job produces alerting state transitions,
+  # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+    local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+    kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+( 

Review comment:
       There is trailing white space on this line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-634468809


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5a37c0125f808a477da831a4a1a5acbaec92133e (Wed May 27 07:00:01 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rmetzger commented on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-635164566


   Thanks a lot for your review! I will address your comments.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-634485040


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2249",
       "triggerID" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3909312e36831fa3c6ba8f79de5e42b7e8d8e856",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2354",
       "triggerID" : "3909312e36831fa3c6ba8f79de5e42b7e8d8e856",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3909312e36831fa3c6ba8f79de5e42b7e8d8e856 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2354) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-634485040


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2249",
       "triggerID" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5a37c0125f808a477da831a4a1a5acbaec92133e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2249) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] GJL commented on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
GJL commented on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-635358052


   > I'm registering the function with the on_exit callback, thus it needs to be accessible outside of run_test_with_timeout.
   
   Sorry, missed that. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12350:
URL: https://github.com/apache/flink/pull/12350#issuecomment-634485040


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5a37c0125f808a477da831a4a1a5acbaec92133e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5a37c0125f808a477da831a4a1a5acbaec92133e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] GJL commented on a change in pull request #12350: [FLINK-17824][e2e] Introduce timeout to 'resume savepoint' test

Posted by GitBox <gi...@apache.org>.
GJL commented on a change in pull request #12350:
URL: https://github.com/apache/flink/pull/12350#discussion_r431854108



##########
File path: flink-end-to-end-tests/test-scripts/common.sh
##########
@@ -799,3 +799,32 @@ function extract_job_id_from_job_submission_return() {
     echo "$JOB_ID"
 }
 
+
+#
+# NOTE: This function requires at least Bash version >= 4. Mac OS in 2020 still ships 3.x
+#
+
+run_test_with_timeout() {

Review comment:
       `local` can still be added I think. The variables are only used in the subshell.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org