You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/07/30 08:57:28 UTC

[GitHub] zentol closed pull request #6415: [FLINK-8974] Run all-round DataSet job with failures in HA mode

zentol closed pull request #6415: [FLINK-8974] Run all-round DataSet job with failures in HA mode
URL: https://github.com/apache/flink/pull/6415
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
index 4abbb3548b0..afdbdefd311 100644
--- a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
@@ -21,9 +21,6 @@
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -34,11 +31,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.util.Preconditions;
 
 /**
  * Program to test a large chunk of DataSet API operators and primitives:
@@ -55,6 +48,8 @@
  * <ul>
  *     <li>loadFactor (int): controls generated data volume. Does not affect result.</li>
  *     <li>outputPath (String): path to write the result</li>
+ *     <li>infinite (Boolean): if set to true, one of the sources will be infinite and the job will never end
+ *     (default: false).</li>
  * </ul>
  */
 public class DataSetAllroundTestProgram {
@@ -66,13 +61,20 @@ public static void main(String[] args) throws Exception {
 		ParameterTool params = ParameterTool.fromArgs(args);
 		int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
 		String outputPath = params.getRequired("outputPath");
+		boolean infinite = params.getBoolean("infinite", false);
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		int numKeys = loadFactor * 128 * 1024;
-		DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
-		DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
-		DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
+		DataSet<Tuple2<String, Integer>> x1Keys;
+		DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(Generator.generate(numKeys * 32, 2)).setParallelism(4);
+		DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(Generator.generate(numKeys, 8)).setParallelism(4);
+
+		if (infinite) {
+			x1Keys = env.createInput(Generator.generateInfinitely(numKeys)).setParallelism(4);
+		} else {
+			x1Keys = env.createInput(Generator.generate(numKeys, 1)).setParallelism(4);
+		}
 
 		DataSet<Tuple2<String, Integer>> joined = x2Keys
 			// shift keys (check for correct handling of key positions)
@@ -179,105 +181,4 @@ public String getKey(Tuple2<String, Integer> value) {
 		env.execute();
 	}
 
-	/**
-	 * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
-	 * <ul>
-	 *     <li>String: key, can be repeated.</li>
-	 *     <li>Integer: uniformly distributed int between 0 and 127</li>
-	 * </ul>
-	 */
-	public static class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
-
-		// total number of records
-		private final long numRecords;
-		// total number of keys
-		private final long numKeys;
-
-		// records emitted per partition
-		private long recordsPerPartition;
-		// number of keys per partition
-		private long keysPerPartition;
-
-		// number of currently emitted records
-		private long recordCnt;
-
-		// id of current partition
-		private int partitionId;
-		// total number of partitions
-		private int numPartitions;
-
-		public Generator(long numKeys, int recordsPerKey) {
-			this.numKeys = numKeys;
-			this.numRecords = numKeys * recordsPerKey;
-		}
-
-		@Override
-		public void configure(Configuration parameters) { }
-
-		@Override
-		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-			return null;
-		}
-
-		@Override
-		public GenericInputSplit[] createInputSplits(int minNumSplits) {
-
-			GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
-			for (int i = 0; i < minNumSplits; i++) {
-				splits[i] = new GenericInputSplit(i, minNumSplits);
-			}
-			return splits;
-		}
-
-		@Override
-		public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
-			return new DefaultInputSplitAssigner(inputSplits);
-		}
-
-		@Override
-		public void open(GenericInputSplit split) {
-			this.partitionId = split.getSplitNumber();
-			this.numPartitions = split.getTotalNumberOfSplits();
-
-			// ensure even distribution of records and keys
-			Preconditions.checkArgument(
-				numRecords % numPartitions == 0,
-				"Records cannot be evenly distributed among partitions");
-			Preconditions.checkArgument(
-				numKeys % numPartitions == 0,
-				"Keys cannot be evenly distributed among partitions");
-
-			this.recordsPerPartition = numRecords / numPartitions;
-			this.keysPerPartition = numKeys / numPartitions;
-
-			this.recordCnt = 0;
-		}
-
-		@Override
-		public boolean reachedEnd() {
-			return this.recordCnt >= this.recordsPerPartition;
-		}
-
-		@Override
-		public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) {
-
-			// build key from partition id and count per partition
-			String key = String.format(
-				"%d-%d",
-				this.partitionId,
-				this.recordCnt % this.keysPerPartition);
-			// 128 values to filter on
-			int filterVal = (int) this.recordCnt % 128;
-
-			this.recordCnt++;
-
-			reuse.f0 = key;
-			reuse.f1 = filterVal;
-			return reuse;
-		}
-
-		@Override
-		public void close() { }
-	}
-
 }
diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
new file mode 100644
index 00000000000..9da5259d395
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
+ * <ul>
+ *     <li>String: key, can be repeated.</li>
+ *     <li>Integer: uniformly distributed int between 0 and 127</li>
+ * </ul>
+ */
+public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+	// total number of records
+	private final long numRecords;
+	// total number of keys
+	private final long numKeys;
+
+	// records emitted per partition
+	private long recordsPerPartition;
+	// number of keys per partition
+	private long keysPerPartition;
+
+	// number of currently emitted records
+	private long recordCnt;
+
+	// id of current partition
+	private int partitionId;
+
+	private final boolean infinite;
+
+	public static Generator generate(long numKeys, int recordsPerKey) {
+		return new Generator(numKeys, recordsPerKey, false);
+	}
+
+	public static Generator generateInfinitely(long numKeys) {
+		return new Generator(numKeys, 0, true);
+	}
+
+	private Generator(long numKeys, int recordsPerKey, boolean infinite) {
+		this.numKeys = numKeys;
+		if (infinite) {
+			this.numRecords = Long.MAX_VALUE;
+		} else {
+			this.numRecords = numKeys * recordsPerKey;
+		}
+		this.infinite = infinite;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+		return null;
+	}
+
+	@Override
+	public GenericInputSplit[] createInputSplits(int minNumSplits) {
+
+		GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
+		for (int i = 0; i < minNumSplits; i++) {
+			splits[i] = new GenericInputSplit(i, minNumSplits);
+		}
+		return splits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
+		return new DefaultInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(GenericInputSplit split) throws IOException {
+		this.partitionId = split.getSplitNumber();
+		// total number of partitions
+		int numPartitions = split.getTotalNumberOfSplits();
+
+		// records need not divide evenly in infinite mode: numRecords is Long.MAX_VALUE (odd) there
+		Preconditions.checkArgument(
+			infinite || numRecords % numPartitions == 0,
+			"Records cannot be evenly distributed among partitions");
+		Preconditions.checkArgument(
+			numKeys % numPartitions == 0,
+			"Keys cannot be evenly distributed among partitions");
+
+		this.recordsPerPartition = numRecords / numPartitions;
+		this.keysPerPartition = numKeys / numPartitions;
+
+		this.recordCnt = 0;
+	}
+
+	@Override
+	public boolean reachedEnd() {
+		return !infinite && this.recordCnt >= this.recordsPerPartition;
+	}
+
+	@Override
+	public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException {
+
+		// build key from partition id and count per partition
+		String key = String.format(
+			"%d-%d",
+			this.partitionId,
+			this.recordCnt % this.keysPerPartition);
+
+		// 128 values to filter on; take the modulo on the long before narrowing to stay within 0..127
+		int filterVal = (int) (this.recordCnt % 128);
+
+		this.recordCnt++;
+
+		reuse.f0 = key;
+		reuse.f1 = filterVal;
+		return reuse;
+	}
+
+	@Override
+	public void close() {
+	}
+}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 960ba894903..d06e80c7315 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -48,10 +48,12 @@ run_test "ConnectedComponents iterations with high parallelism end-to-end test"
 run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
 
-run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
-run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
-run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
-run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
+run_test "Running HA dataset end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_dataset.sh"
+
+run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false"
+run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false"
+run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false"
+run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true"
 
 run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
 run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/common_ha.sh
old mode 100755
new mode 100644
similarity index 61%
rename from flink-end-to-end-tests/test-scripts/test_ha.sh
rename to flink-end-to-end-tests/test-scripts/common_ha.sh
index a44a1396ebf..f476bac5e66
--- a/flink-end-to-end-tests/test-scripts/test_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ha.sh
@@ -1,4 +1,5 @@
 #!/usr/bin/env bash
