You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/03 10:36:51 UTC

flink git commit: [FLINK-8973] [E2E] HA end-to-end test with StateMachineExample.

Repository: flink
Updated Branches:
  refs/heads/master 6d0d366eb -> a666455c9


[FLINK-8973] [E2E] HA end-to-end test with StateMachineExample.

Adds an end-to-end test that runs the StateMachineExample on a local
cluster with HA enabled. There is a single JM which gets killed and
re-created and we check if the new JM picks up the job execution and
if at the end the StateMachine has no ALERTs printed.

This closes #5750.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a666455c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a666455c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a666455c

Branch: refs/heads/master
Commit: a666455c98c269c63373e991c6ca2751e132a7c8
Parents: 6d0d366
Author: kkloudas <kk...@gmail.com>
Authored: Thu Mar 15 13:13:46 2018 +0100
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Apr 3 11:57:33 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     |   9 +
 flink-end-to-end-tests/test-scripts/common.sh   | 106 +++++++++-
 flink-end-to-end-tests/test-scripts/test_ha.sh  | 209 +++++++++++++++++++
 flink-examples/flink-examples-streaming/pom.xml |   8 +-
 .../statemachine/StateMachineExample.java       |  37 +++-
 5 files changed, 362 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 1ece1db..714dd2d 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -47,6 +47,15 @@ EXIT_CODE=0
 #     EXIT_CODE=$?
 # fi
 
+
+if [ $EXIT_CODE == 0 ]; then
+    printf "\n==============================================================================\n"
+    printf "Running HA end-to-end test\n"
+    printf "==============================================================================\n"
+    $END_TO_END_DIR/test-scripts/test_ha.sh
+    EXIT_CODE=$?
+fi
+
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==============================================================================\n"
   printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n"

