Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/23 19:25:37 UTC

[flink] 03/03: [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c653ceb25b456a0abe65b22b2eada17ba2bed53
Author: Gary Yao <ga...@apache.org>
AuthorDate: Tue Jul 9 15:21:59 2019 +0200

    [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test
    
    - Introduce new test job (DataSetFineGrainedRecoveryTestProgram), which waits
      for an external file to be created before finishing.
    - Introduce killing of TMs to the HA data set test.
    - Reduce JM kills to 2.
    - Reduce heartbeat interval and timeout to speed up TM loss detection.
    
    This closes #9060.
---
 .../pom.xml                                        |  89 ++++++++++++++
 .../tests/BlockingIncrementingMapFunction.java     |  60 ++++++++++
 .../DataSetFineGrainedRecoveryTestProgram.java     |  48 ++++++++
 .../batch/tests/util/FileBasedOneShotLatch.java    | 129 +++++++++++++++++++++
 .../tests/util/FileBasedOneShotLatchTest.java      |  92 +++++++++++++++
 flink-end-to-end-tests/pom.xml                     |   1 +
 .../test-scripts/test_ha_dataset.sh                |  59 +++++++---
 7 files changed, 463 insertions(+), 15 deletions(-)
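
Before diving into the patch: the blocking behavior rests on the
FileBasedOneShotLatch class added below. A minimal usage sketch with an
illustrative path (the real test embeds the latch in a RichMapFunction and
releases it from the shell script via touch):

    import org.apache.flink.batch.tests.util.FileBasedOneShotLatch;
    import java.nio.file.Paths;

    // In a method declaring "throws Exception":
    // blocks until another process runs: touch /tmp/test-data/latchFile
    try (FileBasedOneShotLatch latch =
            new FileBasedOneShotLatch(Paths.get("/tmp/test-data/latchFile"))) {
        latch.await(); // later await() calls return immediately
    }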

diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
new file mode 100644
index 0000000..ecde754
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-dataset-fine-grained-recovery-test</artifactId>
+	<name>flink-dataset-fine-grained-recovery-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>dataset-fine-grained-recovery</id>
+						<phase>test</phase>
+						<goals>
+							<goal>test</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/*Test.*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>DataSetFineGrainedRecoveryTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>DataSetFineGrainedRecoveryTestProgram</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.batch.tests.DataSetFineGrainedRecoveryTestProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
new file mode 100644
index 0000000..c5d92b8
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.batch.tests.util.FileBasedOneShotLatch;
+import org.apache.flink.configuration.Configuration;
+
+import java.nio.file.Paths;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A map function that increments values by one.
+ *
+ * <p>Processing of elements blocks until a latch file is created.
+ */
+public class BlockingIncrementingMapFunction extends RichMapFunction<Long, Long> {
+
+	private final String latchFilePath;
+
+	private transient FileBasedOneShotLatch latch;
+
+	public BlockingIncrementingMapFunction(final String latchFilePath) {
+		this.latchFilePath = checkNotNull(latchFilePath);
+	}
+
+	@Override
+	public void open(final Configuration parameters) {
+		latch = new FileBasedOneShotLatch(Paths.get(latchFilePath));
+	}
+
+	@Override
+	public void close() throws Exception {
+		latch.close();
+	}
+
+	@Override
+	public Long map(final Long value) throws InterruptedException {
+		latch.await();
+		return value + 1;
+	}
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
new file mode 100644
index 0000000..749d232
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+
+/**
+ * Program to test fine grained recovery.
+ */
+public class DataSetFineGrainedRecoveryTestProgram {
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final String latchFilePath = params.getRequired("latchFilePath");
+		final String outputPath = params.getRequired("outputPath");
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
+		env.setParallelism(4);
+
+		env.generateSequence(0, 1000)
+			.map(new BlockingIncrementingMapFunction(latchFilePath))
+			.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
+			.setParallelism(1);
+
+		env.execute();
+	}
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
new file mode 100644
index 0000000..dee2414
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import com.sun.nio.file.SensitivityWatchEventModifier;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A synchronization aid that allows a single thread to wait on the creation of a specified file.
+ */
+@NotThreadSafe
+public class FileBasedOneShotLatch implements Closeable {
+
+	private final Path latchFile;
+
+	private final WatchService watchService;
+
+	private boolean released;
+
+	public FileBasedOneShotLatch(final Path latchFile) {
+		this.latchFile = checkNotNull(latchFile);
+
+		final Path parentDir = checkNotNull(latchFile.getParent(), "latchFile must have a parent");
+		this.watchService = initWatchService(parentDir);
+	}
+
+	private static WatchService initWatchService(final Path parentDir) {
+		final WatchService watchService = createWatchService(parentDir);
+		watchForLatchFile(watchService, parentDir);
+		return watchService;
+	}
+
+	private static WatchService createWatchService(final Path parentDir) {
+		try {
+			return parentDir.getFileSystem().newWatchService();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private static void watchForLatchFile(final WatchService watchService, final Path parentDir) {
+		try {
+			parentDir.register(
+				watchService,
+				new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE},
+				SensitivityWatchEventModifier.HIGH);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Waits until the latch file is created.
+	 *
+	 * <p>When this method returns, subsequent invocations will not block even after the latch file
+	 * is deleted. Note that this method may never return if the latch file is created but deleted
+	 * again before its creation is observed by this latch.
+	 *
+	 * @throws InterruptedException if interrupted while waiting
+	 */
+	public void await() throws InterruptedException {
+		if (isReleasedOrReleasable()) {
+			return;
+		}
+
+		awaitLatchFile(watchService);
+	}
+
+	private void awaitLatchFile(final WatchService watchService) throws InterruptedException {
+		while (true) {
+			WatchKey watchKey = watchService.take();
+			if (isReleasedOrReleasable()) {
+				break;
+			}
+			watchKey.reset();
+		}
+	}
+
+	private boolean isReleasedOrReleasable() {
+		if (released) {
+			return true;
+		}
+
+		if (Files.exists(latchFile)) {
+			releaseLatch();
+			return true;
+		}
+
+		return false;
+	}
+
+	private void releaseLatch() {
+		released = true;
+	}
+
+	@Override
+	public void close() throws IOException {
+		watchService.close();
+	}
+}
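
A note on ordering in FileBasedOneShotLatch: the constructor registers the
WatchService on the parent directory before await() ever checks whether the
file exists, so a file created between registration and the existence check is
still caught, either by Files.exists() or by the queued ENTRY_CREATE event.
The caveat in the await() javadoc is the flip side: a file created and then
deleted again before either check observes it can leave await() blocked
indefinitely.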
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
new file mode 100644
index 0000000..42d5cff
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link FileBasedOneShotLatch}.
+ */
+public class FileBasedOneShotLatchTest {
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private FileBasedOneShotLatch latch;
+
+	private File latchFile;
+
+	@Before
+	public void setUp() {
+		latchFile = new File(temporaryFolder.getRoot(), "latchFile");
+		latch = new FileBasedOneShotLatch(latchFile.toPath());
+	}
+
+	@Test
+	public void awaitReturnsWhenFileIsCreated() throws Exception {
+		final AtomicBoolean awaitCompleted = new AtomicBoolean();
+		final Thread thread = new Thread(() -> {
+			try {
+				latch.await();
+				awaitCompleted.set(true);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		});
+		thread.start();
+
+		latchFile.createNewFile();
+		thread.join();
+
+		assertTrue(awaitCompleted.get());
+	}
+
+	@Test
+	public void subsequentAwaitDoesNotBlock() throws Exception {
+		latchFile.createNewFile();
+		latch.await();
+		latch.await();
+	}
+
+	@Test
+	public void subsequentAwaitDoesNotBlockEvenIfLatchFileIsDeleted() throws Exception {
+		latchFile.createNewFile();
+		latch.await();
+
+		latchFile.delete();
+		latch.await();
+	}
+
+	@Test
+	public void doesNotBlockIfFileExistsPriorToCreatingLatch() throws Exception {
+		latchFile.createNewFile();
+
+		final FileBasedOneShotLatch latch = new FileBasedOneShotLatch(latchFile.toPath());
+		latch.await();
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 46bfde2..d676291 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -39,6 +39,7 @@ under the License.
 		<module>flink-parent-child-classloading-test-program</module>
 		<module>flink-parent-child-classloading-test-lib-package</module>
 		<module>flink-dataset-allround-test</module>
+		<module>flink-dataset-fine-grained-recovery-test</module>
 		<module>flink-datastream-allround-test</module>
 		<module>flink-stream-sql-test</module>
 		<module>flink-bucketing-sink-test</module>
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
index b619e16..b547142 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -20,7 +20,7 @@
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/common_ha.sh
 
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-fine-grained-recovery-test/target/DataSetFineGrainedRecoveryTestProgram.jar
 
 function ha_cleanup() {
   # kill the cluster and zookeeper
@@ -32,30 +32,26 @@ on_exit ha_cleanup
 function run_ha_test() {
     local PARALLELISM=$1
 
-    local JM_KILLS=3
+    local JM_KILLS=2
+    local TM_KILLS=2
 
-    CLEARED=0
-    mkdir -p ${TEST_DATA_DIR}/control
-    touch ${TEST_DATA_DIR}/control/test.txt
+    local LATCH_FILE_PATH=$TEST_DATA_DIR/latchFile
 
-    # start the cluster on HA mode
-    start_ha_cluster
+    CLEARED=0
 
+    setup_and_start_cluster ${PARALLELISM}
     echo "Running on HA mode: parallelism=${PARALLELISM}."
 
     # submit a job in detached mode and let it run
     local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
         $TEST_PROGRAM_JAR \
-        --loadFactor 4 \
+        --latchFilePath $LATCH_FILE_PATH \
         --outputPath $TEST_DATA_DIR/out/dataset_allround \
-        --source true \
         | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
     wait_job_running ${JOB_ID}
 
-    # start the watchdog that keeps the number of JMs stable
-    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
-
+    local c
     for (( c=0; c<${JM_KILLS}; c++ )); do
         # kill the JM and wait for watchdog to
         # create a new one which will take over
@@ -63,10 +59,43 @@ function run_ha_test() {
         wait_job_running ${JOB_ID}
     done
 
-    cancel_job ${JOB_ID}
+    for (( c=0; c<${TM_KILLS}; c++ )); do
+        kill_and_replace_random_task_manager
+    done
+
+    touch ${LATCH_FILE_PATH}
+
+    wait_job_terminal_state ${JOB_ID} "FINISHED"
+    check_result_hash "DataSet-FineGrainedRecovery-Test" $TEST_DATA_DIR/out/dataset_allround "ac3d26e1afce19aa657527f000acb88b"
+}
+
+function setup_and_start_cluster() {
+    local NUM_TASK_MANAGERS=$1
+
+    create_ha_config
+
+    set_config_key "jobmanager.execution.failover-strategy" "region"
+    set_config_key "jobmanager.scheduler.partition.force-release-on-consumption" "false"
+    set_config_key "taskmanager.numberOfTaskSlots" "1"
+
+    set_config_key "restart-strategy" "fixed-delay"
+    set_config_key "restart-strategy.fixed-delay.attempts" "2147483647"
+
+    set_config_key "heartbeat.interval" "2000"
+    set_config_key "heartbeat.timeout" "10000"
+
+    start_local_zk
+    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
+    start_taskmanagers ${NUM_TASK_MANAGERS}
+}
+
+function kill_and_replace_random_task_manager() {
+    local NUM_TASK_MANAGERS=$(query_number_of_running_tms)
 
-    # do not verify checkpoints in the logs
-    verify_logs ${JM_KILLS} false
+    kill_random_taskmanager
+    wait_for_number_of_running_tms $(( ${NUM_TASK_MANAGERS} - 1 ))
+    start_taskmanagers 1
+    wait_for_number_of_running_tms ${NUM_TASK_MANAGERS}
 }
 
 run_ha_test 4
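
Taken together, the reworked script exercises both failover paths while the job
is pinned at the blocking mapper: two JM kills handled by the HA watchdog,
followed by two TM kill-and-replace cycles. Only after the script touches
${LATCH_FILE_PATH} do the mappers return, allowing the job to reach FINISHED
and the output hash to be verified.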