You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/30 08:57:30 UTC

[flink] branch master updated: [FLINK-8974] Run all-round DataSet job with failures in HA mode

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bf6db7d  [FLINK-8974] Run all-round DataSet job with failures in HA mode
bf6db7d is described below

commit bf6db7d2483561b7fc5d7f9484acfcfa6c5c67c2
Author: Dawid Wysakowicz <wy...@gmail.com>
AuthorDate: Mon Jul 30 10:57:26 2018 +0200

    [FLINK-8974] Run all-round DataSet job with failures in HA mode
---
 .../batch/tests/DataSetAllroundTestProgram.java    | 123 ++---------------
 .../org/apache/flink/batch/tests/Generator.java    | 148 +++++++++++++++++++++
 flink-end-to-end-tests/run-nightly-tests.sh        |  10 +-
 .../test-scripts/{test_ha.sh => common_ha.sh}      | 102 +++-----------
 .../test-scripts/test_ha_dataset.sh                |  71 ++++++++++
 .../test-scripts/test_ha_datastream.sh             |  93 +++++++++++++
 6 files changed, 352 insertions(+), 195 deletions(-)

diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
index 4abbb35..afdbdef 100644
--- a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
@@ -21,9 +21,6 @@ package org.apache.flink.batch.tests;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -34,11 +31,7 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.util.Preconditions;
 
 /**
  * Program to test a large chunk of DataSet API operators and primitives:
@@ -55,6 +48,8 @@ import org.apache.flink.util.Preconditions;
  * <ul>
  *     <li>loadFactor (int): controls generated data volume. Does not affect result.</li>
  *     <li>outputPath (String): path to write the result</li>
+ *     <li>infinite (Boolean): if set to true, one of the sources will be infinite and the job will never end.
+ *     (default: false)</li>
  * </ul>
  */
 public class DataSetAllroundTestProgram {
@@ -66,13 +61,20 @@ public class DataSetAllroundTestProgram {
 		ParameterTool params = ParameterTool.fromArgs(args);
 		int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
 		String outputPath = params.getRequired("outputPath");
+		boolean infinite = params.getBoolean("infinite", false);
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		int numKeys = loadFactor * 128 * 1024;
-		DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
-		DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
-		DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
+		DataSet<Tuple2<String, Integer>> x1Keys;
+		DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(Generator.generate(numKeys * 32, 2)).setParallelism(4);
+		DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(Generator.generate(numKeys, 8)).setParallelism(4);
+
+		if (infinite) {
+			x1Keys = env.createInput(Generator.generateInfinitely(numKeys)).setParallelism(4);
+		} else {
+			x1Keys = env.createInput(Generator.generate(numKeys, 1)).setParallelism(4);
+		}
 
 		DataSet<Tuple2<String, Integer>> joined = x2Keys
 			// shift keys (check for correct handling of key positions)
@@ -179,105 +181,4 @@ public class DataSetAllroundTestProgram {
 		env.execute();
 	}
 
-	/**
-	 * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
-	 * <ul>
-	 *     <li>String: key, can be repeated.</li>
-	 *     <li>Integer: uniformly distributed int between 0 and 127</li>
-	 * </ul>
-	 */
-	public static class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
-
-		// total number of records
-		private final long numRecords;
-		// total number of keys
-		private final long numKeys;
-
-		// records emitted per partition
-		private long recordsPerPartition;
-		// number of keys per partition
-		private long keysPerPartition;
-
-		// number of currently emitted records
-		private long recordCnt;
-
-		// id of current partition
-		private int partitionId;
-		// total number of partitions
-		private int numPartitions;
-
-		public Generator(long numKeys, int recordsPerKey) {
-			this.numKeys = numKeys;
-			this.numRecords = numKeys * recordsPerKey;
-		}
-
-		@Override
-		public void configure(Configuration parameters) { }
-
-		@Override
-		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-			return null;
-		}
-
-		@Override
-		public GenericInputSplit[] createInputSplits(int minNumSplits) {
-
-			GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
-			for (int i = 0; i < minNumSplits; i++) {
-				splits[i] = new GenericInputSplit(i, minNumSplits);
-			}
-			return splits;
-		}
-
-		@Override
-		public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
-			return new DefaultInputSplitAssigner(inputSplits);
-		}
-
-		@Override
-		public void open(GenericInputSplit split) {
-			this.partitionId = split.getSplitNumber();
-			this.numPartitions = split.getTotalNumberOfSplits();
-
-			// ensure even distribution of records and keys
-			Preconditions.checkArgument(
-				numRecords % numPartitions == 0,
-				"Records cannot be evenly distributed among partitions");
-			Preconditions.checkArgument(
-				numKeys % numPartitions == 0,
-				"Keys cannot be evenly distributed among partitions");
-
-			this.recordsPerPartition = numRecords / numPartitions;
-			this.keysPerPartition = numKeys / numPartitions;
-
-			this.recordCnt = 0;
-		}
-
-		@Override
-		public boolean reachedEnd() {
-			return this.recordCnt >= this.recordsPerPartition;
-		}
-
-		@Override
-		public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) {
-
-			// build key from partition id and count per partition
-			String key = String.format(
-				"%d-%d",
-				this.partitionId,
-				this.recordCnt % this.keysPerPartition);
-			// 128 values to filter on
-			int filterVal = (int) this.recordCnt % 128;
-
-			this.recordCnt++;
-
-			reuse.f0 = key;
-			reuse.f1 = filterVal;
-			return reuse;
-		}
-
-		@Override
-		public void close() { }
-	}
-
 }
diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
new file mode 100644
index 0000000..9da5259
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic (bounded or infinite) DataSet of Tuple2(String, Integer)
+ * <ul>
+ *     <li>String: key, can be repeated.</li>
+ *     <li>Integer: int between 0 and 127, derived from the per-partition record count</li>
+ * </ul>
+ */
+public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+	// total number of records (Long.MAX_VALUE for the infinite generator)
+	private final long numRecords;
+	// total number of keys
+	private final long numKeys;
+
+	// records emitted per partition
+	private long recordsPerPartition;
+	// number of keys per partition
+	private long keysPerPartition;
+
+	// number of records emitted so far by the currently opened split
+	private long recordCnt;
+
+	// id of current partition
+	private int partitionId;
+
+	private final boolean infinite; // if true, reachedEnd() never reports the end of the input
+
+	public static Generator generate(long numKeys, int recordsPerKey) {
+		return new Generator(numKeys, recordsPerKey, false);
+	}
+
+	public static Generator generateInfinitely(long numKeys) {
+		return new Generator(numKeys, 0, true);
+	}
+
+	private Generator(long numKeys, int recordsPerKey, boolean infinite) {
+		this.numKeys = numKeys;
+		if (infinite) {
+			this.numRecords = Long.MAX_VALUE;
+		} else {
+			this.numRecords = numKeys * recordsPerKey;
+		}
+		this.infinite = infinite;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+		return null;
+	}
+
+	@Override
+	public GenericInputSplit[] createInputSplits(int minNumSplits) {
+
+		GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
+		for (int i = 0; i < minNumSplits; i++) {
+			splits[i] = new GenericInputSplit(i, minNumSplits);
+		}
+		return splits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
+		return new DefaultInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(GenericInputSplit split) throws IOException {
+		this.partitionId = split.getSplitNumber();
+		// total number of partitions
+		int numPartitions = split.getTotalNumberOfSplits();
+
+		// keys must split evenly; records too, unless infinite (Long.MAX_VALUE is not divisible by e.g. 4)
+		Preconditions.checkArgument(
+			infinite || numRecords % numPartitions == 0,
+			"Records cannot be evenly distributed among partitions");
+		Preconditions.checkArgument(
+			numKeys % numPartitions == 0,
+			"Keys cannot be evenly distributed among partitions");
+
+		this.recordsPerPartition = numRecords / numPartitions;
+		this.keysPerPartition = numKeys / numPartitions;
+
+		this.recordCnt = 0;
+	}
+
+	@Override
+	public boolean reachedEnd() {
+		return !infinite && this.recordCnt >= this.recordsPerPartition;
+	}
+
+	@Override
+	public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException {
+
+		// build key from partition id and count per partition
+		String key = String.format(
+			"%d-%d",
+			this.partitionId,
+			this.recordCnt % this.keysPerPartition);
+
+		// 128 values to filter on; take the modulo before narrowing so large counts stay in [0, 127]
+		int filterVal = (int) (this.recordCnt % 128);
+
+		this.recordCnt++;
+
+		reuse.f0 = key;
+		reuse.f1 = filterVal;
+		return reuse;
+	}
+
+	@Override
+	public void close() {
+	}
+}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 960ba89..d06e80c 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -48,10 +48,12 @@ run_test "ConnectedComponents iterations with high parallelism end-to-end test"
 run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
 
