Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 08:06:58 UTC

[7/7] flink git commit: [FLINK-8910][e2e] Automated test for local recovery (including sticky allocation)

[FLINK-8910][e2e] Automated test for local recovery (including sticky allocation)

This closes #5676.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/489e4281
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/489e4281
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/489e4281

Branch: refs/heads/master
Commit: 489e42811157a9b2575f259f7cda2a2ee680d008
Parents: edece9c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 6 10:35:44 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200

----------------------------------------------------------------------
 .../pom.xml                                     |  96 ++++
 ...StickyAllocationAndLocalRecoveryTestJob.java | 478 +++++++++++++++++++
 .../src/main/resources/log4j-test.properties    |  27 ++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   8 +
 flink-end-to-end-tests/test-scripts/common.sh   |  43 +-
 .../test_local_recovery_and_scheduling.sh       | 121 +++++
 .../TaskExecutorLocalStateStoresManager.java    |   2 +-
 .../api/operators/BackendRestorerProcedure.java |   2 +
 9 files changed, 773 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
new file mode 100644
index 0000000..4b966e2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-local-recovery-and-allocation-test</artifactId>
+	<name>flink-local-recovery-and-allocation-test</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<!-- StickyAllocationAndLocalRecoveryTestJob -->
+					<execution>
+						<id>StickyAllocationAndLocalRecoveryTestJob</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<finalName>StickyAllocationAndLocalRecoveryTestJob</finalName>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.class</include>
+								<include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+
+</project>
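
For reference, a minimal way to build this module and check the manifest entry through which Flink's client discovers the entry point (a sketch; the Maven and unzip invocations are standard usage, not part of this commit):

    # from the repository root
    mvn clean package -DskipTests -pl flink-end-to-end-tests/flink-local-recovery-and-allocation-test -am

    # the finalName above produces target/StickyAllocationAndLocalRecoveryTestJob.jar; inspect its manifest
    unzip -p flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar META-INF/MANIFEST.MF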

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
new file mode 100644
index 0000000..b03791e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky allocation).
+ *
+ * <p>List of possible input parameters for this job:
+ * <ul>
+ * 	<li>checkpointDir: the checkpoint directory, required.</li>
+ * 	<li>parallelism: the parallelism of the job, default 1.</li>
+ * 	<li>maxParallelism: the maximum parallelism of the job, defaults to the parallelism.</li>
+ * 	<li>checkpointInterval: the checkpointing interval in milliseconds, default 1000.</li>
+ * 	<li>restartDelay: the delay of the fixed delay restart strategy, default 0.</li>
+ * 	<li>externalizedCheckpoints: flag to activate externalized checkpoints, default <code>false</code>.</li>
+ * 	<li>stateBackend: choice for state backend between <code>file</code> and <code>rocks</code>, default <code>file</code>.</li>
+ * 	<li>killJvmOnFail: flag that determines whether an artificial failure induced by the test kills the JVM, default <code>false</code>.</li>
+ * 	<li>asyncCheckpoints: flag for async checkpoints with file state backend, default <code>true</code>.</li>
+ * 	<li>incrementalCheckpoints: flag for incremental checkpoint with rocks state backend, default <code>false</code>.</li>
+ * 	<li>delay: sleep delay to throttle down the production of the source, default 0.</li>
+ * 	<li>maxAttempts: the maximum number of run attempts before the job finishes with success, default 3.</li>
+ * 	<li>valueSize: size of the artificial value for each key in bytes, default 10.</li>
+ * </ul>
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(pt.getInt("parallelism", 1));
+		env.setMaxParallelism(pt.getInt("maxParallelism", pt.getInt("parallelism", 1)));
+		env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
+		if (pt.getBoolean("externalizedCheckpoints", false)) {
+			env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+		}
+
+		String stateBackend = pt.get("stateBackend", "file");
+		String checkpointDir = pt.getRequired("checkpointDir");
+
+		boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+		if ("file".equals(stateBackend)) {
+			boolean asyncCheckpoints = pt.getBoolean("asyncCheckpoints", true);
+			env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+		} else if ("rocks".equals(stateBackend)) {
+			boolean incrementalCheckpoints = pt.getBoolean("incrementalCheckpoints", false);
+			env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+		} else {
+			throw new IllegalArgumentException("Unknown backend: " + stateBackend);
+		}
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(pt);
+
+		// delay to throttle down the production of the source
+		long delay = pt.getLong("delay", 0L);
+
+		// the maximum number of attempts, before the job finishes with success
+		int maxAttempts = pt.getInt("maxAttempts", 3);
+
+		// size of one artificial value
+		int valueSize = pt.getInt("valueSize", 10);
+
+		env.addSource(new RandomLongSource(maxAttempts, delay))
+			.keyBy((KeySelector<Long, Long>) aLong -> aLong)
+			.flatMap(new StateCreatingFlatMap(valueSize, killJvmOnFail))
+			.addSink(new PrintSinkFunction<>());
+
+		env.execute("Sticky Allocation And Local Recovery Test");
+	}
+
+	/**
+	 * Source function that produces a long sequence.
+	 */
+	private static final class RandomLongSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Generator delay between two events.
+		 */
+		final long delay;
+
+		/**
+		 * Maximum restarts before shutting down this source.
+		 */
+		final int maxAttempts;
+
+		/**
+		 * State that holds the current key for recovery.
+		 */
+		transient ListState<Long> sourceCurrentKeyState;
+
+		/**
+		 * Generator's current key.
+		 */
+		long currentKey;
+
+		/**
+		 * Generator runs while this is true.
+		 */
+		volatile boolean running;
+
+		RandomLongSource(int maxAttempts, long delay) {
+			this.delay = delay;
+			this.maxAttempts = maxAttempts;
+			this.running = true;
+		}
+
+		@Override
+		public void run(SourceContext<Long> sourceContext) throws Exception {
+
+			int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+			int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+			// the source emits one final event and shuts down once we have reached max attempts.
+			if (getRuntimeContext().getAttemptNumber() > maxAttempts) {
+				synchronized (sourceContext.getCheckpointLock()) {
+					sourceContext.collect(Long.MAX_VALUE - subtaskIdx);
+				}
+				return;
+			}
+
+			while (running) {
+
+				synchronized (sourceContext.getCheckpointLock()) {
+					sourceContext.collect(currentKey);
+					currentKey += numberOfParallelSubtasks;
+				}
+
+				if (delay > 0) {
+					Thread.sleep(delay);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			sourceCurrentKeyState.clear();
+			sourceCurrentKeyState.add(currentKey);
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+
+			ListStateDescriptor<Long> currentKeyDescriptor = new ListStateDescriptor<>("currentKey", Long.class);
+			sourceCurrentKeyState = context.getOperatorStateStore().getListState(currentKeyDescriptor);
+
+			currentKey = getRuntimeContext().getIndexOfThisSubtask();
+			Iterable<Long> iterable = sourceCurrentKeyState.get();
+			if (iterable != null) {
+				Iterator<Long> iterator = iterable.iterator();
+				if (iterator.hasNext()) {
+					currentKey = iterator.next();
+					Preconditions.checkState(!iterator.hasNext());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Stateful map function. Failure creation and checks happen here.
+	 */
+	private static final class StateCreatingFlatMap
+		extends RichFlatMapFunction<Long, String> implements CheckpointedFunction, CheckpointListener {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * User configured size of the generated artificial values in the keyed state.
+		 */
+		final int valueSize;
+
+		/**
+		 * Holds the user configuration if the artificial test failure is killing the JVM.
+		 */
+		final boolean killTaskOnFailure;
+
+		/**
+		 * This state is used to create artificial keyed state in the backend.
+		 */
+		transient ValueState<String> valueState;
+
+		/**
+		 * This state is used to persist the schedulingAndFailureInfo to state.
+		 */
+		transient ListState<MapperSchedulingAndFailureInfo> schedulingAndFailureState;
+
+		/**
+		 * This contains the current scheduling and failure meta data.
+		 */
+		transient MapperSchedulingAndFailureInfo currentSchedulingAndFailureInfo;
+
+		/**
+		 * Message to indicate that recovery detected a violation of sticky allocation.
+		 */
+		transient volatile String allocationFailureMessage;
+
+		/**
+		 * If this flag is true, the next invocation of the map function introduces a test failure.
+		 */
+		transient volatile boolean failTask;
+
+		StateCreatingFlatMap(int valueSize, boolean killTaskOnFailure) {
+			this.valueSize = valueSize;
+			this.failTask = false;
+			this.killTaskOnFailure = killTaskOnFailure;
+			this.allocationFailureMessage = null;
+		}
+
+		@Override
+		public void flatMap(Long key, Collector<String> collector) throws IOException {
+
+			if (allocationFailureMessage != null) {
+				// Report the failure downstream, so that we can get the message from the output.
+				collector.collect(allocationFailureMessage);
+				allocationFailureMessage = null;
+			}
+
+			if (failTask) {
+				// we fail the task, either by killing the JVM hard, or by throwing a user code exception.
+				if (killTaskOnFailure) {
+					Runtime.getRuntime().halt(-1);
+				} else {
+					throw new RuntimeException("Artificial user code exception.");
+				}
+			}
+
+			// sanity check
+			if (null != valueState.value()) {
+				throw new IllegalStateException("This should never happen, keys are generated monotonically.");
+			}
+
+			// store artificial data to blow up the state
+			valueState.update(RandomStringUtils.random(valueSize, true, true));
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+			ValueStateDescriptor<String> stateDescriptor =
+				new ValueStateDescriptor<>("state", String.class);
+			valueState = functionInitializationContext.getKeyedStateStore().getState(stateDescriptor);
+
+			ListStateDescriptor<MapperSchedulingAndFailureInfo> mapperInfoStateDescriptor =
+				new ListStateDescriptor<>("mapperState", MapperSchedulingAndFailureInfo.class);
+			schedulingAndFailureState =
+				functionInitializationContext.getOperatorStateStore().getUnionListState(mapperInfoStateDescriptor);
+
+			StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+			String allocationID = runtimeContext.getAllocationIDAsString();
+
+			final int thisJvmPid = getJvmPid();
+			final Set<Integer> killedJvmPids = new HashSet<>();
+
+			// here we check if the sticky scheduling worked as expected
+			if (functionInitializationContext.isRestored()) {
+				Iterable<MapperSchedulingAndFailureInfo> iterable = schedulingAndFailureState.get();
+				String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
+
+				MapperSchedulingAndFailureInfo infoForThisTask = null;
+				List<MapperSchedulingAndFailureInfo> completeInfo = new ArrayList<>();
+				if (iterable != null) {
+					for (MapperSchedulingAndFailureInfo testInfo : iterable) {
+
+						completeInfo.add(testInfo);
+
+						if (taskNameWithSubtasks.equals(testInfo.taskNameWithSubtask)) {
+							infoForThisTask = testInfo;
+						}
+
+						if (testInfo.killedJvm) {
+							killedJvmPids.add(testInfo.jvmPid);
+						}
+					}
+				}
+
+				Preconditions.checkNotNull(infoForThisTask, "Expected to find info here.");
+
+				if (!isScheduledToCorrectAllocation(infoForThisTask, allocationID, killedJvmPids)) {
+					allocationFailureMessage = String.format(
+						"Sticky allocation test failed: Subtask %s in attempt %d was rescheduled from allocation %s " +
+							"on JVM with PID %d to unexpected allocation %s on JVM with PID %d.\n" +
+							"Complete information from before the crash: %s.",
+						runtimeContext.getTaskNameWithSubtasks(),
+						runtimeContext.getAttemptNumber(),
+						infoForThisTask.allocationId,
+						infoForThisTask.jvmPid,
+						allocationID,
+						thisJvmPid,
+						completeInfo);
+				}
+			}
+
+			// We determine which of the subtasks will produce the artificial failure
+			boolean failingTask = shouldTaskFailForThisAttempt();
+
+			// We take note of all the meta info that we require to check sticky scheduling in the next re-attempt
+			this.currentSchedulingAndFailureInfo = new MapperSchedulingAndFailureInfo(
+				failingTask,
+				failingTask && killTaskOnFailure,
+				thisJvmPid,
+				runtimeContext.getTaskNameWithSubtasks(),
+				allocationID);
+
+			schedulingAndFailureState.clear();
+			schedulingAndFailureState.add(currentSchedulingAndFailureInfo);
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			// we fail the task only after at least one checkpoint has completed, so that progress is recorded.
+			failTask = currentSchedulingAndFailureInfo.failingTask;
+		}
+
+		private boolean shouldTaskFailForThisAttempt() {
+			RuntimeContext runtimeContext = getRuntimeContext();
+			int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+			int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+			int attempt = runtimeContext.getAttemptNumber();
+			return (attempt % numSubtasks) == subtaskIdx;
+		}
+
+		private boolean isScheduledToCorrectAllocation(
+			MapperSchedulingAndFailureInfo infoForThisTask,
+			String allocationID,
+			Set<Integer> killedJvmPids) {
+
+			return (infoForThisTask.allocationId.equals(allocationID)
+				|| killedJvmPids.contains(infoForThisTask.jvmPid));
+		}
+	}
+
+	/**
+	 * This code is copied from Stack Overflow.
+	 *
+	 * <p><a href="https://stackoverflow.com/questions/35842">https://stackoverflow.com/questions/35842</a>, answer
+	 * <a href="https://stackoverflow.com/a/12066696/9193881">https://stackoverflow.com/a/12066696/9193881</a>
+	 *
+	 * <p>Author: <a href="https://stackoverflow.com/users/446591/brad-mace">Brad Mace</a>
+	 */
+	private static int getJvmPid() throws Exception {
+		java.lang.management.RuntimeMXBean runtime =
+			java.lang.management.ManagementFactory.getRuntimeMXBean();
+		java.lang.reflect.Field jvm = runtime.getClass().getDeclaredField("jvm");
+		jvm.setAccessible(true);
+		sun.management.VMManagement mgmt =
+			(sun.management.VMManagement) jvm.get(runtime);
+		java.lang.reflect.Method pidMethod =
+			mgmt.getClass().getDeclaredMethod("getProcessId");
+		pidMethod.setAccessible(true);
+
+		return (int) (Integer) pidMethod.invoke(mgmt);
+	}
+
+	/**
+	 * Records the information required to check sticky scheduling after a restart.
+	 */
+	public static class MapperSchedulingAndFailureInfo implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * True iff this task inflicts a test failure.
+		 */
+		final boolean failingTask;
+
+		/**
+		 * True iff this task kills its JVM.
+		 */
+		final boolean killedJvm;
+
+		/**
+		 * PID of the task JVM.
+		 */
+		final int jvmPid;
+
+		/**
+		 * Name and subtask index of the task.
+		 */
+		final String taskNameWithSubtask;
+
+		/**
+		 * The current allocation id of this task.
+		 */
+		final String allocationId;
+
+		MapperSchedulingAndFailureInfo(
+			boolean failingTask,
+			boolean killedJvm,
+			int jvmPid,
+			String taskNameWithSubtask,
+			String allocationId) {
+
+			this.failingTask = failingTask;
+			this.killedJvm = killedJvm;
+			this.jvmPid = jvmPid;
+			this.taskNameWithSubtask = taskNameWithSubtask;
+			this.allocationId = allocationId;
+		}
+
+		@Override
+		public String toString() {
+			return "MapperTestInfo{" +
+				"failingTask=" + failingTask +
+				", killedJvm=" + killedJvm +
+				", jvmPid=" + jvmPid +
+				", taskNameWithSubtask='" + taskNameWithSubtask + '\'' +
+				", allocationId='" + allocationId + '\'' +
+				'}';
+		}
+	}
+}
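
A typical standalone submission of this job, mirroring the parameters documented in the class Javadoc (a sketch with illustrative paths and values; the CLI form follows the invocation in test_local_recovery_and_scheduling.sh below):

    $FLINK_DIR/bin/flink run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
        -p 4 StickyAllocationAndLocalRecoveryTestJob.jar \
        --checkpointDir file:///tmp/local_recovery_test/checkpoints \
        --stateBackend rocks --incrementalCheckpoints true \
        --killJvmOnFail false --maxAttempts 10 --parallelism 4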

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..37c65e9
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF to avoid flooding build logs;
+# set it manually to INFO for debugging purposes.
+log4j.rootLogger=INFO, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
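
Note that this file only configures logging for the test job itself; the restore messages that the test validates are logged by the cluster at DEBUG level, which is why the test script temporarily rewrites the cluster configuration, e.g.:

    sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties"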

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index dadb46f..367e120 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,7 @@ under the License.
 		<module>flink-distributed-cache-via-blob-test</module>
 		<module>flink-high-parallelism-iterations-test</module>
 		<module>flink-stream-stateful-job-upgrade-test</module>
+		<module>flink-local-recovery-and-allocation-test</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d4309c1..bd91bb2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -132,5 +132,13 @@ if [ $EXIT_CODE == 0 ]; then
   EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running local recovery and sticky scheduling nightly end-to-end test\n"
+  printf "==============================================================================\n"
+  $END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh
+  EXIT_CODE=$?
+fi
+
 # Exit code for Travis build success/failure
 exit $EXIT_CODE
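
To run only this test, outside the nightly suite, the script can also be invoked directly (a sketch; it assumes the same environment as the other test-scripts, in particular that FLINK_DIR points to the Flink distribution under test):

    cd flink-end-to-end-tests
    FLINK_DIR=/path/to/build-target ./test-scripts/test_local_recovery_and_scheduling.sh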

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ec963c5..56a5d27 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -320,6 +320,39 @@ function s3_delete {
     https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+# This function starts the given number of task managers and monitors their processes. If a task manager process goes
+# away, a replacement is started.
+function tm_watchdog {
+  local expectedTm=$1
+  while true;
+  do
+    runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
+    count=$((expectedTm-runningTm))
+    for (( c=0; c<count; c++ ))
+    do
+      $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+    done
+    sleep 5;
+  done
+}
+
+# Kills all job managers.
+function jm_kill_all {
+  kill_all 'StandaloneSessionClusterEntrypoint'
+}
+
+# Kills all task managers.
+function tm_kill_all {
+  kill_all 'TaskManagerRunner|TaskManager'
+}
+
+# Kills all processes that match the given name.
+function kill_all {
+  local pid=`jps | grep -E "${1}" | cut -d " " -f 1`
+  kill ${pid} 2> /dev/null
+  wait ${pid} 2> /dev/null
+}
+
 function kill_random_taskmanager {
   KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
   kill -9 "$KILL_TM"
@@ -432,12 +465,14 @@ function run_test {
     return "${exit_code}"
 }
 
-# make sure to clean up even in case of failures
+# Shuts down the cluster and cleans up all temporary folders and files. Make sure to clean up even in case of failures.
 function cleanup {
   stop_cluster
-  check_all_pass
-  rm -rf $TEST_DATA_DIR
-  rm -f $FLINK_DIR/log/*
+  tm_kill_all
+  jm_kill_all
+  rm -rf $TEST_DATA_DIR 2> /dev/null
   revert_default_config
+  check_all_pass
+  rm -rf $FLINK_DIR/log/* 2> /dev/null
 }
 trap cleanup EXIT
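
The intended usage pattern for the new helpers, as exercised by the local recovery test script (a sketch condensed from that script):

    # keep (at least) 4 task managers alive while the test runs
    tm_watchdog 4 &
    watchdog_pid=$!

    # ... run the job; killed TMs are replaced by the watchdog ...

    # stop the watchdog before tearing the cluster down, then kill any leftover processes
    kill ${watchdog_pid} 2> /dev/null
    wait ${watchdog_pid} 2> /dev/null
    tm_kill_all
    jm_kill_all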

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
new file mode 100755
index 0000000..98ef01f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# This function checks the logs for entries that indicate problems with local recovery
+function check_logs {
+    local parallelism=$1
+    local attempts=$2
+    (( expected_count=parallelism * (attempts + 1) ))
+
+    # Search for the log message that indicates a problem restoring from existing local state for the keyed backend.
+    local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+    # Search for attempts to recover locally.
+    local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+    if [ ${failed_local_recovery} -ne 0 ]
+    then
+        PASS=""
+        echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)."
+    fi
+
+    if [ ${attempt_local_recovery} -eq 0 ]
+    then
+        PASS=""
+        echo "FAILURE: Found no attempt for local recovery. Configuration problem?"
+    fi
+}
+
+# This function cleans up after the test: the configuration is restored, the watchdog is terminated, and temporary
+# files and folders are deleted.
+function cleanup_after_test {
+    # Reset the configurations
+    sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties"
+    #
+    kill ${watchdog_pid} 2> /dev/null
+    wait ${watchdog_pid} 2> /dev/null
+    #
+    cleanup
+}
+
+# Calls the cleanup step for this test and exits with an error.
+function cleanup_after_test_and_exit_fail {
+    cleanup_after_test
+    exit 1
+}
+
+## This function executes one run for a given configuration.
+function run_local_recovery_test {
+    local parallelism=$1
+    local max_attempts=$2
+    local backend=$3
+    local incremental=$4
+    local kill_jvm=$5
+
+    echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
+    TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+
+    # Backup conf and configure for HA
+    backup_config
+    create_ha_config
+
+    # Enable debug logging
+    sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties"
+
+    # Enable local recovery
+    set_conf "state.backend.local-recovery" "true"
+    # Ensure that each TM has only one operator (chain)
+    set_conf "taskmanager.numberOfTaskSlots" "1"
+
+    rm $FLINK_DIR/log/* 2> /dev/null
+
+    # Start HA server
+    start_local_zk
+    start_cluster
+
+    tm_watchdog ${parallelism} &
+    watchdog_pid=$!
+
+    echo "Started TM watchdog with PID ${watchdog_pid}."
+
+    $FLINK_DIR/bin/flink run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+    -p ${parallelism} $TEST_PROGRAM_JAR \
+    -D state.backend.local-recovery=ENABLE_FILE_BASED \
+    --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \
+    --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail ${kill_jvm} --checkpointInterval 1000 \
+    --maxAttempts ${max_attempts} --parallelism ${parallelism} --stateBackend ${backend} \
+    --incrementalCheckpoints ${incremental}
+
+    check_logs ${parallelism} ${max_attempts}
+    cleanup_after_test
+}
+
+## MAIN
+trap cleanup_after_test_and_exit_fail EXIT
+run_local_recovery_test 4 3 "file" "false" "false"
+run_local_recovery_test 4 3 "file" "false" "true"
+run_local_recovery_test 4 10 "rocks" "false" "false"
+run_local_recovery_test 4 10 "rocks" "true" "false"
+run_local_recovery_test 4 10 "rocks" "false" "true"
+run_local_recovery_test 4 10 "rocks" "true" "true"
+trap - EXIT
+exit 0
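
After a run, the validation performed by check_logs can be reproduced by hand (assuming FLINK_DIR points to the distribution whose logs were written):

    # attempts to restore from local state (alternative 1/2); must be non-zero
    grep 'Creating keyed state backend.* from alternative (1/2)\.' $FLINK_DIR/log/* | wc -l

    # restores that fell back past the local alternative (2/2); must be zero
    grep 'Creating keyed state backend.* from alternative (2/2)\.' $FLINK_DIR/log/* | wc -l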

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 6826fbd..095dc86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -142,7 +142,7 @@ public class TaskExecutorLocalStateStoresManager {
 				LocalRecoveryConfig localRecoveryConfig =
 					new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
 
-				taskLocalStateStore = (localRecoveryMode != LocalRecoveryConfig.LocalRecoveryMode.DISABLED) ?
+				taskLocalStateStore = localRecoveryConfig.isLocalRecoveryEnabled() ?
 
 						// Real store implementation if local recovery is enabled
 						new TaskLocalStateStoreImpl(

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index 0f5b0e0..29746dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -105,6 +105,8 @@ public class BackendRestorerProcedure<
 
 			++alternativeIdx;
 
+			// IMPORTANT: please be careful when modifying the log statements because they are used for validation in
+			// the automatic end-to-end tests. Those tests might fail if they are not aligned with the log message!
 			if (restoreState.isEmpty()) {
 				LOG.debug("Creating {} with empty state.", logDescription);
 			} else {