You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/23 19:25:37 UTC
[flink] 03/03: [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1c653ceb25b456a0abe65b22b2eada17ba2bed53
Author: Gary Yao <ga...@apache.org>
AuthorDate: Tue Jul 9 15:21:59 2019 +0200
[FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test
- Introduce new test job (DataSetFineGrainedRecoveryTestProgram), which waits
for an external file to be created before finishing.
- Introduce killing of TMs to HA data set test.
- Reduce JM kills to 2.
- Reduce heartbeat interval and timeout to speed up TM loss detection.
This closes #9060.
---
.../pom.xml | 89 ++++++++++++++
.../tests/BlockingIncrementingMapFunction.java | 60 ++++++++++
.../DataSetFineGrainedRecoveryTestProgram.java | 48 ++++++++
.../batch/tests/util/FileBasedOneShotLatch.java | 129 +++++++++++++++++++++
.../tests/util/FileBasedOneShotLatchTest.java | 92 +++++++++++++++
flink-end-to-end-tests/pom.xml | 1 +
.../test-scripts/test_ha_dataset.sh | 59 +++++++---
7 files changed, 463 insertions(+), 15 deletions(-)
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
new file mode 100644
index 0000000..ecde754
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-dataset-fine-grained-recovery-test</artifactId>
+ <name>flink-dataset-fine-grained-recovery-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>dataset-fine-grained-recovery</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>**/*Test.*</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>DataSetFineGrainedRecoveryTestProgram</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>DataSetFineGrainedRecoveryTestProgram</finalName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.batch.tests.DataSetFineGrainedRecoveryTestProgram</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
new file mode 100644
index 0000000..c5d92b8
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.batch.tests.util.FileBasedOneShotLatch;
+import org.apache.flink.configuration.Configuration;
+
+import java.nio.file.Paths;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A map function that increments values by one. Each call to {@link #map} blocks until the latch file exists.
+ */
+public class BlockingIncrementingMapFunction extends RichMapFunction<Long, Long> {
+
+ private final String latchFilePath;
+
+ private transient FileBasedOneShotLatch latch;
+
+ public BlockingIncrementingMapFunction(final String latchFilePath) {
+ this.latchFilePath = checkNotNull(latchFilePath);
+ }
+
+ @Override
+ public void open(final Configuration parameters) {
+ latch = new FileBasedOneShotLatch(Paths.get(latchFilePath));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (latch != null) { // null if open() never ran or the latch constructor threw
+ latch.close();
+ }
+ }
+
+ @Override
+ public Long map(final Long value) throws InterruptedException {
+ latch.await();
+ return value + 1;
+ }
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
new file mode 100644
index 0000000..749d232
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+
+/**
+ * Program to test fine grained recovery: the map operator blocks until the file given by {@code --latchFilePath} exists.
+ */
+public class DataSetFineGrainedRecoveryTestProgram {
+
+ public static void main(String[] args) throws Exception {
+ final ParameterTool params = ParameterTool.fromArgs(args);
+ final String latchFilePath = params.getRequired("latchFilePath");
+ final String outputPath = params.getRequired("outputPath");
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
+ env.setParallelism(params.getInt("parallelism", 4)); // optional --parallelism, defaults to the previous hard-coded 4
+
+ env.generateSequence(0, 1000)
+ .map(new BlockingIncrementingMapFunction(latchFilePath))
+ .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
+ .setParallelism(1);
+
+ env.execute();
+ }
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
new file mode 100644
index 0000000..dee2414
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import com.sun.nio.file.SensitivityWatchEventModifier;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A synchronization aid that allows a single thread to wait on the creation of a specified file.
+ */
+@NotThreadSafe
+public class FileBasedOneShotLatch implements Closeable {
+
+ private final Path latchFile;
+
+ private final WatchService watchService;
+
+ private boolean released;
+
+ public FileBasedOneShotLatch(final Path latchFile) {
+ this.latchFile = checkNotNull(latchFile);
+
+ final Path parentDir = checkNotNull(latchFile.getParent(), "latchFile must have a parent");
+ this.watchService = initWatchService(parentDir);
+ }
+
+ private static WatchService initWatchService(final Path parentDir) {
+ final WatchService watchService = createWatchService(parentDir);
+ watchForLatchFile(watchService, parentDir); // NOTE(review): watchService is not closed if this throws
+ return watchService;
+ }
+
+ private static WatchService createWatchService(final Path parentDir) {
+ try {
+ return parentDir.getFileSystem().newWatchService();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void watchForLatchFile(final WatchService watchService, final Path parentDir) {
+ try {
+ parentDir.register(
+ watchService,
+ new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE},
+ SensitivityWatchEventModifier.HIGH);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Waits until the latch file is created.
+ *
+ * <p>When this method returns, subsequent invocations will not block even after the latch file
+ * is deleted. Note that this method may not return if the latch file is deleted before this
+ * method returns.
+ *
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public void await() throws InterruptedException {
+ if (isReleasedOrReleasable()) {
+ return;
+ }
+
+ awaitLatchFile(watchService);
+ }
+
+ private void awaitLatchFile(final WatchService watchService) throws InterruptedException {
+ do {
+ final WatchKey watchKey = watchService.take();
+ // Drain the pending events before resetting: reset() immediately re-queues a key that still
+ // has events, so take() would return at once and this loop would spin on unrelated files.
+ watchKey.pollEvents();
+ watchKey.reset();
+ } while (!isReleasedOrReleasable());
+ }
+
+ private boolean isReleasedOrReleasable() {
+ if (released) {
+ return true;
+ }
+
+ if (Files.exists(latchFile)) {
+ releaseLatch();
+ return true;
+ }
+
+ return false;
+ }
+
+ private void releaseLatch() {
+ released = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ watchService.close();
+ }
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
new file mode 100644
index 0000000..42d5cff
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link FileBasedOneShotLatch}.
+ */
+public class FileBasedOneShotLatchTest {
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private FileBasedOneShotLatch latch;
+
+ private File latchFile;
+
+ @Before
+ public void setUp() {
+ latchFile = new File(temporaryFolder.getRoot(), "latchFile");
+ latch = new FileBasedOneShotLatch(latchFile.toPath()); // NOTE(review): never closed; an @After could close it
+ }
+
+ @Test
+ public void awaitReturnsWhenFileIsCreated() throws Exception {
+ final AtomicBoolean awaitCompleted = new AtomicBoolean();
+ final Thread thread = new Thread(() -> {
+ try {
+ latch.await();
+ awaitCompleted.set(true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ thread.start();
+
+ latchFile.createNewFile(); // NOTE(review): the thread may not be blocked in await() yet, so the watch path is not guaranteed to be exercised
+ thread.join(10_000L); // bounded so that a latch which never releases fails the test instead of hanging the build
+
+ assertTrue("await() did not return after the latch file was created", awaitCompleted.get());
+ }
+
+ @Test
+ public void subsequentAwaitDoesNotBlock() throws Exception {
+ latchFile.createNewFile();
+ latch.await();
+ latch.await();
+ }
+
+ @Test
+ public void subsequentAwaitDoesNotBlockEvenIfLatchFileIsDeleted() throws Exception {
+ latchFile.createNewFile();
+ latch.await();
+
+ latchFile.delete();
+ latch.await();
+ }
+
+ @Test
+ public void doesNotBlockIfFileExistsPriorToCreatingLatch() throws Exception {
+ latchFile.createNewFile();
+
+ final FileBasedOneShotLatch latch = new FileBasedOneShotLatch(latchFile.toPath()); // NOTE(review): not closed
+ latch.await();
+ }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 46bfde2..d676291 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -39,6 +39,7 @@ under the License.
<module>flink-parent-child-classloading-test-program</module>
<module>flink-parent-child-classloading-test-lib-package</module>
<module>flink-dataset-allround-test</module>
+ <module>flink-dataset-fine-grained-recovery-test</module>
<module>flink-datastream-allround-test</module>
<module>flink-stream-sql-test</module>
<module>flink-bucketing-sink-test</module>
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
index b619e16..b547142 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -20,7 +20,7 @@
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/common_ha.sh
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-fine-grained-recovery-test/target/DataSetFineGrainedRecoveryTestProgram.jar
function ha_cleanup() {
# kill the cluster and zookeeper
@@ -32,30 +32,26 @@ on_exit ha_cleanup
function run_ha_test() {
local PARALLELISM=$1
- local JM_KILLS=3
+ local JM_KILLS=2
+ local TM_KILLS=2
- CLEARED=0
- mkdir -p ${TEST_DATA_DIR}/control
- touch ${TEST_DATA_DIR}/control/test.txt
+ local LATCH_FILE_PATH=$TEST_DATA_DIR/latchFile
- # start the cluster on HA mode
- start_ha_cluster
+ CLEARED=0
+ setup_and_start_cluster ${PARALLELISM}
echo "Running on HA mode: parallelism=${PARALLELISM}."
# submit a job in detached mode and let it run
local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
$TEST_PROGRAM_JAR \
- --loadFactor 4 \
+ --latchFilePath $LATCH_FILE_PATH \
--outputPath $TEST_DATA_DIR/out/dataset_allround \
- --source true \
| grep "Job has been submitted with JobID" | sed 's/.* //g')
wait_job_running ${JOB_ID}
- # start the watchdog that keeps the number of JMs stable
- start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
-
+ local c
for (( c=0; c<${JM_KILLS}; c++ )); do
# kill the JM and wait for watchdog to
# create a new one which will take over
@@ -63,10 +59,43 @@ function run_ha_test() {
wait_job_running ${JOB_ID}
done
- cancel_job ${JOB_ID}
+ for (( c=0; c<${TM_KILLS}; c++ )); do
+ kill_and_replace_random_task_manager
+ done
+
+ touch ${LATCH_FILE_PATH}
+
+ wait_job_terminal_state ${JOB_ID} "FINISHED"
+ check_result_hash "DataSet-FineGrainedRecovery-Test" $TEST_DATA_DIR/out/dataset_allround "ac3d26e1afce19aa657527f000acb88b"
+}
+
+function setup_and_start_cluster() {
+ local NUM_TASK_MANAGERS=${1:?"number of task managers required"}
+
+ create_ha_config
+
+ set_config_key "jobmanager.execution.failover-strategy" "region"
+ set_config_key "jobmanager.scheduler.partition.force-release-on-consumption" "false"
+ set_config_key "taskmanager.numberOfTaskSlots" "1" # one slot per TM: NUM_TASK_MANAGERS TMs give NUM_TASK_MANAGERS slots
+
+ set_config_key "restart-strategy" "fixed-delay"
+ set_config_key "restart-strategy.fixed-delay.attempts" "2147483647" # Integer.MAX_VALUE, i.e. effectively unlimited
+
+ set_config_key "heartbeat.interval" "2000" # shortened so that killed TMs are detected sooner
+ set_config_key "heartbeat.timeout" "10000"
+
+ start_local_zk
+ start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
+ start_taskmanagers "${NUM_TASK_MANAGERS}"
+}
+
+function kill_and_replace_random_task_manager() {
+ local NUM_TASK_MANAGERS; NUM_TASK_MANAGERS=$(query_number_of_running_tms) # split so 'local' does not mask the query's exit status (SC2155)
- # do not verify checkpoints in the logs
- verify_logs ${JM_KILLS} false
+ kill_random_taskmanager
+ wait_for_number_of_running_tms "$(( NUM_TASK_MANAGERS - 1 ))"
+ start_taskmanagers 1
+ wait_for_number_of_running_tms "${NUM_TASK_MANAGERS}"
}
run_ha_test 4