Posted to commits@flink.apache.org by wa...@apache.org on 2022/12/28 06:54:55 UTC

[flink] branch master updated: [FLINK-29859][e2e] Running TPC-DS with adaptive batch scheduler supports custom exceptions check

This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 14a61f36833 [FLINK-29859][e2e] Running TPC-DS with adaptive batch scheduler supports custom exceptions check
14a61f36833 is described below

commit 14a61f368332320d7e38cc93a04f95bb63c66788
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Dec 26 10:20:05 2022 +0800

    [FLINK-29859][e2e] Running TPC-DS with adaptive batch scheduler supports custom exceptions check
    
    Speculative execution causes some extra exceptions during execution. These exceptions are expected, so we should skip them in the log checks. The skipped exceptions are:
    1. ExecutionGraphException: The execution attempt
    2. Cannot find task to fail for execution
    3. ExceptionInChainedOperatorException: Could not forward element to next operator
    4. CancelTaskException: Buffer pool has already been destroyed
    
    This closes #21559
---
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +-
 flink-end-to-end-tests/test-scripts/common.sh      |  87 ++++++++----
 .../test-scripts/test-runner-common.sh             |  23 +--
 flink-end-to-end-tests/test-scripts/test_tpcds.sh  | 156 +++++++++++++--------
 4 files changed, 166 insertions(+), 102 deletions(-)

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index b9eb22effaf..cce44508cc9 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -212,7 +212,7 @@ function run_group_2 {
 
     run_test "TPC-H end-to-end test" "$END_TO_END_DIR/test-scripts/test_tpch.sh"
     run_test "TPC-DS end-to-end test" "$END_TO_END_DIR/test-scripts/test_tpcds.sh"
-    run_test "TPC-DS end-to-end test with adaptive batch scheduler" "$END_TO_END_DIR/test-scripts/test_tpcds.sh AdaptiveBatch"
+    run_test "TPC-DS end-to-end test with adaptive batch scheduler" "$END_TO_END_DIR/test-scripts/test_tpcds.sh AdaptiveBatch run_test" "custom_check_exceptions" "$END_TO_END_DIR/test-scripts/test_tpcds.sh AdaptiveBatch check_exceptions"
 
     run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh" "skip_check_exceptions"
 
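For context: the change above extends run_test with an optional log-check action in $3 and,
when that action is "custom_check_exceptions", a check command in $4. A minimal sketch of the
three calling conventions (the my_test.sh script name is a placeholder, not part of this commit):

    # default: run the standard error/exception/out-file checks after the test
    run_test "my test" "$END_TO_END_DIR/test-scripts/my_test.sh"
    # skip the log checks entirely
    run_test "my test" "$END_TO_END_DIR/test-scripts/my_test.sh" "skip_check_exceptions"
    # delegate the log check to a custom command passed as the 4th argument
    run_test "my test" "$END_TO_END_DIR/test-scripts/my_test.sh run_test" \
        "custom_check_exceptions" "$END_TO_END_DIR/test-scripts/my_test.sh check_exceptions"
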
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index d7ea2b87413..31741710b3a 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -390,38 +390,59 @@ function check_logs_for_errors {
   fi
 }
 
-function check_logs_for_exceptions {
+# Check logs for exceptions; any arguments are treated as additional allowed exception patterns.
+function internal_check_logs_for_exceptions {
   echo "Checking for exceptions..."
-  exception_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_LOG_DIR \
-   | grep -v "due to CancelTaskException" \
-   | grep -v "RetriableCommitFailedException" \
-   | grep -v "NoAvailableBrokersException" \
-   | grep -v "Async Kafka commit failed" \
-   | grep -v "DisconnectException" \
-   | grep -v "Cannot connect to ResourceManager right now" \
-   | grep -v "AskTimeoutException" \
-   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
-   | grep -v  "WARN  org.jboss.netty.channel.DefaultChannelPipeline" \
-   | grep -v 'INFO.*AWSErrorCode' \
-   | grep -v "RejectedExecutionException" \
-   | grep -v "CancellationException" \
-   | grep -v "An exception was thrown by an exception handler" \
-   | grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException" \
-   | grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration" \
-   | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
-   | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
-   | grep -v "java.lang.Exception: Execution was suspended" \
-   | grep -v "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \
-   | grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
-   | grep -v "java.lang.Exception: Artificial failure" \
-   | grep -v "org.apache.flink.runtime.checkpoint.CheckpointException" \
-   | grep -v "org.apache.flink.runtime.JobException: Recovery is suppressed" \
-   | grep -v "WARN  akka.remote.ReliableDeliverySupervisor" \
-   | grep -v "RecipientUnreachableException" \
-   | grep -v "SerializedCheckpointException.unwrap" \
-   | grep -v "ExecutionGraphException: The execution attempt" \
-   | grep -v "Cannot find task to fail for execution" \
-   | grep -ic "exception" || true)
+
+  local additional_allowed_exceptions=()
+  local index=0
+  for exception in "$@"; do
+    additional_allowed_exceptions[index]="$exception"
+    index=$((index + 1))
+  done
+
+  local default_allowed_exceptions=("GroupCoordinatorNotAvailableException" \
+  "due to CancelTaskException" \
+  "RetriableCommitFailedException" \
+  "NoAvailableBrokersException" \
+  "Async Kafka commit failed" \
+  "DisconnectException" \
+  "Cannot connect to ResourceManager right now" \
+  "AskTimeoutException" \
+  "WARN  akka.remote.transport.netty.NettyTransport" \
+  "WARN  org.jboss.netty.channel.DefaultChannelPipeline" \
+  'INFO.*AWSErrorCode' \
+  "RejectedExecutionException" \
+  "CancellationException" \
+  "An exception was thrown by an exception handler" \
+  "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException" \
+  "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration" \
+  "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
+  "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
+  "java.lang.Exception: Execution was suspended" \
+  "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \
+  "Caused by: java.lang.Exception: JobManager is shutting down" \
+  "java.lang.Exception: Artificial failure" \
+  "org.apache.flink.runtime.checkpoint.CheckpointException" \
+  "org.apache.flink.runtime.JobException: Recovery is suppressed" \
+  "WARN  akka.remote.ReliableDeliverySupervisor" \
+  "RecipientUnreachableException" \
+  "SerializedCheckpointException.unwrap")
+
+  local all_allowed_exceptions=("${default_allowed_exceptions[@]}" "${additional_allowed_exceptions[@]}")
+
+  # generate the grep command
+  local grep_command=""
+  for exception in "${all_allowed_exceptions[@]}"; do
+    if [[ $grep_command == "" ]]; then
+      grep_command="grep -rv \"$exception\" $FLINK_LOG_DIR"
+    else
+      grep_command="$grep_command | grep -v \"$exception\""
+    fi
+  done
+  grep_command="$grep_command | grep -ic \"exception\" || true"
+
+  exception_count=$(eval "$grep_command")
   if [[ ${exception_count} -gt 0 ]]; then
     echo "Found exception in log files; printing first 500 lines; see full logs for details:"
     find $FLINK_LOG_DIR/ -type f -exec head -n 500 {} \;
@@ -431,6 +452,10 @@ function check_logs_for_exceptions {
   fi
 }
 
+function check_logs_for_exceptions() {
+  internal_check_logs_for_exceptions
+}
+
 function check_logs_for_non_empty_out_files {
   echo "Checking for non-empty .out files..."
   # exclude reflective access warnings as these are expected (and currently unavoidable) on Java 9
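
The refactored check assembles one long grep -v pipeline from the combined pattern list and runs
it via eval. A minimal sketch of the command it generates, assuming FLINK_LOG_DIR is set and
abridging the defaults to a single pattern (the TPC-DS test's four extra patterns come last,
before the final count):

    grep -rv "GroupCoordinatorNotAvailableException" $FLINK_LOG_DIR \
      | grep -v "ExecutionGraphException: The execution attempt" \
      | grep -v "Cannot find task to fail for execution" \
      | grep -v "ExceptionInChainedOperatorException: Could not forward element to next operator" \
      | grep -v "CancelTaskException: Buffer pool has already been destroyed" \
      | grep -ic "exception" || true

Since each pattern is interpolated into the eval'd string inside double quotes, a pattern
containing double quotes would need escaping; the patterns used here are plain text.
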
diff --git a/flink-end-to-end-tests/test-scripts/test-runner-common.sh b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
index 3a17d15b19c..662a91a7888 100644
--- a/flink-end-to-end-tests/test-scripts/test-runner-common.sh
+++ b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
@@ -26,12 +26,14 @@ export FLINK_VERSION=$(MVN_RUN_VERBOSE=false run_mvn --file ${END_TO_END_DIR}/po
 # Arguments:
 #   $1: description of the test
 #   $2: command to execute
-#   $3: check logs for erors & exceptions
+#   $3: log check action ("skip_check_exceptions", "custom_check_exceptions", or empty for the default checks)
+#   $4: the command used to check the logs, if $3 is "custom_check_exceptions"
 #######################################
 function run_test {
     local description="$1"
     local command="$2"
-    local skip_check_exceptions=${3:-}
+    local check_logs_action=${3:-}
+    local custom_check_logs_command=${4:-}
 
     printf "\n==============================================================================\n"
     printf "Running '${description}'\n"
@@ -46,7 +48,7 @@ function run_test {
 
     function test_error() {
       echo "[FAIL] Test script contains errors."
-      post_test_validation 1 "$description" "$skip_check_exceptions"
+      post_test_validation 1 "$description" "$check_logs_action" "$custom_check_logs_command"
     }
     # set a trap to catch a test execution error
     trap 'test_error' ERR
@@ -58,23 +60,28 @@ function run_test {
     exit_code="$?"
     # remove trap for test execution
     trap - ERR
-    post_test_validation ${exit_code} "$description" "$skip_check_exceptions"
+    post_test_validation ${exit_code} "$description" "$check_logs_action" "$custom_check_logs_command"
 }
 
 # Validates the test result and exit code after its execution.
 function post_test_validation {
     local exit_code="$1"
     local description="$2"
-    local skip_check_exceptions="$3"
+    local check_logs_action="$3"
+    local custom_check_logs_command="$4"
 
     local time_elapsed=$(end_timer)
 
-    if [[ "${skip_check_exceptions}" != "skip_check_exceptions" ]]; then
+    if [[ "${check_logs_action}" == "skip_check_exceptions" ]]; then
+        echo "Checking of logs skipped."
+    elif [[ "${check_logs_action}" == "custom_check_exceptions" ]]; then
+        echo "Custom exception checking of logs."
+        ${custom_check_logs_command}
+        EXIT_CODE="$?"
+    else
         check_logs_for_errors
         check_logs_for_exceptions
         check_logs_for_non_empty_out_files
-    else
-        echo "Checking of logs skipped."
     fi
 
     # Investigate exit_code for failures of test executable as well as EXIT_CODE for failures of the test.
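
To make the dispatch concrete: for the TPC-DS entry in run-nightly-tests.sh, run_test executes
$2 as the test itself, and post_test_validation later executes $4 as the log check, i.e. two
invocations of the same script (a sketch, with the directory prefix abbreviated):

    test_tpcds.sh AdaptiveBatch run_test           # $2: runs the actual test
    test_tpcds.sh AdaptiveBatch check_exceptions   # $4: runs only the log checks

The custom check is passed as a full command line rather than a function name, presumably
because ${custom_check_logs_command} runs in the test runner's shell, where functions defined
inside the test script are not visible.
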
diff --git a/flink-end-to-end-tests/test-scripts/test_tpcds.sh b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
index 859135f0921..8c1ddee1f0a 100755
--- a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
+++ b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -24,86 +24,118 @@ USE_TABLE_STATS=true
 
 source "$(dirname "$0")"/common.sh
 
-################################################################################
-# Generate test data
-################################################################################
-TPCDS_TOOL_DIR="$END_TO_END_DIR/flink-tpcds-test/tpcds-tool"
-ORGIN_ANSWER_DIR="$END_TO_END_DIR/flink-tpcds-test/tpcds-tool/answer_set"
-TPCDS_QUERY_DIR="$END_TO_END_DIR/flink-tpcds-test/tpcds-tool/query"
+function run_test() {
+    ################################################################################
+    # Generate test data
+    ################################################################################
+    TPCDS_TOOL_DIR="$END_TO_END_DIR/flink-tpcds-test/tpcds-tool"
+    ORGIN_ANSWER_DIR="$END_TO_END_DIR/flink-tpcds-test/tpcds-tool/answer_set"
+    TPCDS_QUERY_DIR="$END_TO_END_DIR/flink-tpcds-test/tpcds-tool/query"
 
-TARGET_DIR="$END_TO_END_DIR/flink-tpcds-test/target"
+    TARGET_DIR="$END_TO_END_DIR/flink-tpcds-test/target"
 
-TPCDS_GENERATOR_DIR_DIR="$TARGET_DIR/generator"
-TPCDS_DATA_DIR="$TARGET_DIR/table"
+    TPCDS_GENERATOR_DIR_DIR="$TARGET_DIR/generator"
+    TPCDS_DATA_DIR="$TARGET_DIR/table"
 
-mkdir -p "$TPCDS_GENERATOR_DIR_DIR"
-mkdir -p "$TPCDS_DATA_DIR"
+    mkdir -p "$TPCDS_GENERATOR_DIR_DIR"
+    mkdir -p "$TPCDS_DATA_DIR"
 
-cd "$TPCDS_TOOL_DIR"
-# use relative path, because tpcds gennerator cannot recognize path which is too long.
-TPCDS_GENERATOR_RELATIVE_DIR="../target/generator"
-TPCDS_DATA_RELATIVE_DIR="../table"
+    cd "$TPCDS_TOOL_DIR"
+    # Use a relative path, because the tpcds generator cannot recognize paths that are too long.
+    TPCDS_GENERATOR_RELATIVE_DIR="../target/generator"
+    TPCDS_DATA_RELATIVE_DIR="../table"
 
-${TPCDS_TOOL_DIR}/data_generator.sh "$TPCDS_GENERATOR_RELATIVE_DIR" "$SCALE" "$TPCDS_DATA_RELATIVE_DIR" "$END_TO_END_DIR/test-scripts"
+    ${TPCDS_TOOL_DIR}/data_generator.sh "$TPCDS_GENERATOR_RELATIVE_DIR" "$SCALE" "$TPCDS_DATA_RELATIVE_DIR" "$END_TO_END_DIR/test-scripts"
 
-cd "$END_TO_END_DIR"
+    cd "$END_TO_END_DIR"
 
-################################################################################
-# Prepare Flink
-################################################################################
+    ################################################################################
+    # Prepare Flink
+    ################################################################################
 
-echo "[INFO]Preparing Flink cluster..."
+    echo "[INFO]Preparing Flink cluster..."
 
-SCHEDULER="${1:-Default}"
+    local scheduler="$1"
 
-set_config_key "jobmanager.scheduler" "${SCHEDULER}"
-set_config_key "taskmanager.memory.process.size" "4096m"
-set_config_key "taskmanager.memory.network.fraction" "0.2"
-
-if [ "${SCHEDULER}" == "Default" ]; then
-    set_config_key "taskmanager.numberOfTaskSlots" "4"
-    set_config_key "parallelism.default" "4"
-elif [ "${SCHEDULER}" == "AdaptiveBatch" ]; then
-    set_config_key "taskmanager.numberOfTaskSlots" "8"
-    set_config_key "parallelism.default" "-1"
-    set_config_key "jobmanager.adaptive-batch-scheduler.max-parallelism" "8"
-    set_config_key "jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task" "6m"
-    set_config_key "jobmanager.adaptive-batch-scheduler.speculative.enabled" "true"
-    set_config_key "jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration" "0s"
-    set_config_key "slow-task-detector.execution-time.baseline-ratio" "0.0"
-    set_config_key "slow-task-detector.execution-time.baseline-lower-bound" "0s"
-else
-    echo "ERROR: Scheduler ${SCHEDULER} is unsupported for tpcds test. Aborting..."
-    exit 1
-fi
+    set_config_key "jobmanager.scheduler" "${scheduler}"
+    set_config_key "taskmanager.memory.process.size" "4096m"
+    set_config_key "taskmanager.memory.network.fraction" "0.2"
 
-start_cluster
+    if [ "${scheduler}" == "Default" ]; then
+        set_config_key "taskmanager.numberOfTaskSlots" "4"
+        set_config_key "parallelism.default" "4"
+    elif [ "${scheduler}" == "AdaptiveBatch" ]; then
+        set_config_key "taskmanager.numberOfTaskSlots" "8"
+        set_config_key "parallelism.default" "-1"
+        set_config_key "jobmanager.adaptive-batch-scheduler.max-parallelism" "8"
+        set_config_key "jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task" "6m"
+        set_config_key "jobmanager.adaptive-batch-scheduler.speculative.enabled" "true"
+        set_config_key "jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration" "0s"
+        set_config_key "slow-task-detector.execution-time.baseline-ratio" "0.0"
+        set_config_key "slow-task-detector.execution-time.baseline-lower-bound" "0s"
+    else
+        echo "ERROR: Scheduler ${scheduler} is unsupported for tpcds test. Aborting..."
+        exit 1
+    fi
 
+    start_cluster
 
-################################################################################
-# Run TPC-DS SQL
-################################################################################
 
-echo "[INFO] Runing TPC-DS queries..."
+    ################################################################################
+    # Run TPC-DS SQL
+    ################################################################################
 
-RESULT_DIR="$TARGET_DIR/result"
-mkdir -p "$RESULT_DIR"
+    echo "[INFO] Runing TPC-DS queries..."
 
-$FLINK_DIR/bin/flink run -c org.apache.flink.table.tpcds.TpcdsTestProgram "$TARGET_DIR/TpcdsTestProgram.jar" -sourceTablePath "$TPCDS_DATA_DIR" -queryPath "$TPCDS_QUERY_DIR" -sinkTablePath "$RESULT_DIR" -useTableStats "$USE_TABLE_STATS"
+    RESULT_DIR="$TARGET_DIR/result"
+    mkdir -p "$RESULT_DIR"
 
-################################################################################
-# validate result
-################################################################################
-QUALIFIED_ANSWER_DIR="$TARGET_DIR/answer_set_qualified"
-mkdir -p "$QUALIFIED_ANSWER_DIR"
+    $FLINK_DIR/bin/flink run -c org.apache.flink.table.tpcds.TpcdsTestProgram "$TARGET_DIR/TpcdsTestProgram.jar" -sourceTablePath "$TPCDS_DATA_DIR" -queryPath "$TPCDS_QUERY_DIR" -sinkTablePath "$RESULT_DIR" -useTableStats "$USE_TABLE_STATS"
 
-java -cp "$TARGET_DIR/TpcdsTestProgram.jar:$TARGET_DIR/lib/*" org.apache.flink.table.tpcds.utils.AnswerFormatter -originDir "$ORGIN_ANSWER_DIR" -destDir "$QUALIFIED_ANSWER_DIR"
+    ################################################################################
+    # validate result
+    ################################################################################
+    QUALIFIED_ANSWER_DIR="$TARGET_DIR/answer_set_qualified"
+    mkdir -p "$QUALIFIED_ANSWER_DIR"
 
-java -cp "$TARGET_DIR/TpcdsTestProgram.jar:$TARGET_DIR/lib/*" org.apache.flink.table.tpcds.utils.TpcdsResultComparator -expectedDir "$QUALIFIED_ANSWER_DIR" -actualDir "$RESULT_DIR"
+    java -cp "$TARGET_DIR/TpcdsTestProgram.jar:$TARGET_DIR/lib/*" org.apache.flink.table.tpcds.utils.AnswerFormatter -originDir "$ORGIN_ANSWER_DIR" -destDir "$QUALIFIED_ANSWER_DIR"
 
-################################################################################
-# Clean-up generated data folder
-################################################################################
+    java -cp "$TARGET_DIR/TpcdsTestProgram.jar:$TARGET_DIR/lib/*" org.apache.flink.table.tpcds.utils.TpcdsResultComparator -expectedDir "$QUALIFIED_ANSWER_DIR" -actualDir "$RESULT_DIR"
+
+    ################################################################################
+    # Clean-up generated data folder
+    ################################################################################
+
+    rm -rf "${TPCDS_DATA_DIR}"
+    echo "Deleted all files under $TPCDS_DATA_DIR"
+}
+
+function check_logs_for_exceptions_for_adaptive_batch_scheduler {
+    local additional_allowed_exceptions=("ExecutionGraphException: The execution attempt" \
+    "Cannot find task to fail for execution" \
+    "ExceptionInChainedOperatorException: Could not forward element to next operator" \
+    "CancelTaskException: Buffer pool has already been destroyed")
+
+    internal_check_logs_for_exceptions "${additional_allowed_exceptions[@]}"
+}
+
+SCHEDULER="${1:-Default}"
+ACTION="${2:-run_test}"
+
+if [ "${ACTION}" == "run_test" ]; then
+    run_test "${SCHEDULER}"
+elif [ "${ACTION}" == "check_exceptions" ]; then
+    if [[ "${SCHEDULER}" != "AdaptiveBatch" ]]; then
+        echo "Only supports checking exceptions for adaptive batch scheduler."
+        exit 1
+    fi
+
+    check_logs_for_errors
+    check_logs_for_exceptions_for_adaptive_batch_scheduler
+    check_logs_for_non_empty_out_files
+else
+    echo "ERROR: Action ${ACTION} is unsupported for tpcds test. Aborting..."
+    exit 1
+fi
 
-rm -rf "${TPCDS_DATA_DIR}"
-echo "Deleted all files under $TPCDS_DATA_DIR"
+exit $EXIT_CODE
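
Putting it together, the refactored script now supports the following invocations (a usage
sketch based on the dispatch block above):

    ./test_tpcds.sh                                  # Default scheduler, implicit run_test action
    ./test_tpcds.sh AdaptiveBatch run_test           # run the TPC-DS queries with the adaptive batch scheduler
    ./test_tpcds.sh AdaptiveBatch check_exceptions   # only check the logs, allowing the four extra exceptions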