http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index d4b9126..0db735a 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -39,6 +39,101 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function revert_default_config() {
+
+    # revert our modifications to the masters file
+    if [ -f $FLINK_DIR/conf/masters.bak ]; then
+        rm $FLINK_DIR/conf/masters
+        mv $FLINK_DIR/conf/masters.bak $FLINK_DIR/conf/masters
+    fi
+
+    # revert our modifications to the Flink conf yaml
+    if [ -f $FLINK_DIR/conf/flink-conf.yaml.bak ]; then
+        rm $FLINK_DIR/conf/flink-conf.yaml
+        mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+    fi
+}
+
+function create_ha_config() {
+
+    # back up the masters and flink-conf.yaml
+    cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
+    cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+
+    # clean up the dir that will be used for zookeeper storage
+    # (see high-availability.zookeeper.storageDir below)
+    if [ -e $TEST_DATA_DIR/recovery ]; then
+       echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+       rm -rf $TEST_DATA_DIR/recovery
+    fi
+
+    # create the masters file (only one currently).
+    # This must have all the masters to be used in HA.
+    echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+    # then move on to create the flink-conf.yaml
+    sed 's/^    //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+    #==============================================================================
+    # Common
+    #==============================================================================
+
+    jobmanager.rpc.address: localhost
+    jobmanager.rpc.port: 6123
+    jobmanager.heap.mb: 1024
+    taskmanager.heap.mb: 1024
+    taskmanager.numberOfTaskSlots: 4
+    parallelism.default: 1
+
+    #==============================================================================
+    # High Availability
+    #==============================================================================
+
+    high-availability: zookeeper
+    high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+    high-availability.zookeeper.quorum: localhost:2181
+    high-availability.zookeeper.path.root: /flink
+    high-availability.cluster-id: /test_cluster_one
+
+    #==============================================================================
+    # Web Frontend
+    #==============================================================================
+
+    web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+    create_ha_config
+    start_local_zk
+    start_cluster
+}
+
+function start_local_zk {
+    # Parses the zoo.cfg and starts locally zk.
+
+    # This is almost the same code as the
+    # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
+
+    while read server ; do
+        server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+        # match server.id=address[:port[:port]]
+        if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+            id=${BASH_REMATCH[1]}
+            address=${BASH_REMATCH[2]}
+
+            if [ "${address}" != "localhost" ]; then
+                echo "[ERROR] Parse error. Only available for localhost."
+                PASS=""
+                exit 1
+            fi
+            ${FLINK_DIR}/bin/zookeeper.sh start $id
+        else
+            echo "[WARN] Parse error. Skipping config entry '$server'."
+        fi
+    done < <(grep "^server\." "${FLINK_DIR}/conf/zoo.cfg")
+}
+
 function start_cluster {
   "$FLINK_DIR"/bin/start-cluster.sh
 
@@ -62,6 +157,11 @@ function start_cluster {
 function stop_cluster {
   "$FLINK_DIR"/bin/stop-cluster.sh
 
+  # stop zookeeper only if there are processes running
+  if ! [ `jps | grep 'FlinkZooKeeperQuorumPeer' | wc -l` -eq 0 ]; then
+    "$FLINK_DIR"/bin/zookeeper.sh stop
+  fi
+
   if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
       | grep -v "RetriableCommitFailedException" \
       | grep -v "NoAvailableBrokersException" \
@@ -107,8 +207,6 @@ function stop_cluster {
     cat $FLINK_DIR/log/*.out
     PASS=""
   fi
-
-  rm $FLINK_DIR/log/*
 }
 
 function wait_job_running {
@@ -201,7 +299,9 @@ function s3_delete {
 # make sure to clean up even in case of failures
 function cleanup {
   stop_cluster
-  rm -r $TEST_DATA_DIR
   check_all_pass
+  rm -rf $TEST_DATA_DIR
+  rm $FLINK_DIR/log/*
+  revert_default_config
 }
 trap cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/test-scripts/test_ha.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/test_ha.sh
new file mode 100755
index 0000000..2e65504
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha.sh
@@ -0,0 +1,209 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+JM_WATCHDOG_PID=0
+TM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+    if [ ${CLEARED} -eq 0 ]; then
+
+        if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+            echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+            kill ${JM_WATCHDOG_PID} 2> /dev/null
+            wait ${JM_WATCHDOG_PID} 2> /dev/null
+        fi
+
+        if ! [ ${TM_WATCHDOG_PID} -eq 0 ]; then
+            echo "Killing TM watchdog @ ${TM_WATCHDOG_PID}"
+            kill ${TM_WATCHDOG_PID} 2> /dev/null
+            wait ${TM_WATCHDOG_PID} 2> /dev/null
+        fi
+
+        cleanup
+        CLEARED=1
+    fi
+}
+
+# Fails the test unless the alert file ($1) is empty and the JM logs match the number of JM kills ($2).
+function verify_logs() {
+    local OUTPUT=$1
+    local JM_FAILURES=$2
+    # verify that we have no alerts
+    if ! [ `cat "${OUTPUT}" | wc -l` -eq 0 ]; then
+        echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+        PASS=""
+    fi
+
+    # all JMs apart from the first must recover the jobgraph (phrase quoted to stay one pattern; -l lists each log once).
+    if ! [ `grep -rl --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | wc -l` -eq ${JM_FAILURES} ]; then
+        echo "FAILURE: A JM did not take over."
+        PASS=""
+    fi
+
+    # count the JM logs that report completed checkpoints: the initial JM plus one per kill.
+    if ! [ `grep -rl --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | wc -l` -eq $((JM_FAILURES + 1)) ]; then
+        echo "FAILURE: A JM did not execute the job."
+        PASS=""
+    fi
+
+    if [[ ! "$PASS" ]]; then
+        echo "One or more tests FAILED."
+        exit 1
+    fi
+}
+
+function jm_watchdog() {
+    local EXPECTED_JMS=$1
+    local IP_PORT=$2
+
+    while true; do
+        local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`;
+        local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+        for (( c=0; c<MISSING_JMS; c++ )); do
+            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
+        done
+        sleep 5;
+    done
+}
+
+function kill_jm {
+    local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+    local JM_PIDS=(${JM_PIDS[@]})
+    local PID=${JM_PIDS[0]}
+    kill -9 ${PID}
+
+    echo "Killed JM @ ${PID}"
+}
+
+# Watchdog loop: whenever the checkpoint count reported for job $1 changes, kills one TM (adding a spare first if $2 TMs run).
+function tm_watchdog() {
+    local JOB_ID=$1
+    local EXPECTED_TMS=$2
+
+    # the number of already seen successful checkpoints
+    local SUCCESSFUL_CHCKP=0
+
+    while true; do
+        # check how many successful checkpoints we have
+        # and kill a TM only if the previous one already had some
+
+        local CHECKPOINTS=`curl -s "http://localhost:8081/jobs/${JOB_ID}/checkpoints" | cut -d ":" -f 6 | sed 's/,.*//'`
+
+        if ! [[ "${CHECKPOINTS}" =~ ^[0-9]+$ ]]; then
+            # no numeric answer (empty or an error text); the regex is unquoted because a quoted one is matched literally.
+            # this may be the case during leader election.
+            # in this case we retry later with a smaller interval
+            sleep 5; continue
+
+        elif [ "${CHECKPOINTS}" -ne "${SUCCESSFUL_CHCKP}" ]; then
+
+            # we are not only searching for > because when the JM goes down,
+            # the job starts with reporting 0 successful checkpoints
+
+            local RUNNING_TMS=`jps | grep 'TaskManager' | wc -l`
+            local TM_PIDS=`jps | grep 'TaskManager' | cut -d " " -f 1`
+
+            local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
+            if [ ${MISSING_TMS} -eq 0 ]; then
+                # start a new TM only if we have exactly the expected number
+                "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
+            fi
+
+            # kill an existing one
+            local TM_PIDS=(${TM_PIDS[@]})
+            local PID=${TM_PIDS[0]}
+            kill -9 ${PID}
+
+            echo "Killed TM @ ${PID}"
+
+            SUCCESSFUL_CHCKP=${CHECKPOINTS}
+        fi
+
+        sleep 11;
+    done
+}
+
+function run_ha_test() {
+    local PARALLELISM=$1
+    local BACKEND=$2
+    local ASYNC=$3
+    local INCREM=$4
+    local OUTPUT=$5
+
+    local JM_KILLS=3
+    local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
+
+    CLEARED=0
+
+    # start the cluster on HA mode
+    start_ha_cluster
+
+    echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+
+    # submit a job in detached mode and let it run
+    local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
+     $TEST_PROGRAM_JAR \
+        --backend ${BACKEND} \
+        --checkpoint-dir "file://${CHECKPOINT_DIR}" \
+        --async-checkpoints ${ASYNC} \
+        --incremental-checkpoints ${INCREM} \
+        --output ${OUTPUT} | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+    wait_job_running ${JOB_ID}
+
+    # start the watchdog that keeps the number of JMs stable
+    jm_watchdog 1 "8081" &
+    JM_WATCHDOG_PID=$!
+    echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
+
+    sleep 5
+
+    # start the watchdog that keeps the number of TMs stable
+    tm_watchdog ${JOB_ID} 1 &
+    TM_WATCHDOG_PID=$!
+    echo "Running TM watchdog @ ${TM_WATCHDOG_PID}"
+
+    # let the job run for a while to take some checkpoints
+    sleep 20
+
+    for (( c=0; c<${JM_KILLS}; c++ )); do
+        # kill the JM and wait for watchdog to
+        # create a new one which will take over
+        kill_jm
+        sleep 60
+    done
+
+    verify_logs ${OUTPUT} ${JM_KILLS}
+
+    # kill the cluster and zookeeper
+    stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog EXIT
+run_ha_test 4 "file" "false" "false" "${TEST_DATA_DIR}/output.txt"
+run_ha_test 4 "rocks" "false" "false" "${TEST_DATA_DIR}/output.txt"
+run_ha_test 4 "file" "true" "false" "${TEST_DATA_DIR}/output.txt"
+run_ha_test 4 "rocks" "false" "true" "${TEST_DATA_DIR}/output.txt"

http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index ea253d8..6ff5512 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -84,7 +84,13 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
-	</dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 14757fb..054ed0a 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -55,6 +58,12 @@ public class StateMachineExample {
 
 		System.out.println("Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]");
 		System.out.println("Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]");
+		System.out.println("Options for both the above setups: ");
+		System.out.println("\t[--backend <file|rocks>]");
+		System.out.println("\t[--checkpoint-dir <filepath>]");
+		System.out.println("\t[--async-checkpoints <true|false>]");
+		System.out.println("\t[--incremental-checkpoints <true|false>]");
+		System.out.println("\t[--output <filepath> OR null for stdout]");
 		System.out.println();
 
 		// ---- determine whether to use the built-in source, or read from Kafka ----
@@ -92,7 +101,23 @@ public class StateMachineExample {
 
 		// create the environment to create streams and configure execution
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.enableCheckpointing(5000);
+		env.enableCheckpointing(2000L);
+
+		final String stateBackend = params.get("backend", "memory");
+		if ("file".equals(stateBackend)) {
+			final String checkpointDir = params.get("checkpoint-dir");
+			boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
+			env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+		} else if ("rocks".equals(stateBackend)) {
+			final String checkpointDir = params.get("checkpoint-dir");
+			boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
+			env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+		}
+
+		final String outputFile = params.get("output");
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
 
 		DataStream<Event> events = env.addSource(source);
 
@@ -105,7 +130,13 @@ public class StateMachineExample {
 				.flatMap(new StateMachineMapper());
 
 		// output the alerts to std-out
-		alerts.print();
+		if (outputFile == null) {
+			alerts.print();
+		} else {
+			alerts
+				.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE)
+				.setParallelism(1);
+		}
 
 		// trigger program execution
 		env.execute("State machine job");
@@ -140,7 +171,7 @@ public class StateMachineExample {
 				state = State.Initial;
 			}
 
-			// ask the state machine what state we should go to based on teh given event
+			// ask the state machine what state we should go to based on the given event
 			State nextState = state.transition(evt.type());
 
 			if (nextState == State.InvalidTransition) {