You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/17 14:50:52 UTC

[flink-kubernetes-operator] 02/03: [FLINK-26142] Add e2e test and enable in github actions

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 78e6005b5dac7d6a140f98b6c5b941914d907f60
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Wed Feb 16 16:19:34 2022 +0800

    [FLINK-26142] Add e2e test and enable in github actions
---
 .github/workflows/ci.yml                    |  38 ++++++++++-
 e2e-tests/data/cr.yaml                      |  93 ++++++++++++++++++++++++++
 e2e-tests/test_kubernetes_application_ha.sh |  62 +++++++++++++++++
 e2e-tests/utils.sh                          | 100 ++++++++++++++++++++++++++++
 4 files changed, 292 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7b3c557..1d299ab 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -65,4 +65,40 @@ jobs:
       - name: Stop the operator
         run: |
           helm uninstall flink-operator
-
+  e2e_ci:
+    runs-on: ubuntu-latest
+    name: e2e_ci
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 11
+        uses: actions/setup-java@v2
+        with:
+          java-version: '11'
+          # AdoptOpenJDK moved to Eclipse Temurin; 'adopt' builds no longer receive updates.
+          distribution: 'temurin'
+      - name: Build with Maven
+        run: mvn clean install
+      - name: Start minikube
+        # NOTE(review): third-party action tracked at @master; consider pinning a release tag or commit SHA.
+        uses: medyagh/setup-minikube@master
+      - name: Install cert-manager
+        run: |
+          kubectl get pods -A
+          kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
+          kubectl -n cert-manager wait --all=true --for=condition=Ready --timeout=300s pod
+      - name: Build image
+        run: |
+          export SHELL=/bin/bash
+          export DOCKER_BUILDKIT=1
+          eval $(minikube -p minikube docker-env)
+          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+          docker images
+      - name: Start the operator
+        run: |
+          helm install flink-operator helm/flink-operator --set image.repository=flink-kubernetes-operator --set image.tag=ci-latest
+          kubectl wait --for=condition=Available --timeout=120s deploy/flink-operator
+          kubectl get pods
+      - name: Run Flink e2e tests
+        run: ./e2e-tests/test_kubernetes_application_ha.sh
+      - name: Stop the operator
+        run: helm uninstall flink-operator
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
new file mode 100644
index 0000000..cb04df1
--- /dev/null
+++ b/e2e-tests/data/cr.yaml
@@ -0,0 +1,93 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: flink-example-statemachine
+spec:
+  image: flink:1.14.3
+  flinkVersion: "1.14.3"
+  flinkConfiguration:
+    kubernetes.service-account: flink-operator
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: file:///opt/flink/volume/flink-ha
+    state.checkpoints.dir: file:///opt/flink/volume/flink-cp
+    state.savepoints.dir: file:///opt/flink/volume/flink-sp
+  podTemplate:
+    apiVersion: v1
+    kind: Pod
+    metadata:
+      name: pod-template
+    spec:
+      initContainers:
+        - name: artifacts-fetcher
+          image: busybox:1.35.0  # pinned: with IfNotPresent, "latest" may resolve to a stale cached image
+          imagePullPolicy: IfNotPresent
+          # Use wget or other tools to get user jars from remote storage
+          command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.3/flink-examples-streaming_2.12-1.14.3.jar', '-O', '/flink-artifact/myjob.jar' ]
+          volumeMounts:
+            - mountPath: /flink-artifact
+              name: flink-artifact
+      containers:
+        # Do not change the main container name
+        - name: flink-main-container
+          resources:
+            requests:
+              ephemeral-storage: 2048Mi
+            limits:
+              ephemeral-storage: 2048Mi
+          volumeMounts:
+            - mountPath: /opt/flink/usrlib
+              name: flink-artifact
+            - mountPath: /opt/flink/volume
+              name: flink-volume
+      volumes:
+        - name: flink-artifact
+          emptyDir: { }
+        - name: flink-volume
+          persistentVolumeClaim:
+            claimName: flink-example-statemachine
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "1024m"
+      cpu: 0.5
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "1024m"
+      cpu: 0.5
+  job:
+    jarURI: local:///opt/flink/usrlib/myjob.jar
+    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
+    parallelism: 2
+
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  namespace: default  # must match the FlinkDeployment above, whose pods mount this claim
+  name: flink-example-statemachine
+spec:
+  accessModes: [ ReadWriteOnce ]
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: 1Gi
diff --git a/e2e-tests/test_kubernetes_application_ha.sh b/e2e-tests/test_kubernetes_application_ha.sh
new file mode 100755
index 0000000..b91eab4
--- /dev/null
+++ b/e2e-tests/test_kubernetes_application_ha.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/utils.sh
+
+CLUSTER_ID="flink-example-statemachine"
+TIMEOUT=300
+
+function cleanup_and_exit() {
+    if [[ "${TRAPPED_EXIT_CODE:-0}" -ne 0 ]]; then
+      debug_and_show_logs
+    fi
+    # Delete the deployment and PVC, wait for the cluster's pods to go, then drop its HA ConfigMaps.
+    kubectl delete -f e2e-tests/data/cr.yaml
+    kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
+    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
+}
+
+on_exit cleanup_and_exit
+
+retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
+retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
+
+kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
+
+echo "Waiting for jobmanager pod ${jm_pod_name} ready."
+kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s "pod/${jm_pod_name}" || exit 1
+
+wait_for_logs "${jm_pod_name}" "Rest endpoint listening at" ${TIMEOUT} || exit 1
+wait_for_logs "${jm_pod_name}" "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+
+job_id=$(kubectl logs "${jm_pod_name}" | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
+# Fail fast: an empty id would make the recovery check below wait out its whole timeout.
+[[ -n "${job_id}" ]] || { echo "Could not find the submitted job id in the logs of ${jm_pod_name}"; exit 1; }
+
+# Kill the JobManager
+echo "Kill the $jm_pod_name"
+kubectl exec "${jm_pod_name}" -- /bin/sh -c "kill 1"
+
+# Check the new JobManager recovering from latest successful checkpoint
+wait_for_logs "${jm_pod_name}" "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1
+wait_for_logs "${jm_pod_name}" "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+
+echo "Successfully run the Flink Kubernetes application HA test"
+
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
new file mode 100644
index 0000000..70bc56b
--- /dev/null
+++ b/e2e-tests/utils.sh
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+function wait_for_logs {
+  local jm_pod_name=$1
+  local successful_response_regex=$2
+  local timeout=$3
+
+  # wait or timeout until the log shows up
+  echo "Waiting for log \"$2\"..."
+  for i in $(seq 1 ${timeout}); do
+    if kubectl logs $jm_pod_name | grep -E "${successful_response_regex}" >/dev/null; then
+      echo "Log \"$2\" shows up."
+      return
+    fi
+
+    sleep 1
+  done
+  echo "Log $2 does not show up within a timeout of ${timeout} sec"
+  exit 1
+}
+
+function retry_times() {
+    local retriesNumber=$1
+    local backoff=$2
+    local command="$3"
+    local attempt=0
+    # Run `command` up to `retriesNumber` times, sleeping `backoff` seconds after each failure.
+    while (( attempt < retriesNumber )); do
+        attempt=$((attempt + 1))
+        # Deliberately unquoted: the command string is word-split into argv.
+        ${command} && return 0
+        echo "Command: ${command} failed. Retrying..."
+        sleep ${backoff}
+    done
+
+    echo "Command: ${command} failed ${retriesNumber} times."
+    return 1
+}
+
+
+function debug_and_show_logs {
+    echo "Debugging failed e2e test:"
+    echo "Currently existing Kubernetes resources"
+    kubectl get all
+    kubectl describe all
+    # Dump each pod's current logs, plus the previous container's logs if it has restarted.
+    echo "Flink logs:"
+    kubectl get pods -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | while read -r pod; do
+        echo "Current logs for $pod: "
+        kubectl logs "$pod"
+        restart_count=$(kubectl get pod "$pod" -o jsonpath='{.status.containerStatuses[0].restartCount}')
+        if [[ ${restart_count:-0} -gt 0 ]]; then
+          echo "Previous logs for $pod: "
+          kubectl logs "$pod" --previous
+        fi
+    done
+}
+
+function _on_exit_callback {
+  # Capture $? at trap entry (the script's exit status, for EXIT) and export it for the callbacks.
+  export TRAPPED_EXIT_CODE=$?
+  # Un-register this callback to avoid repeated invocations: some shells treat one signal as a subset of another.
+  trap "" INT EXIT
+  # A second keyboard interrupt exits at once. NOTE(review): a first INT only runs the callbacks and returns, so the script may resume — confirm intent.
+  trap "exit -1" INT
+  # Run callbacks in stored order (LIFO, see on_exit); `[@]-` tolerates an unset array, and empty entries are no-op evals.
+  for command in "${_on_exit_commands[@]-}"; do
+    eval "${command}"
+  done
+}
+
+# Register for both INT and EXIT: some shells interpret them as mutually exclusive.
+trap _on_exit_callback INT EXIT
+
+# Register a command to run when the current script exits or is interrupted (INT).
+# Unlike the built-in `trap "$command" EXIT`, any number of commands can be registered this way.
+# Note: tests should not call `trap $command INT|EXIT` directly: that would replace _on_exit_callback ("Highlander" situation).
+function on_exit {
+  local command="$1"
+
+  # Prepend, so the registered commands run in LIFO order (last registered runs first).
+  _on_exit_commands=("${command}" "${_on_exit_commands[@]-}")
+}