Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/30 08:58:51 UTC
[flink] branch release-1.6 updated: [FLINK-8974] Run all-round DataSet job with failures in HA mode
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 2a217a5 [FLINK-8974] Run all-round DataSet job with failures in HA mode
2a217a5 is described below
commit 2a217a596da2b689b9648a465db1628debf19a85
Author: Dawid Wysakowicz <wy...@gmail.com>
AuthorDate: Mon Jul 30 10:57:26 2018 +0200
[FLINK-8974] Run all-round DataSet job with failures in HA mode
---
.../batch/tests/DataSetAllroundTestProgram.java | 123 ++---------------
.../org/apache/flink/batch/tests/Generator.java | 148 +++++++++++++++++++++
flink-end-to-end-tests/run-nightly-tests.sh | 10 +-
.../test-scripts/{test_ha.sh => common_ha.sh} | 102 +++-----------
.../test-scripts/test_ha_dataset.sh | 71 ++++++++++
.../test-scripts/test_ha_datastream.sh | 93 +++++++++++++
6 files changed, 352 insertions(+), 195 deletions(-)
diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
index 4abbb35..afdbdef 100644
--- a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
@@ -21,9 +21,6 @@ package org.apache.flink.batch.tests;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.typeinfo.Types;
@@ -34,11 +31,7 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.util.Preconditions;
/**
* Program to test a large chunk of DataSet API operators and primitives:
@@ -55,6 +48,8 @@ import org.apache.flink.util.Preconditions;
* <ul>
* <li>loadFactor (int): controls generated data volume. Does not affect result.</li>
* <li>outputPath (String): path to write the result</li>
+ * <li>infinite (Boolean): if set to true, one of the sources will be infinite and the job will never end.
+ * (default: false)</li>
* </ul>
*/
public class DataSetAllroundTestProgram {
@@ -66,13 +61,20 @@ public class DataSetAllroundTestProgram {
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");
+ boolean infinite = params.getBoolean("infinite", false);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
int numKeys = loadFactor * 128 * 1024;
- DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
- DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
- DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
+ DataSet<Tuple2<String, Integer>> x1Keys;
+ DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(Generator.generate(numKeys * 32, 2)).setParallelism(4);
+ DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(Generator.generate(numKeys, 8)).setParallelism(4);
+
+ if (infinite) {
+ x1Keys = env.createInput(Generator.generateInfinitely(numKeys)).setParallelism(4);
+ } else {
+ x1Keys = env.createInput(Generator.generate(numKeys, 1)).setParallelism(4);
+ }
DataSet<Tuple2<String, Integer>> joined = x2Keys
// shift keys (check for correct handling of key positions)
@@ -179,105 +181,4 @@ public class DataSetAllroundTestProgram {
env.execute();
}
- /**
- * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
- * <ul>
- * <li>String: key, can be repeated.</li>
- * <li>Integer: uniformly distributed int between 0 and 127</li>
- * </ul>
- */
- public static class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
-
- // total number of records
- private final long numRecords;
- // total number of keys
- private final long numKeys;
-
- // records emitted per partition
- private long recordsPerPartition;
- // number of keys per partition
- private long keysPerPartition;
-
- // number of currently emitted records
- private long recordCnt;
-
- // id of current partition
- private int partitionId;
- // total number of partitions
- private int numPartitions;
-
- public Generator(long numKeys, int recordsPerKey) {
- this.numKeys = numKeys;
- this.numRecords = numKeys * recordsPerKey;
- }
-
- @Override
- public void configure(Configuration parameters) { }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- return null;
- }
-
- @Override
- public GenericInputSplit[] createInputSplits(int minNumSplits) {
-
- GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
- for (int i = 0; i < minNumSplits; i++) {
- splits[i] = new GenericInputSplit(i, minNumSplits);
- }
- return splits;
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
- return new DefaultInputSplitAssigner(inputSplits);
- }
-
- @Override
- public void open(GenericInputSplit split) {
- this.partitionId = split.getSplitNumber();
- this.numPartitions = split.getTotalNumberOfSplits();
-
- // ensure even distribution of records and keys
- Preconditions.checkArgument(
- numRecords % numPartitions == 0,
- "Records cannot be evenly distributed among partitions");
- Preconditions.checkArgument(
- numKeys % numPartitions == 0,
- "Keys cannot be evenly distributed among partitions");
-
- this.recordsPerPartition = numRecords / numPartitions;
- this.keysPerPartition = numKeys / numPartitions;
-
- this.recordCnt = 0;
- }
-
- @Override
- public boolean reachedEnd() {
- return this.recordCnt >= this.recordsPerPartition;
- }
-
- @Override
- public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) {
-
- // build key from partition id and count per partition
- String key = String.format(
- "%d-%d",
- this.partitionId,
- this.recordCnt % this.keysPerPartition);
- // 128 values to filter on
- int filterVal = (int) this.recordCnt % 128;
-
- this.recordCnt++;
-
- reuse.f0 = key;
- reuse.f1 = filterVal;
- return reuse;
- }
-
- @Override
- public void close() { }
- }
-
}
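With the new "infinite" parameter the job can be submitted in either mode. A minimal submission sketch (jar location, output path, and load factor are illustrative; the flags are those documented in the javadoc above):

    # bounded run (default): all sources terminate and the job finishes on its own
    $FLINK_DIR/bin/flink run -d -p 4 DataSetAllroundTestProgram.jar \
        --loadFactor 4 \
        --outputPath $TEST_DATA_DIR/out/dataset_allround

    # unbounded run: one source never finishes, so the job keeps running
    # across JM failovers until it is cancelled
    $FLINK_DIR/bin/flink run -d -p 4 DataSetAllroundTestProgram.jar \
        --loadFactor 4 \
        --outputPath $TEST_DATA_DIR/out/dataset_allround \
        --infinite true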
diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
new file mode 100644
index 0000000..9da5259
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
+ * <ul>
+ * <li>String: key, can be repeated.</li>
+ * <li>Integer: uniformly distributed int between 0 and 127</li>
+ * </ul>
+ */
+public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+ // total number of records
+ private final long numRecords;
+ // total number of keys
+ private final long numKeys;
+
+ // records emitted per partition
+ private long recordsPerPartition;
+ // number of keys per partition
+ private long keysPerPartition;
+
+ // number of currently emitted records
+ private long recordCnt;
+
+ // id of current partition
+ private int partitionId;
+
+ private final boolean infinite;
+
+ public static Generator generate(long numKeys, int recordsPerKey) {
+ return new Generator(numKeys, recordsPerKey, false);
+ }
+
+ public static Generator generateInfinitely(long numKeys) {
+ return new Generator(numKeys, 0, true);
+ }
+
+ private Generator(long numKeys, int recordsPerKey, boolean infinite) {
+ this.numKeys = numKeys;
+ if (infinite) {
+ this.numRecords = Long.MAX_VALUE;
+ } else {
+ this.numRecords = numKeys * recordsPerKey;
+ }
+ this.infinite = infinite;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+ return null;
+ }
+
+ @Override
+ public GenericInputSplit[] createInputSplits(int minNumSplits) {
+
+ GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
+ for (int i = 0; i < minNumSplits; i++) {
+ splits[i] = new GenericInputSplit(i, minNumSplits);
+ }
+ return splits;
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
+ return new DefaultInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public void open(GenericInputSplit split) throws IOException {
+ this.partitionId = split.getSplitNumber();
+ // total number of partitions
+ int numPartitions = split.getTotalNumberOfSplits();
+
+ // ensure even distribution of records and keys
+ Preconditions.checkArgument(
+ numRecords % numPartitions == 0,
+ "Records cannot be evenly distributed among partitions");
+ Preconditions.checkArgument(
+ numKeys % numPartitions == 0,
+ "Keys cannot be evenly distributed among partitions");
+
+ this.recordsPerPartition = numRecords / numPartitions;
+ this.keysPerPartition = numKeys / numPartitions;
+
+ this.recordCnt = 0;
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return !infinite && this.recordCnt >= this.recordsPerPartition;
+ }
+
+ @Override
+ public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException {
+
+ // build key from partition id and count per partition
+ String key = String.format(
+ "%d-%d",
+ this.partitionId,
+ this.recordCnt % this.keysPerPartition);
+
+ // 128 values to filter on
+ int filterVal = (int) this.recordCnt % 128;
+
+ this.recordCnt++;
+
+ reuse.f0 = key;
+ reuse.f1 = filterVal;
+ return reuse;
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 960ba89..d06e80c 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -48,10 +48,12 @@ run_test "ConnectedComponents iterations with high parallelism end-to-end test"
run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
-run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
-run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
-run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
-run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
+run_test "Running HA dataset end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_dataset.sh"
+
+run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false"
+run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false"
+run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false"
+run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true"
run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/common_ha.sh
old mode 100755
new mode 100644
similarity index 61%
rename from flink-end-to-end-tests/test-scripts/test_ha.sh
rename to flink-end-to-end-tests/test-scripts/common_ha.sh
index a44a139..f476bac
--- a/flink-end-to-end-tests/test-scripts/test_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ha.sh
@@ -1,4 +1,5 @@
#!/usr/bin/env bash
+
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,16 +18,12 @@
# limitations under the License.
################################################################################
-source "$(dirname "$0")"/common.sh
-
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
JM_WATCHDOG_PID=0
TM_WATCHDOG_PID=0
-# flag indicating if we have already cleared up things after a test
-CLEARED=0
-
function stop_cluster_and_watchdog() {
if [ ${CLEARED} -eq 0 ]; then
@@ -50,23 +47,26 @@ function verify_logs() {
local OUTPUT=$FLINK_DIR/log/*.out
local JM_FAILURES=$1
local EXIT_CODE=0
+ local VERIFY_CHECKPOINTS=$2
# verify that we have no alerts
if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
- echo "FAILURE: Alerts found at the general purpose DataStream job."
+ echo "FAILURE: Alerts found at the general purpose job."
EXIT_CODE=1
fi
# checks that all apart from the first JM recover the failed jobgraph.
- if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
+ if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
echo "FAILURE: A JM did not take over."
EXIT_CODE=1
fi
+ if [ "$VERIFY_CHECKPOINTS" = true ]; then
# search the logs for JMs that log completed checkpoints
- if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
- echo "FAILURE: A JM did not execute the job."
- EXIT_CODE=1
+ if ! [ `grep -r --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
+ echo "FAILURE: A JM did not execute the job."
+ EXIT_CODE=1
+ fi
fi
if [[ $EXIT_CODE != 0 ]]; then
@@ -85,10 +85,16 @@ function jm_watchdog() {
for (( c=0; c<MISSING_JMS; c++ )); do
"$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
done
- sleep 5;
+ sleep 1;
done
}
+function start_ha_jm_watchdog() {
+ jm_watchdog $1 $2 &
+ JM_WATCHDOG_PID=$!
+ echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
+}
+
function kill_jm {
local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
local JM_PIDS=(${JM_PIDS[@]})
@@ -98,7 +104,8 @@ function kill_jm {
echo "Killed JM @ ${PID}"
}
-function tm_watchdog() {
+# ha prefix to differentiate from the one in common.sh
+function ha_tm_watchdog() {
local JOB_ID=$1
local EXPECTED_TMS=$2
@@ -146,74 +153,9 @@ function tm_watchdog() {
done
}
-function run_ha_test() {
- local PARALLELISM=$1
- local BACKEND=$2
- local ASYNC=$3
- local INCREM=$4
-
- local JM_KILLS=3
- local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
-
- CLEARED=0
-
- # start the cluster on HA mode
- start_ha_cluster
-
- echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
-
- # submit a job in detached mode and let it run
- local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
- $TEST_PROGRAM_JAR \
- --environment.parallelism ${PARALLELISM} \
- --test.semantics exactly-once \
- --test.simulate_failure true \
- --test.simulate_failure.num_records 200 \
- --test.simulate_failure.num_checkpoints 1 \
- --test.simulate_failure.max_failures 20 \
- --state_backend ${BACKEND} \
- --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
- --state_backend.file.async ${ASYNC} \
- --state_backend.rocks.incremental ${INCREM} \
- --sequence_generator_source.sleep_time 15 \
- --sequence_generator_source.sleep_after_elements 1 \
- | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
- wait_job_running ${JOB_ID}
-
- # start the watchdog that keeps the number of JMs stable
- jm_watchdog 1 "8081" &
- JM_WATCHDOG_PID=$!
- echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
-
- sleep 5
-
- # start the watchdog that keeps the number of TMs stable
- tm_watchdog ${JOB_ID} 1 &
+function start_ha_tm_watchdog() {
+ ha_tm_watchdog $1 $2 &
TM_WATCHDOG_PID=$!
echo "Running TM watchdog @ ${TM_WATCHDOG_PID}"
-
- # let the job run for a while to take some checkpoints
- sleep 20
-
- for (( c=0; c<${JM_KILLS}; c++ )); do
- # kill the JM and wait for watchdog to
- # create a new one which will take over
- kill_jm
- sleep 60
- done
-
- verify_logs ${JM_KILLS}
-
- # kill the cluster and zookeeper
- stop_cluster_and_watchdog
}
-trap stop_cluster_and_watchdog INT
-trap stop_cluster_and_watchdog EXIT
-
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
-
-run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}
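After this rename, test_ha.sh no longer runs a test itself; it only provides shared helpers. A rough sketch of how a test script composes them (JOB_ID stands for a job submitted via "flink run -d"; start_ha_cluster and wait_job_running come from common.sh):

    source "$(dirname "$0")"/common.sh
    source "$(dirname "$0")"/common_ha.sh

    CLEARED=0
    start_ha_cluster                  # standalone cluster + ZooKeeper in HA mode

    start_ha_jm_watchdog 1 "8081"     # keep exactly one JM alive on port 8081
    start_ha_tm_watchdog ${JOB_ID} 1  # keep one TM alive for the running job

    kill_jm                           # simulate a JM failure; the watchdog starts a new one
    wait_job_running ${JOB_ID}

    verify_logs 1 true                # expect 1 JM failover; also check completed checkpoints
    stop_cluster_and_watchdog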
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
new file mode 100755
index 0000000..716b80c
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_ha.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+function run_ha_test() {
+ local PARALLELISM=$1
+
+ local JM_KILLS=3
+
+ CLEARED=0
+ mkdir -p ${TEST_DATA_DIR}/control
+ touch ${TEST_DATA_DIR}/control/test.txt
+
+ # start the cluster in HA mode
+ start_ha_cluster
+
+ echo "Running in HA mode: parallelism=${PARALLELISM}."
+
+ # submit a job in detached mode and let it run
+ local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
+ $TEST_PROGRAM_JAR \
+ --loadFactor 4 \
+ --outputPath $TEST_DATA_DIR/out/dataset_allround \
+ --infinite true \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+ wait_job_running ${JOB_ID}
+
+ # start the watchdog that keeps the number of JMs stable
+ start_ha_jm_watchdog 1 "8081"
+
+ for (( c=0; c<${JM_KILLS}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new one which will take over
+ kill_jm
+ wait_job_running ${JOB_ID}
+ done
+
+ cancel_job ${JOB_ID}
+
+ # do not verify checkpoints in the logs
+ verify_logs ${JM_KILLS} false
+
+ # kill the cluster and zookeeper
+ stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog INT
+trap stop_cluster_and_watchdog EXIT
+
+run_ha_test 4
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
new file mode 100755
index 0000000..862055e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_ha.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+
+function run_ha_test() {
+ local PARALLELISM=$1
+ local BACKEND=$2
+ local ASYNC=$3
+ local INCREM=$4
+
+ local JM_KILLS=3
+ local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
+
+ CLEARED=0
+
+ # start the cluster in HA mode
+ start_ha_cluster
+
+ echo "Running in HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+
+ # submit a job in detached mode and let it run
+ local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
+ $TEST_PROGRAM_JAR \
+ --environment.parallelism ${PARALLELISM} \
+ --test.semantics exactly-once \
+ --test.simulate_failure true \
+ --test.simulate_failure.num_records 200 \
+ --test.simulate_failure.num_checkpoints 1 \
+ --test.simulate_failure.max_failures 20 \
+ --state_backend ${BACKEND} \
+ --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
+ --state_backend.file.async ${ASYNC} \
+ --state_backend.rocks.incremental ${INCREM} \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1 \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+ wait_job_running ${JOB_ID}
+
+ # start the watchdog that keeps the number of JMs stable
+ start_ha_jm_watchdog 1 "8081"
+
+ sleep 5
+
+ # start the watchdog that keeps the number of TMs stable
+ start_ha_tm_watchdog ${JOB_ID} 1
+
+ # let the job run for a while to take some checkpoints
+ sleep 20
+
+ for (( c=0; c<${JM_KILLS}; c++ )); do
+ # kill the JM and wait for watchdog to
+ # create a new one which will take over
+ kill_jm
+ # let the job start and take some checkpoints
+ sleep 60
+ done
+
+ # verify checkpoints in the logs
+ verify_logs ${JM_KILLS} true
+
+ # kill the cluster and zookeeper
+ stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog INT
+trap stop_cluster_and_watchdog EXIT
+
+STATE_BACKEND_TYPE=${1:-file}
+STATE_BACKEND_FILE_ASYNC=${2:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
+
+run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}
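The script keeps the old test_ha.sh interface: three optional positional arguments selecting the state backend, asynchronous file-backend snapshots, and incremental RocksDB snapshots. Example invocations, mirroring run-nightly-tests.sh above:

    # defaults: file backend, asynchronous snapshots, non-incremental
    ./test_ha_datastream.sh

    # RocksDB backend with incremental checkpoints
    ./test_ha_datastream.sh rocks true true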