You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/03 10:36:51 UTC
flink git commit: [FLINK-8973] [E2E] HA end-to-end test with
StateMachineExample.
Repository: flink
Updated Branches:
refs/heads/master 6d0d366eb -> a666455c9
[FLINK-8973] [E2E] HA end-to-end test with StateMachineExample.
Adds an end-to-end test that runs the StateMachineExample on a local
cluster with HA enabled. There is a single JM which gets killed and
re-created and we check if the new JM picks up the job execution and
if at the end the StateMachine has no ALERTs printed.
This closes #5750.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a666455c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a666455c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a666455c
Branch: refs/heads/master
Commit: a666455c98c269c63373e991c6ca2751e132a7c8
Parents: 6d0d366
Author: kkloudas <kk...@gmail.com>
Authored: Thu Mar 15 13:13:46 2018 +0100
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Apr 3 11:57:33 2018 +0200
----------------------------------------------------------------------
flink-end-to-end-tests/run-nightly-tests.sh | 9 +
flink-end-to-end-tests/test-scripts/common.sh | 106 +++++++++-
flink-end-to-end-tests/test-scripts/test_ha.sh | 209 +++++++++++++++++++
flink-examples/flink-examples-streaming/pom.xml | 8 +-
.../statemachine/StateMachineExample.java | 37 +++-
5 files changed, 362 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 1ece1db..714dd2d 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -47,6 +47,15 @@ EXIT_CODE=0
# EXIT_CODE=$?
# fi
+
+if [ $EXIT_CODE == 0 ]; then
+ printf "\n==============================================================================\n"
+ printf "Running HA end-to-end test\n"
+ printf "==============================================================================\n"
+ $END_TO_END_DIR/test-scripts/test_ha.sh
+ EXIT_CODE=$?
+fi
+
if [ $EXIT_CODE == 0 ]; then
printf "\n==============================================================================\n"
printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n"
http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index d4b9126..0db735a 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -39,6 +39,101 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function revert_default_config() {
+
+ # revert our modifications to the masters file
+ if [ -f $FLINK_DIR/conf/masters.bak ]; then
+ rm $FLINK_DIR/conf/masters
+ mv $FLINK_DIR/conf/masters.bak $FLINK_DIR/conf/masters
+ fi
+
+ # revert our modifications to the Flink conf yaml
+ if [ -f $FLINK_DIR/conf/flink-conf.yaml.bak ]; then
+ rm $FLINK_DIR/conf/flink-conf.yaml
+ mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+ fi
+}
+
+function create_ha_config() {
+
+ # back up the masters and flink-conf.yaml
+ cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
+ cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+
+ # clean up the dir that will be used for zookeeper storage
+ # (see high-availability.zookeeper.storageDir below)
+ if [ -e $TEST_DATA_DIR/recovery ]; then
+ echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+ rm -rf $TEST_DATA_DIR/recovery
+ fi
+
+ # create the masters file (only one currently).
+ # This must have all the masters to be used in HA.
+ echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+ # then move on to create the flink-conf.yaml
+ sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+ #==============================================================================
+ # Common
+ #==============================================================================
+
+ jobmanager.rpc.address: localhost
+ jobmanager.rpc.port: 6123
+ jobmanager.heap.mb: 1024
+ taskmanager.heap.mb: 1024
+ taskmanager.numberOfTaskSlots: 4
+ parallelism.default: 1
+
+ #==============================================================================
+ # High Availability
+ #==============================================================================
+
+ high-availability: zookeeper
+ high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+ high-availability.zookeeper.quorum: localhost:2181
+ high-availability.zookeeper.path.root: /flink
+ high-availability.cluster-id: /test_cluster_one
+
+ #==============================================================================
+ # Web Frontend
+ #==============================================================================
+
+ web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+ create_ha_config
+ start_local_zk
+ start_cluster
+}
+
+function start_local_zk {
+ # Parses the zoo.cfg and starts locally zk.
+
+ # This is almost the same code as the
+ # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
+
+ while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
+ address=${BASH_REMATCH[2]}
+
+ if [ "${address}" != "localhost" ]; then
+ echo "[ERROR] Parse error. Only available for localhost."
+ PASS=""
+ exit 1
+ fi
+ ${FLINK_DIR}/bin/zookeeper.sh start $id
+ else
+ echo "[WARN] Parse error. Skipping config entry '$server'."
+ fi
+ done < <(grep "^server\." "${FLINK_DIR}/conf/zoo.cfg")
+}
+
function start_cluster {
"$FLINK_DIR"/bin/start-cluster.sh
@@ -62,6 +157,11 @@ function start_cluster {
function stop_cluster {
"$FLINK_DIR"/bin/stop-cluster.sh
+ # stop zookeeper only if there are processes running
+ if ! [ `jps | grep 'FlinkZooKeeperQuorumPeer' | wc -l` -eq 0 ]; then
+ "$FLINK_DIR"/bin/zookeeper.sh stop
+ fi
+
if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
| grep -v "RetriableCommitFailedException" \
| grep -v "NoAvailableBrokersException" \
@@ -107,8 +207,6 @@ function stop_cluster {
cat $FLINK_DIR/log/*.out
PASS=""
fi
-
- rm $FLINK_DIR/log/*
}
function wait_job_running {
@@ -201,7 +299,9 @@ function s3_delete {
# make sure to clean up even in case of failures
function cleanup {
stop_cluster
- rm -r $TEST_DATA_DIR
check_all_pass
+ rm -rf $TEST_DATA_DIR
+ rm $FLINK_DIR/log/*
+ revert_default_config
}
trap cleanup EXIT
http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/test-scripts/test_ha.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/test_ha.sh
new file mode 100755
index 0000000..2e65504
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha.sh
@@ -0,0 +1,209 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+JM_WATCHDOG_PID=0
+TM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+ if [ ${CLEARED} -eq 0 ]; then
+
+ if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+ echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+ kill ${JM_WATCHDOG_PID} 2> /dev/null
+ wait ${JM_WATCHDOG_PID} 2> /dev/null
+ fi
+
+ if ! [ ${TM_WATCHDOG_PID} -eq 0 ]; then
+ echo "Killing TM watchdog @ ${TM_WATCHDOG_PID}"
+ kill ${TM_WATCHDOG_PID} 2> /dev/null
+ wait ${TM_WATCHDOG_PID} 2> /dev/null
+ fi
+
+ cleanup
+ CLEARED=1
+ fi
+}
+
+function verify_logs() {
+ local OUTPUT=$1
+ local JM_FAILURES=$2
+
+ # verify that we have no alerts: the job runs with error rate 0.0, so the output file ($1) must be empty
+ if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+ echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+ PASS=""
+ fi
+
+ # checks that all apart from the first JM recover the failed jobgraph (phrase quoted so grep sees one pattern, not a pattern plus a file name).
+ if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
+ echo "FAILURE: A JM did not take over."
+ PASS=""
+ fi
+
+ # search the logs for JMs that log completed checkpoints; every JM (the initial one plus one per kill, $2) must have some
+ if ! [ `grep -r --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
+ echo "FAILURE: A JM did not execute the job."
+ PASS=""
+ fi
+
+ if [[ ! "$PASS" ]]; then
+ echo "One or more tests FAILED."
+ exit 1
+ fi
+}
+
+function jm_watchdog() {
+ local EXPECTED_JMS=$1
+ local IP_PORT=$2
+
+ while true; do
+ local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`;
+ local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+ for (( c=0; c<MISSING_JMS; c++ )); do
+ "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
+ done
+ sleep 5;
+ done
+}
+
+function kill_jm {
+ local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+ local JM_PIDS=(${JM_PIDS[@]})
+ local PID=${JM_PIDS[0]}
+ kill -9 ${PID}
+
+ echo "Killed JM @ ${PID}"
+}
+
+function tm_watchdog() {
+ local JOB_ID=$1
+ local EXPECTED_TMS=$2
+
+ # the number of already seen successful checkpoints
+ local SUCCESSFUL_CHCKP=0
+
+ while true; do
+
+ # check how many successful checkpoints we have
+ # and kill a TM only if the previous one already had some
+
+ local CHECKPOINTS=`curl -s "http://localhost:8081/jobs/${JOB_ID}/checkpoints" | cut -d ":" -f 6 | sed 's/,.*//'`
+
+ if ! [[ ${CHECKPOINTS} =~ ^[0-9]+$ ]]; then
+
+ # no numeric checkpoint count came back (empty or non-numeric reply); this may be the case during leader election.
+ # in this case we retry later with a smaller interval (the regex must stay unquoted, otherwise bash matches it literally)
+ sleep 5; continue
+
+ elif [ "${CHECKPOINTS}" -ne "${SUCCESSFUL_CHCKP}" ]; then
+
+ # we are not only searching for > because when the JM goes down,
+ # the job starts with reporting 0 successful checkpoints
+
+ local RUNNING_TMS=`jps | grep 'TaskManager' | wc -l`
+ local TM_PIDS=`jps | grep 'TaskManager' | cut -d " " -f 1`
+
+ local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
+ if [ ${MISSING_TMS} -eq 0 ]; then
+ # start a new TM only if we have exactly the expected number
+ "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
+ fi
+
+ # kill an existing one
+ local TM_PIDS=(${TM_PIDS[@]})
+ local PID=${TM_PIDS[0]}
+ kill -9 ${PID}
+
+ echo "Killed TM @ ${PID}"
+
+ SUCCESSFUL_CHCKP=${CHECKPOINTS}
+ fi
+
+ sleep 11;
+ done
+}
+
+function run_ha_test() {
+ local PARALLELISM=$1
+ local BACKEND=$2
+ local ASYNC=$3
+ local INCREM=$4
+ local OUTPUT=$5
+
+ local JM_KILLS=3
+ local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
+
+ CLEARED=0
+
+ # start the cluster on HA mode
+ start_ha_cluster
+
+ echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+
+ # submit a job in detached mode and let it run
+ local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
+ $TEST_PROGRAM_JAR \
+ --backend ${BACKEND} \
+ --checkpoint-dir "file://${CHECKPOINT_DIR}" \
+ --async-checkpoints ${ASYNC} \
+ --incremental-checkpoints ${INCREM} \
+ --output ${OUTPUT} | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+ wait_job_running ${JOB_ID}
+
+ # start the watchdog that keeps the number of JMs stable
+ jm_watchdog 1 "8081" &
+ JM_WATCHDOG_PID=$!
+ echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
+
+ sleep 5
+
+ # start the watchdog that keeps the number of TMs stable
+ tm_watchdog ${JOB_ID} 1 &
+ TM_WATCHDOG_PID=$!
+ echo "Running TM watchdog @ ${TM_WATCHDOG_PID}"
+
+ # let the job run for a while to take some checkpoints
+ sleep 20
+
+ for (( c=0; c<${JM_KILLS}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new one which will take over
+ kill_jm
+ sleep 60
+ done
+
+ verify_logs ${OUTPUT} ${JM_KILLS}
+
+ # kill the cluster and zookeeper
+ stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog EXIT
+run_ha_test 4 "file" "false" "false" "${TEST_DATA_DIR}/output.txt"
+run_ha_test 4 "rocks" "false" "false" "${TEST_DATA_DIR}/output.txt"
+run_ha_test 4 "file" "true" "false" "${TEST_DATA_DIR}/output.txt"
+run_ha_test 4 "rocks" "false" "true" "${TEST_DATA_DIR}/output.txt"
http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index ea253d8..6ff5512 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -84,7 +84,13 @@ under the License.
<type>test-jar</type>
</dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 14757fb..054ed0a 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -55,6 +58,12 @@ public class StateMachineExample {
System.out.println("Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]");
System.out.println("Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]");
+ System.out.println("Options for both the above setups: ");
+ System.out.println("\t[--backend <file|rocks>]");
+ System.out.println("\t[--checkpoint-dir <filepath>]");
+ System.out.println("\t[--async-checkpoints <true|false>]");
+ System.out.println("\t[--incremental-checkpoints <true|false>]");
+ System.out.println("\t[--output <filepath> OR null for stdout]");
System.out.println();
// ---- determine whether to use the built-in source, or read from Kafka ----
@@ -92,7 +101,23 @@ public class StateMachineExample {
// create the environment to create streams and configure execution
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(5000);
+ env.enableCheckpointing(2000L);
+
+ final String stateBackend = params.get("backend", "memory");
+ if ("file".equals(stateBackend)) {
+ final String checkpointDir = params.get("checkpoint-dir");
+ boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
+ env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+ } else if ("rocks".equals(stateBackend)) {
+ final String checkpointDir = params.get("checkpoint-dir");
+ boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
+ env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+ }
+
+ final String outputFile = params.get("output");
+
+ // make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(params);
DataStream<Event> events = env.addSource(source);
@@ -105,7 +130,13 @@ public class StateMachineExample {
.flatMap(new StateMachineMapper());
// output the alerts to std-out
- alerts.print();
+ if (outputFile == null) {
+ alerts.print();
+ } else {
+ alerts
+ .writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE)
+ .setParallelism(1);
+ }
// trigger program execution
env.execute("State machine job");
@@ -140,7 +171,7 @@ public class StateMachineExample {
state = State.Initial;
}
- // ask the state machine what state we should go to based on teh given event
+ // ask the state machine what state we should go to based on the given event
State nextState = state.transition(evt.type());
if (nextState == State.InvalidTransition) {