-run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
-run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
-run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
-run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
+run_test "Running HA dataset end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_dataset.sh"
+
+run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false"
+run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false"
+run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false"
+run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true"
 
 run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
 run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/common_ha.sh
old mode 100755
new mode 100644
similarity index 61%
rename from flink-end-to-end-tests/test-scripts/test_ha.sh
rename to flink-end-to-end-tests/test-scripts/common_ha.sh
index a44a139..f476bac
--- a/flink-end-to-end-tests/test-scripts/test_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ha.sh
@@ -1,4 +1,5 @@
 #!/usr/bin/env bash
+
 ################################################################################
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,16 +18,12 @@
 # limitations under the License.
 ################################################################################
 
-source "$(dirname "$0")"/common.sh
-
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
 
 JM_WATCHDOG_PID=0
 TM_WATCHDOG_PID=0
 
-# flag indicating if we have already cleared up things after a test
-CLEARED=0
-
 function stop_cluster_and_watchdog() {
     if [ ${CLEARED} -eq 0 ]; then
 
@@ -50,23 +47,26 @@ function verify_logs() {
     local OUTPUT=$FLINK_DIR/log/*.out
     local JM_FAILURES=$1
     local EXIT_CODE=0
+    local VERIFY_CHECKPOINTS=$2
 
     # verify that we have no alerts
     if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
-        echo "FAILURE: Alerts found at the general purpose DataStream job."
+        echo "FAILURE: Alerts found at the general purpose job."
         EXIT_CODE=1
     fi
 
     # checks that all apart from the first JM recover the failed jobgraph.
-    if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
+    if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
         echo "FAILURE: A JM did not take over."
         EXIT_CODE=1
     fi
 
+    if [ "$VERIFY_CHECKPOINTS" = true ]; then
     # search the logs for JMs that log completed checkpoints
-    if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
-        echo "FAILURE: A JM did not execute the job."
-        EXIT_CODE=1
+        if ! [ `grep -r --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
+            echo "FAILURE: A JM did not execute the job."
+            EXIT_CODE=1
+        fi
     fi
 
     if [[ $EXIT_CODE != 0 ]]; then
@@ -85,10 +85,16 @@ function jm_watchdog() {
         for (( c=0; c<MISSING_JMS; c++ )); do
             "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
         done
-        sleep 5;
+        sleep 1;
     done
 }
 
+function start_ha_jm_watchdog() {
+    jm_watchdog $1 $2 &
+    JM_WATCHDOG_PID=$!
+    echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
+}
+
 function kill_jm {
     local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
     local JM_PIDS=(${JM_PIDS[@]})
@@ -98,7 +104,8 @@ function kill_jm {
     echo "Killed JM @ ${PID}"
 }
 
-function tm_watchdog() {
+# ha prefix to differentiate from the one in common.sh
+function ha_tm_watchdog() {
     local JOB_ID=$1
     local EXPECTED_TMS=$2
 
@@ -146,74 +153,9 @@ function tm_watchdog() {
     done
 }
 
-function run_ha_test() {
-    local PARALLELISM=$1
-    local BACKEND=$2
-    local ASYNC=$3
-    local INCREM=$4
-
-    local JM_KILLS=3
-    local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
-
-    CLEARED=0
-
-    # start the cluster on HA mode
-    start_ha_cluster
-
-    echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
-
-    # submit a job in detached mode and let it run
-    local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
-     $TEST_PROGRAM_JAR \
-        --environment.parallelism ${PARALLELISM} \
-        --test.semantics exactly-once \
-        --test.simulate_failure true \
-        --test.simulate_failure.num_records 200 \
-        --test.simulate_failure.num_checkpoints 1 \
-        --test.simulate_failure.max_failures 20 \
-        --state_backend ${BACKEND} \
-        --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
-        --state_backend.file.async ${ASYNC} \
-        --state_backend.rocks.incremental ${INCREM} \
-        --sequence_generator_source.sleep_time 15 \
-        --sequence_generator_source.sleep_after_elements 1 \
-        | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-    wait_job_running ${JOB_ID}
-
-    # start the watchdog that keeps the number of JMs stable
-    jm_watchdog 1 "8081" &
-    JM_WATCHDOG_PID=$!
-    echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
-
-    sleep 5
-
-    # start the watchdog that keeps the number of TMs stable
-    tm_watchdog ${JOB_ID} 1 &
+function start_ha_tm_watchdog() {
+    ha_tm_watchdog $1 $2 &
     TM_WATCHDOG_PID=$!
     echo "Running TM watchdog @ ${TM_WATCHDOG_PID}"
-
-    # let the job run for a while to take some checkpoints
-    sleep 20
-
-    for (( c=0; c<${JM_KILLS}; c++ )); do
-        # kill the JM and wait for watchdog to
-        # create a new one which will take over
-        kill_jm
-        sleep 60
-    done
-
-    verify_logs ${JM_KILLS}
-
-    # kill the cluster and zookeeper
-    stop_cluster_and_watchdog
 }
 
-trap stop_cluster_and_watchdog INT
-trap stop_cluster_and_watchdog EXIT
-
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
-
-run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
new file mode 100755
index 0000000..716b80c
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_ha.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+function run_ha_test() {
+    local PARALLELISM=$1
+
+    local JM_KILLS=3
+
+    CLEARED=0
+    mkdir -p "${TEST_DATA_DIR}/control"
+    touch "${TEST_DATA_DIR}/control/test.txt"
+
+    # start the cluster on HA mode
+    start_ha_cluster
+
+    echo "Running on HA mode: parallelism=${PARALLELISM}."
+
+    # submit a job in detached mode; "--infinite true" is the option the program reads to keep one source running
+    local JOB_ID=$("$FLINK_DIR"/bin/flink run -d -p ${PARALLELISM} \
+        "$TEST_PROGRAM_JAR" \
+        --loadFactor 4 \
+        --outputPath "$TEST_DATA_DIR/out/dataset_allround" \
+        --infinite true \
+        | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+    wait_job_running ${JOB_ID}
+
+    # start the watchdog that keeps the number of JMs stable
+    start_ha_jm_watchdog 1 "8081"
+
+    for (( c=0; c<${JM_KILLS}; c++ )); do
+        # kill the JM and wait until the job is reported
+        # as running again before the next kill
+        kill_jm
+        wait_job_running ${JOB_ID}
+    done
+
+    cancel_job ${JOB_ID}
+
+    # do not verify checkpoints in the logs
+    verify_logs ${JM_KILLS} false
+
+    # kill the cluster and zookeeper
+    stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog INT
+trap stop_cluster_and_watchdog EXIT
+
+run_ha_test 4
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
new file mode 100755
index 0000000..862055e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_ha.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+
+function run_ha_test() {
+    local PARALLELISM=$1
+    local BACKEND=$2
+    local ASYNC=$3
+    local INCREM=$4
+
+    local JM_KILLS=3
+    local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
+
+    CLEARED=0
+
+    # start the cluster on HA mode
+    start_ha_cluster
+
+    echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+
+    # submit a job in detached mode and extract its JobID from the client output
+    local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
+     $TEST_PROGRAM_JAR \
+        --environment.parallelism ${PARALLELISM} \
+        --test.semantics exactly-once \
+        --test.simulate_failure true \
+        --test.simulate_failure.num_records 200 \
+        --test.simulate_failure.num_checkpoints 1 \
+        --test.simulate_failure.max_failures 20 \
+        --state_backend ${BACKEND} \
+        --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
+        --state_backend.file.async ${ASYNC} \
+        --state_backend.rocks.incremental ${INCREM} \
+        --sequence_generator_source.sleep_time 15 \
+        --sequence_generator_source.sleep_after_elements 1 \
+        | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+    wait_job_running ${JOB_ID}
+
+    # start the background watchdog that keeps the number of JMs stable
+    start_ha_jm_watchdog 1 "8081"
+
+    sleep 5
+
+    # start the background watchdog that keeps the number of TMs stable (1 expected TM)
+    start_ha_tm_watchdog ${JOB_ID} 1
+
+    # let the job run for a while to take some checkpoints
+    sleep 20
+
+    for (( c=0; c<${JM_KILLS}; c++ )); do
+        # kill the JM and wait for watchdog to
+        # create a new one which will take over
+        kill_jm
+        # let the job start and take some checkpoints
+        sleep 60
+    done
+
+    # verify the logs; "true" also enables the 'Completed checkpoint' check
+    verify_logs ${JM_KILLS} true
+
+    # kill the cluster and zookeeper
+    stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog INT
+trap stop_cluster_and_watchdog EXIT
+
+STATE_BACKEND_TYPE=${1:-file}
+STATE_BACKEND_FILE_ASYNC=${2:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
+
+run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}