+
 ################################################################################
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,16 +18,12 @@
 # limitations under the License.
 ################################################################################
 
-source "$(dirname "$0")"/common.sh
-
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
 
 JM_WATCHDOG_PID=0
 TM_WATCHDOG_PID=0
 
-# flag indicating if we have already cleared up things after a test
-CLEARED=0
-
 function stop_cluster_and_watchdog() {
     if [ ${CLEARED} -eq 0 ]; then
 
@@ -50,23 +47,26 @@ function verify_logs() {
     local OUTPUT=$FLINK_DIR/log/*.out
     local JM_FAILURES=$1
     local EXIT_CODE=0
+    local VERIFY_CHECKPOINTS=$2
 
     # verify that we have no alerts
     if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
-        echo "FAILURE: Alerts found at the general purpose DataStream job."
+        echo "FAILURE: Alerts found at the general purpose job."
         EXIT_CODE=1
     fi
 
     # checks that all apart from the first JM recover the failed jobgraph.
-    if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
+    if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
         echo "FAILURE: A JM did not take over."
         EXIT_CODE=1
     fi
 
+    if [ "$VERIFY_CHECKPOINTS" = true ]; then
     # search the logs for JMs that log completed checkpoints
-    if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
-        echo "FAILURE: A JM did not execute the job."
-        EXIT_CODE=1
+        if ! [ `grep -r --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
+            echo "FAILURE: A JM did not execute the job."
+            EXIT_CODE=1
+        fi
     fi
 
     if [[ $EXIT_CODE != 0 ]]; then
@@ -85,10 +85,16 @@ function jm_watchdog() {
         for (( c=0; c<MISSING_JMS; c++ )); do
             "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
         done
-        sleep 5;
+        sleep 1;
     done
 }
 
+function start_ha_jm_watchdog() {
+    jm_watchdog $1 $2 &
+    JM_WATCHDOG_PID=$!
+    echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
+}
+
 function kill_jm {
     local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
     local JM_PIDS=(${JM_PIDS[@]})
@@ -98,7 +104,8 @@ function kill_jm {
     echo "Killed JM @ ${PID}"
 }
 
-function tm_watchdog() {
+# the "ha_" prefix differentiates this function from the tm_watchdog in common.sh
+function ha_tm_watchdog() {
     local JOB_ID=$1
     local EXPECTED_TMS=$2
 
@@ -146,74 +153,9 @@ function tm_watchdog() {
     done
 }
 
-function run_ha_test() {
-    local PARALLELISM=$1
-    local BACKEND=$2
-    local ASYNC=$3
-    local INCREM=$4
-
-    local JM_KILLS=3
-    local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
-
-    CLEARED=0
-
-    # start the cluster on HA mode
-    start_ha_cluster
-
-    echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
-
-    # submit a job in detached mode and let it run
-    local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
-     $TEST_PROGRAM_JAR \
-        --environment.parallelism ${PARALLELISM} \
-        --test.semantics exactly-once \
-        --test.simulate_failure true \
-        --test.simulate_failure.num_records 200 \
-        --test.simulate_failure.num_checkpoints 1 \
-        --test.simulate_failure.max_failures 20 \
-        --state_backend ${BACKEND} \
-        --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
-        --state_backend.file.async ${ASYNC} \
-        --state_backend.rocks.incremental ${INCREM} \
-        --sequence_generator_source.sleep_time 15 \
-        --sequence_generator_source.sleep_after_elements 1 \
-        | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-    wait_job_running ${JOB_ID}
-
-    # start the watchdog that keeps the number of JMs stable
-    jm_watchdog 1 "8081" &
-    JM_WATCHDOG_PID=$!
-    echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
-
-    sleep 5
-
-    # start the watchdog that keeps the number of TMs stable
-    tm_watchdog ${JOB_ID} 1 &
+function start_ha_tm_watchdog() {
+    ha_tm_watchdog $1 $2 &
     TM_WATCHDOG_PID=$!
     echo "Running TM watchdog @ ${TM_WATCHDOG_PID}"
-
-    # let the job run for a while to take some checkpoints
-    sleep 20
-
-    for (( c=0; c<${JM_KILLS}; c++ )); do
-        # kill the JM and wait for watchdog to
-        # create a new one which will take over
-        kill_jm
-        sleep 60
-    done
-
-    verify_logs ${JM_KILLS}
-
-    # kill the cluster and zookeeper
-    stop_cluster_and_watchdog
 }
 
-trap stop_cluster_and_watchdog INT
-trap stop_cluster_and_watchdog EXIT
-
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
-
-run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
new file mode 100755
index 00000000000..716b80cc775
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_ha.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+function run_ha_test() {
+    local PARALLELISM=$1
+
+    local JM_KILLS=3
+
+    CLEARED=0
+    mkdir -p ${TEST_DATA_DIR}/control
+    touch ${TEST_DATA_DIR}/control/test.txt
+
+    # start the cluster on HA mode
+    start_ha_cluster
+
+    echo "Running on HA mode: parallelism=${PARALLELISM}."
+
+    # submit a job in detached mode and let it run
+    local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
+        $TEST_PROGRAM_JAR \
+        --loadFactor 4 \
+        --outputPath $TEST_DATA_DIR/out/dataset_allround \
+        --source true \
+        | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+    wait_job_running ${JOB_ID}
+
+    # start the watchdog that keeps the number of JMs stable
+    start_ha_jm_watchdog 1 "8081"
+
+    for (( c=0; c<${JM_KILLS}; c++ )); do
+        # kill the JM and wait for watchdog to
+        # create a new one which will take over
+        kill_jm
+        wait_job_running ${JOB_ID}
+    done
+
+    cancel_job ${JOB_ID}
+
+    # do not verify checkpoints in the logs
+    verify_logs ${JM_KILLS} false
+
+    # kill the cluster and zookeeper
+    stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog INT
+trap stop_cluster_and_watchdog EXIT
+
+run_ha_test 4
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
new file mode 100755
index 00000000000..862055e2173
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_ha.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+
+function run_ha_test() {
+    local PARALLELISM=$1
+    local BACKEND=$2
+    local ASYNC=$3
+    local INCREM=$4
+
+    local JM_KILLS=3
+    local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/"
+
+    CLEARED=0
+
+    # start the cluster on HA mode
+    start_ha_cluster
+
+    echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
+
+    # submit a job in detached mode and let it run
+    local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
+     $TEST_PROGRAM_JAR \
+        --environment.parallelism ${PARALLELISM} \
+        --test.semantics exactly-once \
+        --test.simulate_failure true \
+        --test.simulate_failure.num_records 200 \
+        --test.simulate_failure.num_checkpoints 1 \
+        --test.simulate_failure.max_failures 20 \
+        --state_backend ${BACKEND} \
+        --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \
+        --state_backend.file.async ${ASYNC} \
+        --state_backend.rocks.incremental ${INCREM} \
+        --sequence_generator_source.sleep_time 15 \
+        --sequence_generator_source.sleep_after_elements 1 \
+        | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+    wait_job_running ${JOB_ID}
+
+    # start the watchdog that keeps the number of JMs stable
+    start_ha_jm_watchdog 1 "8081"
+
+    sleep 5
+
+    # start the watchdog that keeps the number of TMs stable
+    start_ha_tm_watchdog ${JOB_ID} 1
+
+    # let the job run for a while to take some checkpoints
+    sleep 20
+
+    for (( c=0; c<${JM_KILLS}; c++ )); do
+        # kill the JM and wait for watchdog to
+        # create a new one which will take over
+        kill_jm
+        # let the job start and take some checkpoints
+        sleep 60
+    done
+
+    # verify checkpoints in the logs
+    verify_logs ${JM_KILLS} true
+
+    # kill the cluster and zookeeper
+    stop_cluster_and_watchdog
+}
+
+trap stop_cluster_and_watchdog INT
+trap stop_cluster_and_watchdog EXIT
+
+STATE_BACKEND_TYPE=${1:-file}
+STATE_BACKEND_FILE_ASYNC=${2:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
+
+run_ha_test 4 ${STATE_BACKEND_TYPE} ${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services