You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/02/25 08:01:10 UTC

[flink-kubernetes-operator] 02/02: [FLINK-26141] Add e2e test to guard last state upgrade

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit baaed88b21a27df0eb26a5bdce4516ec3c7510c1
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Thu Feb 24 16:32:05 2022 +0800

    [FLINK-26141] Add e2e test to guard last state upgrade
    
    This closes #22.
---
 .github/workflows/ci.yml             |  5 ++-
 e2e-tests/data/cr.yaml               |  1 +
 e2e-tests/test_last_state_upgrade.sh | 68 ++++++++++++++++++++++++++++++++++++
 3 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 343142a..6f3345a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -106,7 +106,10 @@ jobs:
           kubectl get pods
       - name: Run Flink e2e tests
         run: |
-          ./e2e-tests/test_kubernetes_application_ha.sh
+          ls e2e-tests/test_*.sh | while read script_test;do \
+            echo "Running $script_test"
+            bash $script_test || exit 1
+          done
       - name: Stop the operator
         run: |
           helm uninstall flink-operator
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index 936a1de..90069f9 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -78,6 +78,7 @@ spec:
     jarURI: local:///opt/flink/usrlib/myjob.jar
     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
     parallelism: 2
+    upgradeMode: last-state
 
 ---
 apiVersion: v1
diff --git a/e2e-tests/test_last_state_upgrade.sh b/e2e-tests/test_last_state_upgrade.sh
new file mode 100755
index 0000000..20ca7d0
--- /dev/null
+++ b/e2e-tests/test_last_state_upgrade.sh
@@ -0,0 +1,68 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+. "$(dirname "$0")/utils.sh"  # presumably provides retry_times, wait_for_logs, on_exit, debug_and_show_logs (not defined here)
+
+CLUSTER_ID='flink-example-statemachine'  # FlinkDeployment/deployment name, also the value of the app= label selector
+TIMEOUT=300  # seconds: used as --timeout=${TIMEOUT}s for kubectl wait and handed to wait_for_logs
+
+function cleanup_and_exit() {  # exit hook (registered via on_exit): show debug output on failure, then tear down
+    if [ "${TRAPPED_EXIT_CODE:-0}" != 0 ];then  # quoted with a default: an unset/empty value no longer breaks `[`
+      debug_and_show_logs
+    fi
+    # Teardown: delete the CR, wait for the pods labelled app=${CLUSTER_ID} to disappear, then remove the HA ConfigMaps.
+    kubectl delete -f e2e-tests/data/cr.yaml
+    kubectl wait --for=delete pod --timeout="${TIMEOUT}s" --selector="app=${CLUSTER_ID}"
+    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
+}
+
+function wait_for_jobmanager_running() {  # waits for the JobManager to come up; sets the global jm_pod_name; exits 1 on failure
+    retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
+    # Once the deployment object exists, wait for it to become Available and look up its JobManager pod.
+    kubectl wait --for=condition=Available --timeout="${TIMEOUT}s" "deploy/${CLUSTER_ID}" || exit 1
+    jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
+    # jm_pod_name is deliberately not local: the top-level script reads it after this function returns.
+    echo "Waiting for jobmanager pod ${jm_pod_name} ready."
+    kubectl wait --for=condition=Ready --timeout="${TIMEOUT}s" "pod/${jm_pod_name}" || exit 1
+
+    wait_for_logs "$jm_pod_name" "Rest endpoint listening at" "${TIMEOUT}" || exit 1
+}
+
+on_exit cleanup_and_exit
+
+retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
+
+wait_for_jobmanager_running
+# A completed checkpoint must exist before the upgrade; the restore check further down depends on it.
+wait_for_logs "$jm_pod_name" "Completed checkpoint [0-9]+ for job" "${TIMEOUT}" || exit 1
+
+job_id=$(kubectl logs "$jm_pod_name" | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
+[ -n "$job_id" ] || { echo "Could not find the submitted job id in the logs of ${jm_pod_name}"; exit 1; }
+# Update the FlinkDeployment and trigger the last state upgrade
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch '{"spec":{"jobManager": {"resource": {"cpu": 0.51, "memory": "1024m"} } } }' || exit 1
+
+kubectl wait --for=delete pod --timeout="${TIMEOUT}s" --selector="app=${CLUSTER_ID}"  # let the old pods terminate first
+wait_for_jobmanager_running
+
+# Check the new JobManager recovering from latest successful checkpoint
+wait_for_logs "$jm_pod_name" "Restoring job $job_id from Checkpoint" "${TIMEOUT}" || exit 1
+wait_for_logs "$jm_pod_name" "Completed checkpoint [0-9]+ for job" "${TIMEOUT}" || exit 1
+
+echo "Successfully run the last-state upgrade test"
+