Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/23 19:27:52 UTC

[flink] branch release-1.9 updated (22538cc -> 6a79ab2)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 22538cc  [FLINK-13266][table] Port function-related descriptors to flink-table-common
     new 746c165  [hotfix][tests] Declare variables in start_taskmanagers() as local
     new 470975d  [hotfix][tests] Improve wait_job_terminal_state() function
     new 6a79ab2  [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pom.xml                                        |  28 ++++-
 .../tests/BlockingIncrementingMapFunction.java     |  60 ++++++++++
 .../DataSetFineGrainedRecoveryTestProgram.java     |  48 ++++++++
 .../batch/tests/util/FileBasedOneShotLatch.java    | 129 +++++++++++++++++++++
 .../tests/util/FileBasedOneShotLatchTest.java      |  92 +++++++++++++++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/test-scripts/common.sh      |  20 +++-
 .../test-scripts/test_ha_dataset.sh                |  59 +++++++---
 8 files changed, 411 insertions(+), 26 deletions(-)
 copy flink-end-to-end-tests/{flink-dataset-allround-test => flink-dataset-fine-grained-recovery-test}/pom.xml (72%)
 create mode 100644 flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
 create mode 100644 flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
 create mode 100644 flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
 create mode 100644 flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java


[flink] 02/03: [hotfix][tests] Improve wait_job_terminal_state() function

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 470975d424923487829059e95aa7bbd267b0f0ac
Author: Gary Yao <ga...@apache.org>
AuthorDate: Tue Jul 9 15:05:03 2019 +0200

    [hotfix][tests] Improve wait_job_terminal_state() function
    
    Always terminate script if job is in globally terminal state. Assert that
    expected terminal state matches actual state.
---
 flink-end-to-end-tests/test-scripts/common.sh | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 10061d1..714150c 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -450,17 +450,23 @@ function wait_job_running {
 
 function wait_job_terminal_state {
   local job=$1
-  local terminal_state=$2
+  local expected_terminal_state=$2
 
-  echo "Waiting for job ($job) to reach terminal state $terminal_state ..."
+  echo "Waiting for job ($job) to reach terminal state $expected_terminal_state ..."
 
   while : ; do
-    N=$(grep -o "Job $job reached globally terminal state $terminal_state" $FLINK_DIR/log/*standalonesession*.log | tail -1 || true)
-
+    local N=$(grep -o "Job $job reached globally terminal state .*" $FLINK_DIR/log/*standalonesession*.log | tail -1 || true)
     if [[ -z $N ]]; then
       sleep 1
     else
-      break
+      local actual_terminal_state=$(echo $N | sed -n 's/.*state \([A-Z]*\).*/\1/p')
+      if [[ -z $expected_terminal_state ]] || [[ "$expected_terminal_state" == "$actual_terminal_state" ]]; then
+        echo "Job ($job) reached terminal state $actual_terminal_state"
+        break
+      else
+        echo "Job ($job) is in state $actual_terminal_state but expected $expected_terminal_state"
+        exit 1
+      fi
     fi
   done
 }
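
With this change, a test script that expects a specific terminal state fails
fast on a mismatch (the function exits 1) instead of polling forever, while
passing an empty expected state accepts whatever globally terminal state the
job reaches. A hypothetical invocation (the job id variable is illustrative):

    # fail the test if the job ends in anything other than FINISHED,
    # e.g. FAILED or CANCELED
    wait_job_terminal_state ${JOB_ID} "FINISHED"

    # accept any globally terminal state
    wait_job_terminal_state ${JOB_ID} ""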


[flink] 01/03: [hotfix][tests] Declare variables in start_taskmanagers() as local

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 746c165c4fa4561f699fadce11589a8d8cb4ab13
Author: Gary Yao <ga...@apache.org>
AuthorDate: Tue Jul 9 10:51:04 2019 +0200

    [hotfix][tests] Declare variables in start_taskmanagers() as local
---
 flink-end-to-end-tests/test-scripts/common.sh | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index a4fff94..10061d1 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -260,7 +260,9 @@ function start_cluster {
 }
 
 function start_taskmanagers {
-    tmnum=$1
+    local tmnum=$1
+    local c
+
     echo "Start ${tmnum} more task managers"
     for (( c=0; c<tmnum; c++ ))
     do
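
The local declarations matter because Bash variables assigned inside a
function are global by default: a plain tmnum=$1 or loop counter c would leak
into, and possibly clobber, the caller's scope. A minimal sketch of the
failure mode, with names that are illustrative rather than taken from the
Flink scripts:

    c=42

    function leaky {
        c=7            # silently overwrites the caller's c
    }

    function contained {
        local c=7      # visible only inside this function
    }

    contained; echo $c   # prints 42
    leaky;     echo $c   # prints 7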


[flink] 03/03: [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a79ab2549e58623d9116b4dce31e3a83df8f795
Author: Gary Yao <ga...@apache.org>
AuthorDate: Tue Jul 9 15:21:59 2019 +0200

    [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test
    
    - Introduce new test job (DataSetFineGrainedRecoveryTestProgram), which waits
      for an external file to be created before finishing.
    - Introduce killing of TMs to HA data set test.
    - Reduce JM kills to 2.
    - Reduce heartbeat interval and timeout to speed up TM loss detection.
---
 .../pom.xml                                        |  89 ++++++++++++++
 .../tests/BlockingIncrementingMapFunction.java     |  60 ++++++++++
 .../DataSetFineGrainedRecoveryTestProgram.java     |  48 ++++++++
 .../batch/tests/util/FileBasedOneShotLatch.java    | 129 +++++++++++++++++++++
 .../tests/util/FileBasedOneShotLatchTest.java      |  92 +++++++++++++++
 flink-end-to-end-tests/pom.xml                     |   1 +
 .../test-scripts/test_ha_dataset.sh                |  59 +++++++---
 7 files changed, 463 insertions(+), 15 deletions(-)

diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
new file mode 100644
index 0000000..653ab3c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-dataset-fine-grained-recovery-test</artifactId>
+	<name>flink-dataset-fine-grained-recovery-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>dataset-fine-grained-recovery</id>
+						<phase>test</phase>
+						<goals>
+							<goal>test</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/*Test.*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>DataSetFineGrainedRecoveryTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>DataSetFineGrainedRecoveryTestProgram</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.batch.tests.DataSetFineGrainedRecoveryTestProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
new file mode 100644
index 0000000..c5d92b8
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.batch.tests.util.FileBasedOneShotLatch;
+import org.apache.flink.configuration.Configuration;
+
+import java.nio.file.Paths;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A map function that increments values by one.
+ *
+ * <p>Processing of elements is held until a latch file is created.
+ */
+public class BlockingIncrementingMapFunction extends RichMapFunction<Long, Long> {
+
+	private final String latchFilePath;
+
+	private transient FileBasedOneShotLatch latch;
+
+	public BlockingIncrementingMapFunction(final String latchFilePath) {
+		this.latchFilePath = checkNotNull(latchFilePath);
+	}
+
+	@Override
+	public void open(final Configuration parameters) {
+		latch = new FileBasedOneShotLatch(Paths.get(latchFilePath));
+	}
+
+	@Override
+	public void close() throws Exception {
+		latch.close();
+	}
+
+	@Override
+	public Long map(final Long value) throws InterruptedException {
+		latch.await();
+		return value + 1;
+	}
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
new file mode 100644
index 0000000..749d232
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+
+/**
+ * Program to test fine grained recovery.
+ */
+public class DataSetFineGrainedRecoveryTestProgram {
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final String latchFilePath = params.getRequired("latchFilePath");
+		final String outputPath = params.getRequired("outputPath");
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
+		env.setParallelism(4);
+
+		env.generateSequence(0, 1000)
+			.map(new BlockingIncrementingMapFunction(latchFilePath))
+			.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
+			.setParallelism(1);
+
+		env.execute();
+	}
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
new file mode 100644
index 0000000..dee2414
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import com.sun.nio.file.SensitivityWatchEventModifier;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A synchronization aid that allows a single thread to wait on the creation of a specified file.
+ */
+@NotThreadSafe
+public class FileBasedOneShotLatch implements Closeable {
+
+	private final Path latchFile;
+
+	private final WatchService watchService;
+
+	private boolean released;
+
+	public FileBasedOneShotLatch(final Path latchFile) {
+		this.latchFile = checkNotNull(latchFile);
+
+		final Path parentDir = checkNotNull(latchFile.getParent(), "latchFile must have a parent");
+		this.watchService = initWatchService(parentDir);
+	}
+
+	private static WatchService initWatchService(final Path parentDir) {
+		final WatchService watchService = createWatchService(parentDir);
+		watchForLatchFile(watchService, parentDir);
+		return watchService;
+	}
+
+	private static WatchService createWatchService(final Path parentDir) {
+		try {
+			return parentDir.getFileSystem().newWatchService();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private static void watchForLatchFile(final WatchService watchService, final Path parentDir) {
+		try {
+			parentDir.register(
+				watchService,
+				new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE},
+				SensitivityWatchEventModifier.HIGH);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Waits until the latch file is created.
+	 *
+	 * <p>When this method returns, subsequent invocations will not block even after the latch file
+	 * is deleted. Note that this method may not return if the latch file is deleted before this
+	 * method returns.
+	 *
+	 * @throws InterruptedException if interrupted while waiting
+	 */
+	public void await() throws InterruptedException {
+		if (isReleasedOrReleasable()) {
+			return;
+		}
+
+		awaitLatchFile(watchService);
+	}
+
+	private void awaitLatchFile(final WatchService watchService) throws InterruptedException {
+		while (true) {
+			WatchKey watchKey = watchService.take();
+			if (isReleasedOrReleasable()) {
+				break;
+			}
+			watchKey.reset();
+		}
+	}
+
+	private boolean isReleasedOrReleasable() {
+		if (released) {
+			return true;
+		}
+
+		if (Files.exists(latchFile)) {
+			releaseLatch();
+			return true;
+		}
+
+		return false;
+	}
+
+	private void releaseLatch() {
+		released = true;
+	}
+
+	@Override
+	public void close() throws IOException {
+		watchService.close();
+	}
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
new file mode 100644
index 0000000..42d5cff
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link FileBasedOneShotLatch}.
+ */
+public class FileBasedOneShotLatchTest {
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private FileBasedOneShotLatch latch;
+
+	private File latchFile;
+
+	@Before
+	public void setUp() {
+		latchFile = new File(temporaryFolder.getRoot(), "latchFile");
+		latch = new FileBasedOneShotLatch(latchFile.toPath());
+	}
+
+	@Test
+	public void awaitReturnsWhenFileIsCreated() throws Exception {
+		final AtomicBoolean awaitCompleted = new AtomicBoolean();
+		final Thread thread = new Thread(() -> {
+			try {
+				latch.await();
+				awaitCompleted.set(true);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		});
+		thread.start();
+
+		latchFile.createNewFile();
+		thread.join();
+
+		assertTrue(awaitCompleted.get());
+	}
+
+	@Test
+	public void subsequentAwaitDoesNotBlock() throws Exception {
+		latchFile.createNewFile();
+		latch.await();
+		latch.await();
+	}
+
+	@Test
+	public void subsequentAwaitDoesNotBlockEvenIfLatchFileIsDeleted() throws Exception {
+		latchFile.createNewFile();
+		latch.await();
+
+		latchFile.delete();
+		latch.await();
+	}
+
+	@Test
+	public void doesNotBlockIfFileExistsPriorToCreatingLatch() throws Exception {
+		latchFile.createNewFile();
+
+		final FileBasedOneShotLatch latch = new FileBasedOneShotLatch(latchFile.toPath());
+		latch.await();
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index baea71d..d3dd283 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -39,6 +39,7 @@ under the License.
 		<module>flink-parent-child-classloading-test-program</module>
 		<module>flink-parent-child-classloading-test-lib-package</module>
 		<module>flink-dataset-allround-test</module>
+		<module>flink-dataset-fine-grained-recovery-test</module>
 		<module>flink-datastream-allround-test</module>
 		<module>flink-stream-sql-test</module>
 		<module>flink-bucketing-sink-test</module>
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
index b619e16..b547142 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -20,7 +20,7 @@
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/common_ha.sh
 
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-fine-grained-recovery-test/target/DataSetFineGrainedRecoveryTestProgram.jar
 
 function ha_cleanup() {
   # kill the cluster and zookeeper
@@ -32,30 +32,26 @@ on_exit ha_cleanup
 function run_ha_test() {
     local PARALLELISM=$1
 
-    local JM_KILLS=3
+    local JM_KILLS=2
+    local TM_KILLS=2
 
-    CLEARED=0
-    mkdir -p ${TEST_DATA_DIR}/control
-    touch ${TEST_DATA_DIR}/control/test.txt
+    local LATCH_FILE_PATH=$TEST_DATA_DIR/latchFile
 
-    # start the cluster on HA mode
-    start_ha_cluster
+    CLEARED=0
 
+    setup_and_start_cluster ${PARALLELISM}
     echo "Running on HA mode: parallelism=${PARALLELISM}."
 
     # submit a job in detached mode and let it run
     local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
         $TEST_PROGRAM_JAR \
-        --loadFactor 4 \
+        --latchFilePath $LATCH_FILE_PATH \
         --outputPath $TEST_DATA_DIR/out/dataset_allround \
-        --source true \
         | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
     wait_job_running ${JOB_ID}
 
-    # start the watchdog that keeps the number of JMs stable
-    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
-
+    local c
     for (( c=0; c<${JM_KILLS}; c++ )); do
         # kill the JM and wait for watchdog to
         # create a new one which will take over
@@ -63,10 +59,43 @@ function run_ha_test() {
         wait_job_running ${JOB_ID}
     done
 
-    cancel_job ${JOB_ID}
+    for (( c=0; c<${TM_KILLS}; c++ )); do
+        kill_and_replace_random_task_manager
+    done
+
+    touch ${LATCH_FILE_PATH}
+
+    wait_job_terminal_state ${JOB_ID} "FINISHED"
+    check_result_hash "DataSet-FineGrainedRecovery-Test" $TEST_DATA_DIR/out/dataset_allround "ac3d26e1afce19aa657527f000acb88b"
+}
+
+function setup_and_start_cluster() {
+    local NUM_TASK_MANAGERS=$1
+
+    create_ha_config
+
+    set_config_key "jobmanager.execution.failover-strategy" "region"
+    set_config_key "jobmanager.scheduler.partition.force-release-on-consumption" "false"
+    set_config_key "taskmanager.numberOfTaskSlots" "1"
+
+    set_config_key "restart-strategy" "fixed-delay"
+    set_config_key "restart-strategy.fixed-delay.attempts" "2147483647"
+
+    set_config_key "heartbeat.interval" "2000"
+    set_config_key "heartbeat.timeout" "10000"
+
+    start_local_zk
+    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
+    start_taskmanagers ${NUM_TASK_MANAGERS}
+}
+
+function kill_and_replace_random_task_manager() {
+    local NUM_TASK_MANAGERS=$(query_number_of_running_tms)
 
-    # do not verify checkpoints in the logs
-    verify_logs ${JM_KILLS} false
+    kill_random_taskmanager
+    wait_for_number_of_running_tms $(( ${NUM_TASK_MANAGERS} - 1 ))
+    start_taskmanagers 1
+    wait_for_number_of_running_tms ${NUM_TASK_MANAGERS}
 }
 
 run_ha_test 4
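
Taken together, the three commits change the HA data set test from "run, kill
JMs, cancel" into a run that must finish on its own: the mappers block on the
latch file while two JM kills and two TM kill-and-replace cycles exercise
recovery, and only touching the latch file lets the job reach FINISHED, which
the improved wait_job_terminal_state() now asserts. A condensed view of the
resulting control flow (comments paraphrase the script above; this is not the
literal code):

    setup_and_start_cluster 4                      # ZK, JM watchdog, 4 TMs, region failover
    # submit DataSetFineGrainedRecoveryTestProgram detached; mappers block on the latch
    # kill the JM twice; the HA watchdog restarts it and the job recovers
    # kill and replace 2 random TMs; fine grained failover restarts only affected regions
    touch ${TEST_DATA_DIR}/latchFile               # release every blocked mapper
    wait_job_terminal_state ${JOB_ID} "FINISHED"   # assert the job finished, not merely terminated
    check_result_hash "DataSet-FineGrainedRecovery-Test" ${TEST_DATA_DIR}/out/dataset_allround "ac3d26e1afce19aa657527f000acb88b"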