Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 08:06:58 UTC
[7/7] flink git commit: [FLINK-8910][e2e] Automated test for local recovery (including sticky allocation)
[FLINK-8910][e2e] Automated test for local recovery (including sticky allocation)
This closes #5676.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/489e4281
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/489e4281
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/489e4281
Branch: refs/heads/master
Commit: 489e42811157a9b2575f259f7cda2a2ee680d008
Parents: edece9c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 6 10:35:44 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200
----------------------------------------------------------------------
.../pom.xml | 96 ++++
...StickyAllocationAndLocalRecoveryTestJob.java | 478 +++++++++++++++++++
.../src/main/resources/log4j-test.properties | 27 ++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 8 +
flink-end-to-end-tests/test-scripts/common.sh | 43 +-
.../test_local_recovery_and_scheduling.sh | 121 +++++
.../TaskExecutorLocalStateStoresManager.java | 2 +-
.../api/operators/BackendRestorerProcedure.java | 2 +
9 files changed, 773 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
new file mode 100644
index 0000000..4b966e2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.6-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-local-recovery-and-allocation-test</artifactId>
+ <name>flink-local-recovery-and-allocation-test</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <!-- StickyAllocationAndLocalRecoveryTestJob -->
+ <execution>
+ <id>StickyAllocationAndLocalRecoveryTestJob</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <finalName>StickyAllocationAndLocalRecoveryTestJob</finalName>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.class</include>
+ <include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
new file mode 100644
index 0000000..b03791e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky allocation).
+ *
+ * <p>List of possible input parameters for this job (an example invocation is shown after the list):
+ * <ul>
+ * <li>checkpointDir: the checkpoint directory, required.</li>
+ * <li>parallelism: the parallelism of the job, default 1.</li>
+ * <li>maxParallelism: the maximum parallelism of the job, defaults to the configured parallelism.</li>
+ * <li>checkpointInterval: the checkpointing interval in milliseconds, default 1000.</li>
+ * <li>restartDelay: the delay of the fixed delay restart strategy, default 0.</li>
+ * <li>externalizedCheckpoints: flag to activate externalized checkpoints, default <code>false</code>.</li>
+ * <li>stateBackend: choice for state backend between <code>file</code> and <code>rocks</code>, default <code>file</code>.</li>
+ * <li>killJvmOnFail: flag that determines whether an artificial failure induced by the test kills the JVM, default <code>false</code>.</li>
+ * <li>asyncCheckpoints: flag for async checkpoints with file state backend, default <code>true</code>.</li>
+ * <li>incrementalCheckpoints: flag for incremental checkpoints with the rocks state backend, default <code>false</code>.</li>
+ * <li>delay: sleep delay to throttle down the production of the source, default 0.</li>
+ * <li>maxAttempts: the maximum number of run attempts before the job finishes with success, default 3.</li>
+ * <li>valueSize: size of the artificial value for each key in bytes, default 10.</li>
+ * </ul>
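+ *
+ * <p>A minimal example invocation (the jar path and checkpoint directory below are placeholders):
+ * <pre>
+ * bin/flink run -p 4 StickyAllocationAndLocalRecoveryTestJob.jar \
+ *   --checkpointDir file:///tmp/local_recovery_test/checkpoints \
+ *   --stateBackend rocks --incrementalCheckpoints true \
+ *   --killJvmOnFail true --maxAttempts 10
+ * </pre>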
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool pt = ParameterTool.fromArgs(args);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(pt.getInt("parallelism", 1));
+ env.setMaxParallelism(pt.getInt("maxParallelism", pt.getInt("parallelism", 1)));
+ env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
+ if (pt.getBoolean("externalizedCheckpoints", false)) {
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+
+ String stateBackend = pt.get("stateBackend", "file");
+ String checkpointDir = pt.getRequired("checkpointDir");
+
+ boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+ if ("file".equals(stateBackend)) {
+ boolean asyncCheckpoints = pt.getBoolean("asyncCheckpoints", true);
+ env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+ } else if ("rocks".equals(stateBackend)) {
+ boolean incrementalCheckpoints = pt.getBoolean("incrementalCheckpoints", false);
+ env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+ } else {
+ throw new IllegalArgumentException("Unknown backend: " + stateBackend);
+ }
+
+ // make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(pt);
+
+ // delay to throttle down the production of the source
+ long delay = pt.getLong("delay", 0L);
+
+ // the maximum number of attempts, before the job finishes with success
+ int maxAttempts = pt.getInt("maxAttempts", 3);
+
+ // size of one artificial value
+ int valueSize = pt.getInt("valueSize", 10);
+
+ env.addSource(new RandomLongSource(maxAttempts, delay))
+ .keyBy((KeySelector<Long, Long>) aLong -> aLong)
+ .flatMap(new StateCreatingFlatMap(valueSize, killJvmOnFail))
+ .addSink(new PrintSinkFunction<>());
+
+ env.execute("Sticky Allocation And Local Recovery Test");
+ }
+
+ /**
+ * Source function that produces a long sequence.
+ */
+ private static final class RandomLongSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Generator delay between two events.
+ */
+ final long delay;
+
+ /**
+ * Maximum restarts before shutting down this source.
+ */
+ final int maxAttempts;
+
+ /**
+ * State that holds the current key for recovery.
+ */
+ transient ListState<Long> sourceCurrentKeyState;
+
+ /**
+ * Generator's current key.
+ */
+ long currentKey;
+
+ /**
+ * Generator runs while this is true.
+ */
+ volatile boolean running;
+
+ RandomLongSource(int maxAttempts, long delay) {
+ this.delay = delay;
+ this.maxAttempts = maxAttempts;
+ this.running = true;
+ }
+
+ @Override
+ public void run(SourceContext<Long> sourceContext) throws Exception {
+
+ int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+ // the source emits one final event and shuts down once we have reached max attempts.
+ if (getRuntimeContext().getAttemptNumber() > maxAttempts) {
+ synchronized (sourceContext.getCheckpointLock()) {
+ sourceContext.collect(Long.MAX_VALUE - subtaskIdx);
+ }
+ return;
+ }
+
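+ // Each subtask emits the disjoint sequence subtaskIdx, subtaskIdx + p, subtaskIdx + 2 * p, ...
+ // (with p = numberOfParallelSubtasks), so keys are unique across subtasks and strictly increasing per subtask.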
+ while (running) {
+
+ synchronized (sourceContext.getCheckpointLock()) {
+ sourceContext.collect(currentKey);
+ currentKey += numberOfParallelSubtasks;
+ }
+
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ sourceCurrentKeyState.clear();
+ sourceCurrentKeyState.add(currentKey);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+
+ ListStateDescriptor<Long> currentKeyDescriptor = new ListStateDescriptor<>("currentKey", Long.class);
+ sourceCurrentKeyState = context.getOperatorStateStore().getListState(currentKeyDescriptor);
+
+ currentKey = getRuntimeContext().getIndexOfThisSubtask();
+ Iterable<Long> iterable = sourceCurrentKeyState.get();
+ if (iterable != null) {
+ Iterator<Long> iterator = iterable.iterator();
+ if (iterator.hasNext()) {
+ currentKey = iterator.next();
+ Preconditions.checkState(!iterator.hasNext());
+ }
+ }
+ }
+ }
+
+ /**
+ * Stateful map function. Failure creation and checks happen here.
+ */
+ private static final class StateCreatingFlatMap
+ extends RichFlatMapFunction<Long, String> implements CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * User configured size of the generated artificial values in the keyed state.
+ */
+ final int valueSize;
+
+ /**
+ * Holds the user configuration if the artificial test failure is killing the JVM.
+ */
+ final boolean killTaskOnFailure;
+
+ /**
+ * This state is used to create artificial keyed state in the backend.
+ */
+ transient ValueState<String> valueState;
+
+ /**
+ * This state is used to persist the schedulingAndFailureInfo across restarts.
+ */
+ transient ListState<MapperSchedulingAndFailureInfo> schedulingAndFailureState;
+
+ /**
+ * This contains the current scheduling and failure meta data.
+ */
+ transient MapperSchedulingAndFailureInfo currentSchedulingAndFailureInfo;
+
+ /**
+ * Message to indicate that recovery detected a failure with sticky allocation.
+ */
+ transient volatile String allocationFailureMessage;
+
+ /**
+ * If this flag is true, the next invocation of the map function introduces a test failure.
+ */
+ transient volatile boolean failTask;
+
+ StateCreatingFlatMap(int valueSize, boolean killTaskOnFailure) {
+ this.valueSize = valueSize;
+ this.failTask = false;
+ this.killTaskOnFailure = killTaskOnFailure;
+ this.allocationFailureMessage = null;
+ }
+
+ @Override
+ public void flatMap(Long key, Collector<String> collector) throws IOException {
+
+ if (allocationFailureMessage != null) {
+ // Report the failure downstream, so that we can get the message from the output.
+ collector.collect(allocationFailureMessage);
+ allocationFailureMessage = null;
+ }
+
+ if (failTask) {
+ // we fail the task, either by killing the JVM hard, or by throwing a user code exception.
+ if (killTaskOnFailure) {
+ Runtime.getRuntime().halt(-1);
+ } else {
+ throw new RuntimeException("Artificial user code exception.");
+ }
+ }
+
+ // sanity check
+ if (null != valueState.value()) {
+ throw new IllegalStateException("This should never happen, keys are generated monotonically.");
+ }
+
+ // store artificial data to blow up the state
+ valueState.update(RandomStringUtils.random(valueSize, true, true));
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ ValueStateDescriptor<String> stateDescriptor =
+ new ValueStateDescriptor<>("state", String.class);
+ valueState = functionInitializationContext.getKeyedStateStore().getState(stateDescriptor);
+
+ ListStateDescriptor<MapperSchedulingAndFailureInfo> mapperInfoStateDescriptor =
+ new ListStateDescriptor<>("mapperState", MapperSchedulingAndFailureInfo.class);
+ schedulingAndFailureState =
+ functionInitializationContext.getOperatorStateStore().getUnionListState(mapperInfoStateDescriptor);
+
+ StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+ String allocationID = runtimeContext.getAllocationIDAsString();
+
+ final int thisJvmPid = getJvmPid();
+ final Set<Integer> killedJvmPids = new HashSet<>();
+
+ // here we check if the sticky scheduling worked as expected
+ if (functionInitializationContext.isRestored()) {
+ Iterable<MapperSchedulingAndFailureInfo> iterable = schedulingAndFailureState.get();
+ String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
+
+ MapperSchedulingAndFailureInfo infoForThisTask = null;
+ List<MapperSchedulingAndFailureInfo> completeInfo = new ArrayList<>();
+ if (iterable != null) {
+ for (MapperSchedulingAndFailureInfo testInfo : iterable) {
+
+ completeInfo.add(testInfo);
+
+ if (taskNameWithSubtasks.equals(testInfo.taskNameWithSubtask)) {
+ infoForThisTask = testInfo;
+ }
+
+ if (testInfo.killedJvm) {
+ killedJvmPids.add(testInfo.jvmPid);
+ }
+ }
+ }
+
+ Preconditions.checkNotNull(infoForThisTask, "Expected to find info here.");
+
+ if (!isScheduledToCorrectAllocation(infoForThisTask, allocationID, killedJvmPids)) {
+ allocationFailureMessage = String.format(
+ "Sticky allocation test failed: Subtask %s in attempt %d was rescheduled from allocation %s " +
+ "on JVM with PID %d to unexpected allocation %s on JVM with PID %d.\n" +
+ "Complete information from before the crash: %s.",
+ runtimeContext.getTaskNameWithSubtasks(),
+ runtimeContext.getAttemptNumber(),
+ infoForThisTask.allocationId,
+ infoForThisTask.jvmPid,
+ allocationID,
+ thisJvmPid,
+ completeInfo);
+ }
+ }
+
+ // We determine which of the subtasks will produce the artificial failure
+ boolean failingTask = shouldTaskFailForThisAttempt();
+
+ // We take note of all the meta info that we require to check sticky scheduling in the next re-attempt
+ this.currentSchedulingAndFailureInfo = new MapperSchedulingAndFailureInfo(
+ failingTask,
+ failingTask && killTaskOnFailure,
+ thisJvmPid,
+ runtimeContext.getTaskNameWithSubtasks(),
+ allocationID);
+
+ schedulingAndFailureState.clear();
+ schedulingAndFailureState.add(currentSchedulingAndFailureInfo);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // we can only fail the task after at least one checkpoint is completed to record progress.
+ failTask = currentSchedulingAndFailureInfo.failingTask;
+ }
+
+ private boolean shouldTaskFailForThisAttempt() {
+ RuntimeContext runtimeContext = getRuntimeContext();
+ int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+ int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+ int attempt = runtimeContext.getAttemptNumber();
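+ // Exactly one subtask fails per execution attempt; the failing subtask index rotates with the
+ // attempt number, so the failure moves to a different subtask on every restart.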
+ return (attempt % numSubtasks) == subtaskIdx;
+ }
+
+ private boolean isScheduledToCorrectAllocation(
+ MapperSchedulingAndFailureInfo infoForThisTask,
+ String allocationID,
+ Set<Integer> killedJvmPids) {
+
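+ // Allocation is considered sticky if the task returned to its previous allocation; if the previous
+ // JVM was killed, that allocation no longer exists and rescheduling to a new allocation is expected.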
+ return (infoForThisTask.allocationId.equals(allocationID)
+ || killedJvmPids.contains(infoForThisTask.jvmPid));
+ }
+ }
+
+ /**
+ * This code is copied from Stack Overflow.
+ *
+ * <p><a href="https://stackoverflow.com/questions/35842">https://stackoverflow.com/questions/35842</a>, answer
+ * <a href="https://stackoverflow.com/a/12066696/9193881">https://stackoverflow.com/a/12066696/9193881</a>
+ *
+ * <p>Author: <a href="https://stackoverflow.com/users/446591/brad-mace">Brad Mace</a>
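+ *
+ * <p>Note: on Java 9 and later, the PID can be obtained without reflection via
+ * {@code ProcessHandle.current().pid()}.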
+ */
+ private static int getJvmPid() throws Exception {
+ java.lang.management.RuntimeMXBean runtime =
+ java.lang.management.ManagementFactory.getRuntimeMXBean();
+ java.lang.reflect.Field jvm = runtime.getClass().getDeclaredField("jvm");
+ jvm.setAccessible(true);
+ sun.management.VMManagement mgmt =
+ (sun.management.VMManagement) jvm.get(runtime);
+ java.lang.reflect.Method pidMethod =
+ mgmt.getClass().getDeclaredMethod("getProcessId");
+ pidMethod.setAccessible(true);
+
+ return (int) (Integer) pidMethod.invoke(mgmt);
+ }
+
+ /**
+ * Records the information required to check sticky scheduling after a restart.
+ */
+ public static class MapperSchedulingAndFailureInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * True iff this task inflicts a test failure.
+ */
+ final boolean failingTask;
+
+ /**
+ * True iff this task kills its JVM.
+ */
+ final boolean killedJvm;
+
+ /**
+ * PID of the task JVM.
+ */
+ final int jvmPid;
+
+ /**
+ * Name and subtask index of the task.
+ */
+ final String taskNameWithSubtask;
+
+ /**
+ * The current allocation id of this task.
+ */
+ final String allocationId;
+
+ MapperSchedulingAndFailureInfo(
+ boolean failingTask,
+ boolean killedJvm,
+ int jvmPid,
+ String taskNameWithSubtask,
+ String allocationId) {
+
+ this.failingTask = failingTask;
+ this.killedJvm = killedJvm;
+ this.jvmPid = jvmPid;
+ this.taskNameWithSubtask = taskNameWithSubtask;
+ this.allocationId = allocationId;
+ }
+
+ @Override
+ public String toString() {
+ return "MapperTestInfo{" +
+ "failingTask=" + failingTask +
+ ", killedJvm=" + killedJvm +
+ ", jvmPid=" + jvmPid +
+ ", taskNameWithSubtask='" + taskNameWithSubtask + '\'' +
+ ", allocationId='" + allocationId + '\'' +
+ '}';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..37c65e9
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The root logger level is set to INFO below.
+# Change it manually to DEBUG for more detailed debugging output.
+log4j.rootLogger=INFO, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index dadb46f..367e120 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,7 @@ under the License.
<module>flink-distributed-cache-via-blob-test</module>
<module>flink-high-parallelism-iterations-test</module>
<module>flink-stream-stateful-job-upgrade-test</module>
+ <module>flink-local-recovery-and-allocation-test</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d4309c1..bd91bb2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -132,5 +132,13 @@ if [ $EXIT_CODE == 0 ]; then
EXIT_CODE=$?
fi
+if [ $EXIT_CODE == 0 ]; then
+ printf "\n==============================================================================\n"
+ printf "Running local recovery and sticky scheduling nightly end-to-end test\n"
+ printf "==============================================================================\n"
+ $END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh
+ EXIT_CODE=$?
+fi
+
# Exit code for Travis build success/failure
exit $EXIT_CODE
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ec963c5..56a5d27 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -320,6 +320,39 @@ function s3_delete {
https://${bucket}.s3.amazonaws.com/${s3_file}
}
+# This function starts the given number of task managers and monitors their processes. If a task manager process goes
+# away, a replacement is started.
+function tm_watchdog {
+ local expectedTm=$1
+ while true;
+ do
+ runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
+ count=$((expectedTm-runningTm))
+ for (( c=0; c<count; c++ ))
+ do
+ $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+ done
+ sleep 5;
+ done
+}
+
+# Kills all job managers.
+function jm_kill_all {
+ kill_all 'StandaloneSessionClusterEntrypoint'
+}
+
+# Kills all task managers.
+function tm_kill_all {
+ kill_all 'TaskManagerRunner|TaskManager'
+}
+
+# Kills all processes that match the given name.
+function kill_all {
+ local pid=`jps | grep -E "${1}" | cut -d " " -f 1`
+ kill ${pid} 2> /dev/null
+ wait ${pid} 2> /dev/null
+}
+
function kill_random_taskmanager {
KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
kill -9 "$KILL_TM"
@@ -432,12 +465,14 @@ function run_test {
return "${exit_code}"
}
-# make sure to clean up even in case of failures
+# Shuts down the cluster and cleans up all temporary folders and files. Registered as an exit trap so that cleanup also happens in case of failures.
function cleanup {
stop_cluster
- check_all_pass
- rm -rf $TEST_DATA_DIR
- rm -f $FLINK_DIR/log/*
+ tm_kill_all
+ jm_kill_all
+ rm -rf $TEST_DATA_DIR 2> /dev/null
revert_default_config
+ check_all_pass
+ rm -rf $FLINK_DIR/log/* 2> /dev/null
}
trap cleanup EXIT
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
new file mode 100755
index 0000000..98ef01f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# This function checks the logs for entries that indicate problems with local recovery
+function check_logs {
+ local parallelism=$1
+ local attempts=$2
+ (( expected_count=parallelism * (attempts + 1) ))
+
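+ # The BackendRestorerProcedure logs which restore alternative was used: alternative (1/2) is the local
+ # state copy that is tried first, alternative (2/2) is the fallback to the primary (remote) state.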
+ # Search for the log message that indicates a restore problem from existing local state for the keyed backend.
+ local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+ # Search for attempts to recover locally.
+ local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+ if [ ${failed_local_recovery} -ne 0 ]
+ then
+ PASS=""
+ echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)."
+ fi
+
+ if [ ${attempt_local_recovery} -eq 0 ]
+ then
+ PASS=""
+ echo "FAILURE: Found no attempt for local recovery. Configuration problem?"
+ fi
+}
+
+# This function does a cleanup after the test. The configuration is restored, the watchdog is terminated and temporary
+# files and folders are deleted.
+function cleanup_after_test {
+ # Reset the configurations
+ sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties"
+ #
+ kill ${watchdog_pid} 2> /dev/null
+ wait ${watchdog_pid} 2> /dev/null
+ #
+ cleanup
+}
+
+# Calls the cleanup step for this test and exits with an error.
+function cleanup_after_test_and_exit_fail {
+ cleanup_after_test
+ exit 1
+}
+
+## This function executes one run for a certain configuration
+function run_local_recovery_test {
+ local parallelism=$1
+ local max_attempts=$2
+ local backend=$3
+ local incremental=$4
+ local kill_jvm=$5
+
+ echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
+ TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+
+ # Backup conf and configure for HA
+ backup_config
+ create_ha_config
+
+ # Enable debug logging
+ sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties"
+
+ # Enable local recovery
+ set_conf "state.backend.local-recovery" "true"
+ # Ensure that each TM only has one operator (chain)
+ set_conf "taskmanager.numberOfTaskSlots" "1"
+
+ rm $FLINK_DIR/log/* 2> /dev/null
+
+ # Start HA server
+ start_local_zk
+ start_cluster
+
+ tm_watchdog ${parallelism} &
+ watchdog_pid=$!
+
+ echo "Started TM watchdog with PID ${watchdog_pid}."
+
+ $FLINK_DIR/bin/flink run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+ -p ${parallelism} $TEST_PROGRAM_JAR \
+ -D state.backend.local-recovery=ENABLE_FILE_BASED \
+ --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \
+ --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail ${kill_jvm} --checkpointInterval 1000 \
+ --maxAttempts ${max_attempts} --parallelism ${parallelism} --stateBackend ${backend} \
+ --incrementalCheckpoints ${incremental}
+
+ check_logs ${parallelism} ${max_attempts}
+ cleanup_after_test
+}
+
+## MAIN
+trap cleanup_after_test_and_exit_fail EXIT
+run_local_recovery_test 4 3 "file" "false" "false"
+run_local_recovery_test 4 3 "file" "false" "true"
+run_local_recovery_test 4 10 "rocks" "false" "false"
+run_local_recovery_test 4 10 "rocks" "true" "false"
+run_local_recovery_test 4 10 "rocks" "false" "true"
+run_local_recovery_test 4 10 "rocks" "true" "true"
+trap - EXIT
+exit 0
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 6826fbd..095dc86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -142,7 +142,7 @@ public class TaskExecutorLocalStateStoresManager {
LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
- taskLocalStateStore = (localRecoveryMode != LocalRecoveryConfig.LocalRecoveryMode.DISABLED) ?
+ taskLocalStateStore = localRecoveryConfig.isLocalRecoveryEnabled() ?
// Real store implementation if local recovery is enabled
new TaskLocalStateStoreImpl(
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index 0f5b0e0..29746dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -105,6 +105,8 @@ public class BackendRestorerProcedure<
++alternativeIdx;
+ // IMPORTANT: please be careful when modifying the log statements because they are used for validation in
+ // the automatic end-to-end tests. Those tests might fail if they are not aligned with the log message!
if (restoreState.isEmpty()) {
LOG.debug("Creating {} with empty state.", logDescription);
} else {