You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 08:06:52 UTC
[1/7] flink git commit: [hotfix] Introduce
NoOpTaskLocalStateStoreImpl that is used as store if local recovery is
disabled
Repository: flink
Updated Branches:
refs/heads/master 0deaa3b1b -> 489e42811
[hotfix] Introduce NoOpTaskLocalStateStoreImpl that is used as store if local recovery is disabled
This implementation will not go through all the registration/lookup steps of a normal state store, because
they are not required if local recovery is disabled.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bc1eaa4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bc1eaa4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bc1eaa4
Branch: refs/heads/master
Commit: 2bc1eaa4118d81bc00d7ebde1a435507d2cffb2a
Parents: b17be26
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 13 10:49:33 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:03 2018 +0200
----------------------------------------------------------------------
.../state/NoOpTaskLocalStateStoreImpl.java | 71 ++++++++++++++++++++
.../runtime/state/OwnedTaskLocalStateStore.java | 38 +++++++++++
.../TaskExecutorLocalStateStoresManager.java | 36 +++++-----
.../runtime/state/TaskLocalStateStore.java | 2 +
.../runtime/state/TaskLocalStateStoreImpl.java | 3 +-
5 files changed, 134 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2bc1eaa4/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
new file mode 100644
index 0000000..11841a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongPredicate;
+
+/**
+ * This class implements a {@link TaskLocalStateStore} with no functionality and is used when local recovery is
+ * disabled.
+ */
+public final class NoOpTaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
+
+ /** The configuration for local recovery. */
+ @Nonnull
+ private final LocalRecoveryConfig localRecoveryConfig;
+
+ NoOpTaskLocalStateStoreImpl(@Nonnull LocalRecoveryConfig localRecoveryConfig) {
+ this.localRecoveryConfig = localRecoveryConfig;
+ }
+
+ @Nonnull
+ @Override
+ public LocalRecoveryConfig getLocalRecoveryConfig() {
+ return localRecoveryConfig;
+ }
+
+ @Override
+ public CompletableFuture<Void> dispose() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) {
+ }
+
+ @Nullable
+ @Override
+ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
+ return null;
+ }
+
+ @Override
+ public void confirmCheckpoint(long confirmedCheckpointId) {
+ }
+
+ @Override
+ public void pruneMatchingCheckpoints(LongPredicate matcher) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2bc1eaa4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java
new file mode 100644
index 0000000..b73626c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This interface represents the administrative interface to {@link TaskLocalStateStore}, that only the owner of the
+ * object should see. All clients that want to use the service should only see the {@link TaskLocalStateStore}
+ * interface.
+ */
+@Internal
+public interface OwnedTaskLocalStateStore extends TaskLocalStateStore {
+
+ /**
+ * Disposes the task local state store. Disposal can happen asynchronously and completion is signaled through the
+ * returned future.
+ */
+ CompletableFuture<Void> dispose();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2bc1eaa4/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 4919f80..cb3b680 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -51,7 +51,7 @@ public class TaskExecutorLocalStateStoresManager {
* this. Maps from allocation id to all the subtask's local state stores.
*/
@GuardedBy("lock")
- private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
+ private final Map<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> taskStateStoresByAllocationID;
/** The configured mode for local recovery on this task manager. */
private final boolean localRecoveryEnabled;
@@ -111,7 +111,7 @@ public class TaskExecutorLocalStateStoresManager {
"register a new TaskLocalStateStore.");
}
- Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> taskStateManagers =
+ Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> taskStateManagers =
this.taskStateStoresByAllocationID.get(allocationID);
if (taskStateManagers == null) {
@@ -126,7 +126,7 @@ public class TaskExecutorLocalStateStoresManager {
final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex);
- TaskLocalStateStoreImpl taskLocalStateStore = taskStateManagers.get(taskKey);
+ OwnedTaskLocalStateStore taskLocalStateStore = taskStateManagers.get(taskKey);
if (taskLocalStateStore == null) {
@@ -142,13 +142,19 @@ public class TaskExecutorLocalStateStoresManager {
LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
- taskLocalStateStore = new TaskLocalStateStoreImpl(
- jobId,
- allocationID,
- jobVertexID,
- subtaskIndex,
- localRecoveryConfig,
- discardExecutor);
+ taskLocalStateStore = (localRecoveryMode != LocalRecoveryConfig.LocalRecoveryMode.DISABLED) ?
+
+ // Real store implementation if local recovery is enabled
+ new TaskLocalStateStoreImpl(
+ jobId,
+ allocationID,
+ jobVertexID,
+ subtaskIndex,
+ localRecoveryConfig,
+ discardExecutor) :
+
+ // NOP implementation if local recovery is disabled
+ new NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
taskStateManagers.put(taskKey, taskLocalStateStore);
@@ -173,7 +179,7 @@ public class TaskExecutorLocalStateStoresManager {
LOG.debug("Releasing local state under allocation id {}.", allocationID);
}
- Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> cleanupLocalStores;
+ Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> cleanupLocalStores;
synchronized (lock) {
if (closed) {
@@ -191,7 +197,7 @@ public class TaskExecutorLocalStateStoresManager {
public void shutdown() {
- HashMap<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> toRelease;
+ HashMap<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> toRelease;
synchronized (lock) {
@@ -208,7 +214,7 @@ public class TaskExecutorLocalStateStoresManager {
LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");
- for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> entry :
+ for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> entry :
toRelease.entrySet()) {
doRelease(entry.getValue().values());
@@ -240,11 +246,11 @@ public class TaskExecutorLocalStateStoresManager {
return allocationDirectories;
}
- private void doRelease(Iterable<TaskLocalStateStoreImpl> toRelease) {
+ private void doRelease(Iterable<OwnedTaskLocalStateStore> toRelease) {
if (toRelease != null) {
- for (TaskLocalStateStoreImpl stateStore : toRelease) {
+ for (OwnedTaskLocalStateStore stateStore : toRelease) {
try {
stateStore.dispose();
} catch (Exception disposeEx) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2bc1eaa4/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
index 686f4f6..b0d8a82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import javax.annotation.Nonnegative;
@@ -33,6 +34,7 @@ import java.util.function.LongPredicate;
* state is typically lost in case of machine failures. In such cases (and others), client code of this class must fall
* back to using the slower but highly available store.
*/
+@Internal
public interface TaskLocalStateStore {
/**
* Stores the local state for the given checkpoint id.
http://git-wip-us.apache.org/repos/asf/flink/blob/2bc1eaa4/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 29adc4a..9d105e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -53,7 +53,7 @@ import java.util.function.LongPredicate;
/**
* Main implementation of a {@link TaskLocalStateStore}.
*/
-public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
+public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
/** Logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);
@@ -232,6 +232,7 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
/**
* Disposes the state of all local snapshots managed by this object.
*/
+ @Override
public CompletableFuture<Void> dispose() {
Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy;
[4/7] flink git commit: [hotfix] Change default for SLOT_IDLE_TIMEOUT
to match HEARTBEAT_TIMEOUT
Posted by sr...@apache.org.
[hotfix] Change default for SLOT_IDLE_TIMEOUT to match HEARTBEAT_TIMEOUT
That preserves sticky slot allocation for local recovery for lost JVMs
that can take as long as the heartbeat timeout to be detected.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b17be267
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b17be267
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b17be267
Branch: refs/heads/master
Commit: b17be267118dd2fe22172c204597aa90db72f909
Parents: a7b54f1
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 13 13:30:48 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:03 2018 +0200
----------------------------------------------------------------------
docs/_includes/generated/job_manager_configuration.html | 2 +-
.../java/org/apache/flink/configuration/JobManagerOptions.java | 3 ++-
2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b17be267/docs/_includes/generated/job_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 251731a..db0ca53 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -54,7 +54,7 @@
</tr>
<tr>
<td><h5>slot.idle.timeout</h5></td>
- <td style="word-wrap: break-word;">10000</td>
+ <td style="word-wrap: break-word;">50000</td>
<td>The timeout in milliseconds for a idle slot in Slot Pool.</td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/b17be267/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index ade3958..add8e68 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -140,7 +140,8 @@ public class JobManagerOptions {
public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
key("slot.idle.timeout")
- .defaultValue(10L * 1000L)
+ // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
+ .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
// ---------------------------------------------------------------------------------------------
[5/7] flink git commit: [hotfix] Small improvements in logging for
local recovery
Posted by sr...@apache.org.
[hotfix] Small improvements in logging for local recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89935997
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89935997
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89935997
Branch: refs/heads/master
Commit: 89935997de03b0f6db89d111a087b0f0f210695d
Parents: 2bc1eaa
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 16 17:09:28 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200
----------------------------------------------------------------------
.../TaskExecutorLocalStateStoresManager.java | 14 ++++++--------
.../runtime/state/TaskLocalStateStoreImpl.java | 20 ++++++++++++--------
.../runtime/state/TaskStateManagerImpl.java | 7 ++-----
3 files changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index cb3b680..6826fbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -158,15 +158,13 @@ public class TaskExecutorLocalStateStoresManager {
taskStateManagers.put(taskKey, taskLocalStateStore);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.",
- localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
- }
+
+ LOG.debug("Registered new local state store with configuration {} for {} - {} - {} under allocation " +
+ "id {}.", localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
+
} else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found existing local state store for {} - {} - {} under allocation id {}.",
- jobId, jobVertexID, subtaskIndex, allocationID);
- }
+ LOG.debug("Found existing local state store for {} - {} - {} under allocation id {}: {}",
+ jobId, jobVertexID, subtaskIndex, allocationID, taskLocalStateStore);
}
return taskLocalStateStore;
http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 9d105e6..df9147c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -190,17 +190,20 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
snapshot = storedTaskStateByCheckpointID.get(checkpointID);
}
- snapshot = (snapshot != NULL_DUMMY) ? snapshot : null;
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found entry for local state for checkpoint {} in subtask ({} - {} - {}) : {}",
- checkpointID, jobID, jobVertexID, subtaskIndex, snapshot);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Found entry for local state for checkpoint {} in subtask ({} - {} - {})",
+ if (snapshot != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}",
+ checkpointID, jobID, jobVertexID, subtaskIndex, snapshot);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Found registered local state for checkpoint {} in subtask ({} - {} - {})",
+ checkpointID, jobID, jobVertexID, subtaskIndex);
+ }
+ } else {
+ LOG.debug("Did not find registered local state for checkpoint {} in subtask ({} - {} - {})",
checkpointID, jobID, jobVertexID, subtaskIndex);
}
- return snapshot;
+ return (snapshot != NULL_DUMMY) ? snapshot : null;
}
@Override
@@ -357,6 +360,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
", allocationID=" + allocationID +
", subtaskIndex=" + subtaskIndex +
", localRecoveryConfig=" + localRecoveryConfig +
+ ", storedCheckpointIDs=" + storedTaskStateByCheckpointID.keySet() +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index e542ba1..a0aeb3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -136,11 +136,8 @@ public class TaskStateManagerImpl implements TaskStateManager {
}
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
- "state store {}.",
- operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
- }
+ LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
+ "state store {}.", operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
jobManagerSubtaskState,
[6/7] flink git commit: [hotfix] Expose AllocationID as string
through TaskInfo
Posted by sr...@apache.org.
[hotfix] Expose AllocationID as string through TaskInfo
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edece9c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edece9c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edece9c1
Branch: refs/heads/master
Commit: edece9c1e2ec8e759b783ea07dcaa50d4e8704a2
Parents: 8993599
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 13 13:53:48 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/common/TaskInfo.java | 36 ++++++++++++++++++--
.../util/AbstractRuntimeUDFContext.java | 7 ++++
.../apache/flink/runtime/taskmanager/Task.java | 3 +-
3 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 33f2e0c..2583687 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
import org.apache.flink.annotation.Internal;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
@@ -31,12 +31,35 @@ public class TaskInfo {
private final String taskName;
private final String taskNameWithSubtasks;
+ private final String allocationIDAsString;
private final int maxNumberOfParallelSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;
- public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+ public TaskInfo(
+ String taskName,
+ int maxNumberOfParallelSubtasks,
+ int indexOfSubtask,
+ int numberOfParallelSubtasks,
+ int attemptNumber) {
+ this(
+ taskName,
+ maxNumberOfParallelSubtasks,
+ indexOfSubtask,
+ numberOfParallelSubtasks,
+ attemptNumber,
+ "UNKNOWN");
+ }
+
+ public TaskInfo(
+ String taskName,
+ int maxNumberOfParallelSubtasks,
+ int indexOfSubtask,
+ int numberOfParallelSubtasks,
+ int attemptNumber,
+ String allocationIDAsString) {
+
checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
@@ -49,6 +72,7 @@ public class TaskInfo {
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.attemptNumber = attemptNumber;
this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')';
+ this.allocationIDAsString = checkNotNull(allocationIDAsString);
}
/**
@@ -107,4 +131,12 @@ public class TaskInfo {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+ /**
+ * Returns the allocation id for where this task is executed.
+ * @return the allocation id for where this task is executed.
+ */
+ public String getAllocationIDAsString() {
+ return allocationIDAsString;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6246e80..d6262c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.functions.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -239,4 +240,10 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
+
+ @Internal
+ @VisibleForTesting
+ public String getAllocationIDAsString() {
+ return taskInfo.getAllocationIDAsString();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f7ed231..f8aa0e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -310,7 +310,8 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
taskInformation.getMaxNumberOfSubtaks(),
subtaskIndex,
taskInformation.getNumberOfSubtasks(),
- attemptNumber);
+ attemptNumber,
+ String.valueOf(slotAllocationId));
this.jobId = jobInformation.getJobId();
this.vertexId = taskInformation.getJobVertexId();
[2/7] flink git commit: [hotfix] Abort restore when the procedure
failed through with a closed CloseableRegistry
Posted by sr...@apache.org.
[hotfix] Abort restore when the procedure failed through with a closed CloseableRegistry
This prevents exceptions caused by cancellation through the CloseableRegistry from resulting in
unnecessary recovery attempts with alternative state.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7b54f12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7b54f12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7b54f12
Branch: refs/heads/master
Commit: a7b54f12c96bf03fd87b4cf03127f0838e746695
Parents: 85eb104
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Mar 12 22:43:47 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:03 2018 +0200
----------------------------------------------------------------------
.../flink/streaming/api/operators/BackendRestorerProcedure.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7b54f12/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index dd75fb2..0f5b0e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -125,6 +125,10 @@ public class BackendRestorerProcedure<
LOG.warn("Exception while restoring {} from alternative ({}/{}), will retry while more " +
"alternatives are available.", logDescription, alternativeIdx, restoreOptions.size(), ex);
+
+ if (backendCloseableRegistry.isClosed()) {
+ throw new FlinkException("Stopping restore attempts for already cancelled task.", collectedException);
+ }
}
}
[3/7] flink git commit: [hotfix][docs] Update docs for simplified
configuration of local recovery
Posted by sr...@apache.org.
[hotfix][docs] Update docs for simplified configuration of local recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85eb104b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85eb104b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85eb104b
Branch: refs/heads/master
Commit: 85eb104b04a0ea55420754c017d44852edea1744
Parents: 0deaa3b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu May 17 10:00:50 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:03 2018 +0200
----------------------------------------------------------------------
docs/ops/state/large_state_tuning.md | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/85eb104b/docs/ops/state/large_state_tuning.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
index 6359938..6df551f 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -298,10 +298,7 @@ that the task-local state is an in-memory consisting of heap objects, and not st
### Configuring task-local recovery
Task-local recovery is *deactivated by default* and can be activated through Flink's configuration with the key `state.backend.local-recovery` as specified
-in `CheckpointingOptions.LOCAL_RECOVERY`. Users have currently two choices:
-
-- `DISABLED`: Local recovery is disabled (default).
-- `ENABLE_FILE_BASED`: Local recovery is activated, based on writing a secondary copies of the task state on local disk.
+in `CheckpointingOptions.LOCAL_RECOVERY`. The value for this setting can either be *true* to enable or *false* (default) to disable local recovery.
### Details on task-local recovery for different state backends
[7/7] flink git commit: [FLINK-8910][e2e] Automated test for local
recovery (including sticky allocation)
Posted by sr...@apache.org.
[FLINK-8910][e2e] Automated test for local recovery (including sticky allocation)
This closes #5676.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/489e4281
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/489e4281
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/489e4281
Branch: refs/heads/master
Commit: 489e42811157a9b2575f259f7cda2a2ee680d008
Parents: edece9c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 6 10:35:44 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200
----------------------------------------------------------------------
.../pom.xml | 96 ++++
...StickyAllocationAndLocalRecoveryTestJob.java | 478 +++++++++++++++++++
.../src/main/resources/log4j-test.properties | 27 ++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 8 +
flink-end-to-end-tests/test-scripts/common.sh | 43 +-
.../test_local_recovery_and_scheduling.sh | 121 +++++
.../TaskExecutorLocalStateStoresManager.java | 2 +-
.../api/operators/BackendRestorerProcedure.java | 2 +
9 files changed, 773 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
new file mode 100644
index 0000000..4b966e2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.6-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-local-recovery-and-allocation-test</artifactId>
+ <name>flink-local-recovery-and-allocation-test</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <!-- StickyAllocationAndLocalRecoveryTestJob -->
+ <execution>
+ <id>StickyAllocationAndLocalRecoveryTestJob</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <finalName>StickyAllocationAndLocalRecoveryTestJob</finalName>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.class</include>
+ <include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
new file mode 100644
index 0000000..b03791e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky allocation).
+ *
+ * <p>List of possible input parameters for this job:
+ * <ul>
+ * <li>checkpointDir: the checkpoint directory, required.</li>
+ * <li>parallelism: the parallelism of the job, default 1.</li>
+ * <li>maxParallelism: the maximum parallelism of the job, default 1.</li>
+ * <li>checkpointInterval: the checkpointing interval in milliseconds, default 1000.</li>
+ * <li>restartDelay: the delay of the fixed delay restart strategy, default 0.</li>
+ * <li>externalizedCheckpoints: flag to activate externalized checkpoints, default <code>false</code>.</li>
+ * <li>stateBackend: choice for state backend between <code>file</code> and <code>rocks</code>, default <code>file</code>.</li>
+ * <li>killJvmOnFail: flag that determines whether an artificial failure induced by the test kills the JVM.</li>
+ * <li>asyncCheckpoints: flag for async checkpoints with file state backend, default <code>true</code>.</li>
+ * <li>incrementalCheckpoints: flag for incremental checkpoint with rocks state backend, default <code>false</code>.</li>
+ * <li>delay: sleep delay to throttle down the production of the source, default 0.</li>
+ * <li>maxAttempts: the maximum number of run attempts, before the job finishes with success, default 3.</li>
+ * <li>valueSize: size of the artificial value for each key in bytes, default 10.</li>
+ * </ul>
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool pt = ParameterTool.fromArgs(args);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(pt.getInt("parallelism", 1));
+ env.setMaxParallelism(pt.getInt("maxParallelism", pt.getInt("parallelism", 1)));
+ env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
+ if (pt.getBoolean("externalizedCheckpoints", false)) {
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+
+ String stateBackend = pt.get("stateBackend", "file");
+ String checkpointDir = pt.getRequired("checkpointDir");
+
+ boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+ if ("file".equals(stateBackend)) {
+ boolean asyncCheckpoints = pt.getBoolean("asyncCheckpoints", true);
+ env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+ } else if ("rocks".equals(stateBackend)) {
+ boolean incrementalCheckpoints = pt.getBoolean("incrementalCheckpoints", false);
+ env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+ } else {
+ throw new IllegalArgumentException("Unknown backend: " + stateBackend);
+ }
+
+ // make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(pt);
+
+ // delay to throttle down the production of the source
+ long delay = pt.getLong("delay", 0L);
+
+ // the maximum number of attempts, before the job finishes with success
+ int maxAttempts = pt.getInt("maxAttempts", 3);
+
+ // size of one artificial value
+ int valueSize = pt.getInt("valueSize", 10);
+
+ env.addSource(new RandomLongSource(maxAttempts, delay))
+ .keyBy((KeySelector<Long, Long>) aLong -> aLong)
+ .flatMap(new StateCreatingFlatMap(valueSize, killJvmOnFail))
+ .addSink(new PrintSinkFunction<>());
+
+ env.execute("Sticky Allocation And Local Recovery Test");
+ }
+
+ /**
+ * Source function that produces a long sequence.
+ */
+ private static final class RandomLongSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Generator delay between two events.
+ */
+ final long delay;
+
+ /**
+ * Maximum restarts before shutting down this source.
+ */
+ final int maxAttempts;
+
+ /**
+ * State that holds the current key for recovery.
+ */
+ transient ListState<Long> sourceCurrentKeyState;
+
+ /**
+ * Generator's current key.
+ */
+ long currentKey;
+
+ /**
+ * Generator runs while this is true.
+ */
+ volatile boolean running;
+
+ RandomLongSource(int maxAttempts, long delay) {
+ this.delay = delay;
+ this.maxAttempts = maxAttempts;
+ this.running = true;
+ }
+
+ @Override
+ public void run(SourceContext<Long> sourceContext) throws Exception {
+
+ int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+ // the source emits one final event and shuts down once we have reached max attempts.
+ if (getRuntimeContext().getAttemptNumber() > maxAttempts) {
+ synchronized (sourceContext.getCheckpointLock()) {
+ sourceContext.collect(Long.MAX_VALUE - subtaskIdx);
+ }
+ return;
+ }
+
+ while (running) {
+
+ synchronized (sourceContext.getCheckpointLock()) {
+ sourceContext.collect(currentKey);
+ currentKey += numberOfParallelSubtasks;
+ }
+
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ sourceCurrentKeyState.clear();
+ sourceCurrentKeyState.add(currentKey);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+
+ ListStateDescriptor<Long> currentKeyDescriptor = new ListStateDescriptor<>("currentKey", Long.class);
+ sourceCurrentKeyState = context.getOperatorStateStore().getListState(currentKeyDescriptor);
+
+ currentKey = getRuntimeContext().getIndexOfThisSubtask();
+ Iterable<Long> iterable = sourceCurrentKeyState.get();
+ if (iterable != null) {
+ Iterator<Long> iterator = iterable.iterator();
+ if (iterator.hasNext()) {
+ currentKey = iterator.next();
+ Preconditions.checkState(!iterator.hasNext());
+ }
+ }
+ }
+ }
+
+ /**
+ * Stateful map function. Failure creation and checks happen here.
+ */
+ private static final class StateCreatingFlatMap
+ extends RichFlatMapFunction<Long, String> implements CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * User configured size of the generated artificial values in the keyed state.
+ */
+ final int valueSize;
+
+ /**
+ * Holds the user configuration if the artificial test failure is killing the JVM.
+ */
+ final boolean killTaskOnFailure;
+
+ /**
+ * This state is used to create artificial keyed state in the backend.
+ */
+ transient ValueState<String> valueState;
+
+ /**
+ * This state is used to persist the schedulingAndFailureInfo to state.
+ */
+ transient ListState<MapperSchedulingAndFailureInfo> schedulingAndFailureState;
+
+ /**
+ * This contains the current scheduling and failure meta data.
+ */
+ transient MapperSchedulingAndFailureInfo currentSchedulingAndFailureInfo;
+
+ /**
+ * Message to indicate that recovery detected a failure with sticky allocation.
+ */
+ transient volatile String allocationFailureMessage;
+
+ /**
+ * If this flag is true, the next invocation of the map function introduces a test failure.
+ */
+ transient volatile boolean failTask;
+
+ StateCreatingFlatMap(int valueSize, boolean killTaskOnFailure) {
+ this.valueSize = valueSize;
+ this.failTask = false;
+ this.killTaskOnFailure = killTaskOnFailure;
+ this.allocationFailureMessage = null;
+ }
+
+ @Override
+ public void flatMap(Long key, Collector<String> collector) throws IOException {
+
+ if (allocationFailureMessage != null) {
+ // Report the failure downstream, so that we can get the message from the output.
+ collector.collect(allocationFailureMessage);
+ allocationFailureMessage = null;
+ }
+
+ if (failTask) {
+ // we fail the task, either by killing the JVM hard, or by throwing a user code exception.
+ if (killTaskOnFailure) {
+ Runtime.getRuntime().halt(-1);
+ } else {
+ throw new RuntimeException("Artificial user code exception.");
+ }
+ }
+
+ // sanity check
+ if (null != valueState.value()) {
+ throw new IllegalStateException("This should never happen, keys are generated monotonously.");
+ }
+
+ // store artificial data to blow up the state
+ valueState.update(RandomStringUtils.random(valueSize, true, true));
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ ValueStateDescriptor<String> stateDescriptor =
+ new ValueStateDescriptor<>("state", String.class);
+ valueState = functionInitializationContext.getKeyedStateStore().getState(stateDescriptor);
+
+ ListStateDescriptor<MapperSchedulingAndFailureInfo> mapperInfoStateDescriptor =
+ new ListStateDescriptor<>("mapperState", MapperSchedulingAndFailureInfo.class);
+ schedulingAndFailureState =
+ functionInitializationContext.getOperatorStateStore().getUnionListState(mapperInfoStateDescriptor);
+
+ StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+ String allocationID = runtimeContext.getAllocationIDAsString();
+
+ final int thisJvmPid = getJvmPid();
+ final Set<Integer> killedJvmPids = new HashSet<>();
+
+ // here we check if the sticky scheduling worked as expected
+ if (functionInitializationContext.isRestored()) {
+ Iterable<MapperSchedulingAndFailureInfo> iterable = schedulingAndFailureState.get();
+ String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
+
+ MapperSchedulingAndFailureInfo infoForThisTask = null;
+ List<MapperSchedulingAndFailureInfo> completeInfo = new ArrayList<>();
+ if (iterable != null) {
+ for (MapperSchedulingAndFailureInfo testInfo : iterable) {
+
+ completeInfo.add(testInfo);
+
+ if (taskNameWithSubtasks.equals(testInfo.taskNameWithSubtask)) {
+ infoForThisTask = testInfo;
+ }
+
+ if (testInfo.killedJvm) {
+ killedJvmPids.add(testInfo.jvmPid);
+ }
+ }
+ }
+
+ Preconditions.checkNotNull(infoForThisTask, "Expected to find info here.");
+
+ if (!isScheduledToCorrectAllocation(infoForThisTask, allocationID, killedJvmPids)) {
+ allocationFailureMessage = String.format(
+ "Sticky allocation test failed: Subtask %s in attempt %d was rescheduled from allocation %s " +
+ "on JVM with PID %d to unexpected allocation %s on JVM with PID %d.\n" +
+ "Complete information from before the crash: %s.",
+ runtimeContext.getTaskNameWithSubtasks(),
+ runtimeContext.getAttemptNumber(),
+ infoForThisTask.allocationId,
+ infoForThisTask.jvmPid,
+ allocationID,
+ thisJvmPid,
+ completeInfo);
+ }
+ }
+
+ // We determine which of the subtasks will produce the artificial failure
+ boolean failingTask = shouldTaskFailForThisAttempt();
+
+ // We take note of all the meta info that we require to check sticky scheduling in the next re-attempt
+ this.currentSchedulingAndFailureInfo = new MapperSchedulingAndFailureInfo(
+ failingTask,
+ failingTask && killTaskOnFailure,
+ thisJvmPid,
+ runtimeContext.getTaskNameWithSubtasks(),
+ allocationID);
+
+ schedulingAndFailureState.clear();
+ schedulingAndFailureState.add(currentSchedulingAndFailureInfo);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // we can only fail the task after at least one checkpoint is completed to record progress.
+ failTask = currentSchedulingAndFailureInfo.failingTask;
+ }
+
+ private boolean shouldTaskFailForThisAttempt() {
+ RuntimeContext runtimeContext = getRuntimeContext();
+ int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+ int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+ int attempt = runtimeContext.getAttemptNumber();
+ return (attempt % numSubtasks) == subtaskIdx;
+ }
+
+ private boolean isScheduledToCorrectAllocation(
+ MapperSchedulingAndFailureInfo infoForThisTask,
+ String allocationID,
+ Set<Integer> killedJvmPids) {
+
+ return (infoForThisTask.allocationId.equals(allocationID)
+ || killedJvmPids.contains(infoForThisTask.jvmPid));
+ }
+ }
+
+ /**
+ * This code is copied from Stack Overflow.
+ *
+ * <p><a href="https://stackoverflow.com/questions/35842">https://stackoverflow.com/questions/35842</a>, answer
+ * <a href="https://stackoverflow.com/a/12066696/9193881">https://stackoverflow.com/a/12066696/9193881</a>
+ *
+ * <p>Author: <a href="https://stackoverflow.com/users/446591/brad-mace">Brad Mace</a>)
+ */
+ private static int getJvmPid() throws Exception {
+ java.lang.management.RuntimeMXBean runtime =
+ java.lang.management.ManagementFactory.getRuntimeMXBean();
+ java.lang.reflect.Field jvm = runtime.getClass().getDeclaredField("jvm");
+ jvm.setAccessible(true);
+ sun.management.VMManagement mgmt =
+ (sun.management.VMManagement) jvm.get(runtime);
+ java.lang.reflect.Method pidMethod =
+ mgmt.getClass().getDeclaredMethod("getProcessId");
+ pidMethod.setAccessible(true);
+
+ return (int) (Integer) pidMethod.invoke(mgmt);
+ }
+
+ /**
+ * Records the information required to check sticky scheduling after a restart.
+ */
+ public static class MapperSchedulingAndFailureInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * True iff this task inflicts a test failure.
+ */
+ final boolean failingTask;
+
+ /**
+ * True iff this task kills its JVM.
+ */
+ final boolean killedJvm;
+
+ /**
+ * PID of the task JVM.
+ */
+ final int jvmPid;
+
+ /**
+ * Name and subtask index of the task.
+ */
+ final String taskNameWithSubtask;
+
+ /**
+ * The current allocation id of this task.
+ */
+ final String allocationId;
+
+ MapperSchedulingAndFailureInfo(
+ boolean failingTask,
+ boolean killedJvm,
+ int jvmPid,
+ String taskNameWithSubtask,
+ String allocationId) {
+
+ this.failingTask = failingTask;
+ this.killedJvm = killedJvm;
+ this.jvmPid = jvmPid;
+ this.taskNameWithSubtask = taskNameWithSubtask;
+ this.allocationId = allocationId;
+ }
+
+ @Override
+ public String toString() {
+ return "MapperTestInfo{" +
+ "failingTask=" + failingTask +
+ ", killedJvm=" + killedJvm +
+ ", jvmPid=" + jvmPid +
+ ", taskNameWithSubtask='" + taskNameWithSubtask + '\'' +
+ ", allocationId='" + allocationId + '\'' +
+ '}';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..37c65e9
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=INFO, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index dadb46f..367e120 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,7 @@ under the License.
<module>flink-distributed-cache-via-blob-test</module>
<module>flink-high-parallelism-iterations-test</module>
<module>flink-stream-stateful-job-upgrade-test</module>
+ <module>flink-local-recovery-and-allocation-test</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d4309c1..bd91bb2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -132,5 +132,13 @@ if [ $EXIT_CODE == 0 ]; then
EXIT_CODE=$?
fi
+if [ $EXIT_CODE == 0 ]; then
+ printf "\n==============================================================================\n"
+ printf "Running local recovery and sticky scheduling nightly end-to-end test\n"
+ printf "==============================================================================\n"
+ $END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh
+ EXIT_CODE=$?
+fi
+
# Exit code for Travis build success/failure
exit $EXIT_CODE
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ec963c5..56a5d27 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -320,6 +320,39 @@ function s3_delete {
https://${bucket}.s3.amazonaws.com/${s3_file}
}
+# This function starts the given number of task managers and monitors their processes. If a task manager process goes
+# away, a replacement is started.
+function tm_watchdog {
+ local expectedTm=$1
+ while true;
+ do
+ runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
+ count=$((expectedTm-runningTm))
+ for (( c=0; c<count; c++ ))
+ do
+ $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+ done
+ sleep 5;
+ done
+}
+
+# Kills all job managers.
+function jm_kill_all {
+ kill_all 'StandaloneSessionClusterEntrypoint'
+}
+
+# Kills all task managers.
+function tm_kill_all {
+ kill_all 'TaskManagerRunner|TaskManager'
+}
+
+# Kills all processes that match the given name.
+function kill_all {
+ local pid=`jps | grep -E "${1}" | cut -d " " -f 1`
+ kill ${pid} 2> /dev/null
+ wait ${pid} 2> /dev/null
+}
+
function kill_random_taskmanager {
KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
kill -9 "$KILL_TM"
@@ -432,12 +465,14 @@ function run_test {
return "${exit_code}"
}
-# make sure to clean up even in case of failures
+# Shuts down the cluster and cleans up all temporary folders and files. Make sure to clean up even in case of failures.
function cleanup {
stop_cluster
- check_all_pass
- rm -rf $TEST_DATA_DIR
- rm -f $FLINK_DIR/log/*
+ tm_kill_all
+ jm_kill_all
+ rm -rf $TEST_DATA_DIR 2> /dev/null
revert_default_config
+ check_all_pass
+ rm -rf $FLINK_DIR/log/* 2> /dev/null
}
trap cleanup EXIT
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
new file mode 100755
index 0000000..98ef01f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# This function checks the logs for entries that indicate problems with local recovery
+function check_logs {
+ local parallelism=$1
+ local attempts=$2
+ (( expected_count=parallelism * (attempts + 1) ))
+
+ # Search for the log message that indicates a failed restore from existing local state for the keyed backend.
+ local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+ # Search for attempts to recover locally.
+ local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+ if [ ${failed_local_recovery} -ne 0 ]
+ then
+ PASS=""
+ echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)."
+ fi
+
+ if [ ${attempt_local_recovery} -eq 0 ]
+ then
+ PASS=""
+ echo "FAILURE: Found no attempt for local recovery. Configuration problem?"
+ fi
+}
+
+# This function cleans up after the test: the configuration is restored, the watchdog is terminated, and temporary
+# files and folders are deleted.
+function cleanup_after_test {
+ # Reset the configurations
+ sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties"
+ #
+ kill ${watchdog_pid} 2> /dev/null
+ wait ${watchdog_pid} 2> /dev/null
+ #
+ cleanup
+}
+
+# Calls the cleanup step for this test and exits with an error.
+function cleanup_after_test_and_exit_fail {
+ cleanup_after_test
+ exit 1
+}
+
+## This function executes one run for a certain configuration
+function run_local_recovery_test {
+ local parallelism=$1
+ local max_attempts=$2
+ local backend=$3
+ local incremental=$4
+ local kill_jvm=$5
+
+ echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
+ TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+
+ # Backup conf and configure for HA
+ backup_config
+ create_ha_config
+
+ # Enable debug logging
+ sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties"
+
+ # Enable local recovery
+ set_conf "state.backend.local-recovery" "true"
+ # Ensure that each TM only has one operator(chain)
+ set_conf "taskmanager.numberOfTaskSlots" "1"
+
+ rm $FLINK_DIR/log/* 2> /dev/null
+
+ # Start HA server
+ start_local_zk
+ start_cluster
+
+ tm_watchdog ${parallelism} &
+ watchdog_pid=$!
+
+ echo "Started TM watchdog with PID ${watchdog_pid}."
+
+ $FLINK_DIR/bin/flink run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+ -p ${parallelism} $TEST_PROGRAM_JAR \
+ -D state.backend.local-recovery=ENABLE_FILE_BASED \
+ --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \
+ --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail ${kill_jvm} --checkpointInterval 1000 \
+ --maxAttempts ${max_attempts} --parallelism ${parallelism} --stateBackend ${backend} \
+ --incrementalCheckpoints ${incremental}
+
+ check_logs ${parallelism} ${max_attempts}
+ cleanup_after_test
+}
+
+## MAIN
+trap cleanup_after_test_and_exit_fail EXIT
+run_local_recovery_test 4 3 "file" "false" "false"
+run_local_recovery_test 4 3 "file" "false" "true"
+run_local_recovery_test 4 10 "rocks" "false" "false"
+run_local_recovery_test 4 10 "rocks" "true" "false"
+run_local_recovery_test 4 10 "rocks" "false" "true"
+run_local_recovery_test 4 10 "rocks" "true" "true"
+trap - EXIT
+exit 0
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 6826fbd..095dc86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -142,7 +142,7 @@ public class TaskExecutorLocalStateStoresManager {
LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
- taskLocalStateStore = (localRecoveryMode != LocalRecoveryConfig.LocalRecoveryMode.DISABLED) ?
+ taskLocalStateStore = localRecoveryConfig.isLocalRecoveryEnabled() ?
// Real store implementation if local recovery is enabled
new TaskLocalStateStoreImpl(
http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index 0f5b0e0..29746dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -105,6 +105,8 @@ public class BackendRestorerProcedure<
++alternativeIdx;
+ // IMPORTANT: please be careful when modifying the log statements because they are used for validation in
+ // the automatic end-to-end tests. Those tests might fail if they are not aligned with the log message!
if (restoreState.isEmpty()) {
LOG.debug("Creating {} with empty state.", logDescription);
} else {