You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/13 08:39:29 UTC
[1/2] flink git commit: [FLINK-6833] [task] Fail StreamTask only due
to async exception if it is running
Repository: flink
Updated Branches:
refs/heads/release-1.3 db975260c -> a3103c2dd
[FLINK-6833] [task] Fail StreamTask only due to async exception if it is running
In order to resolve a race condition between a properly terminated StreamTask which
cleans up its resources (stopping asynchronous operations, etc.) and a cancelled
asynchronous operation (e.g. asynchronous checkpointing operation), we check whether
the StreamTask is still running before failing it externally.
This closes #4058.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3103c2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3103c2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3103c2d
Branch: refs/heads/release-1.3
Commit: a3103c2ddd6b8a7566b8fd27f3acd695b4b345c7
Parents: a1d70f5
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jun 2 15:48:54 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 13 10:35:58 2017 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 7 +-
.../tasks/StreamTaskTerminationTest.java | 288 +++++++++++++++++++
2 files changed, 293 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3103c2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bc66751..5495970 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -822,11 +822,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
* FAILED, and, if the invokable code is running, starts an asynchronous thread
* that aborts that code.
*
- * <p>This method never blocks.</p>
+ * <p>This method never blocks.
*/
@Override
public void handleAsyncException(String message, Throwable exception) {
- getEnvironment().failExternally(exception);
+ if (isRunning) {
+ // only fail if the task is still running
+ getEnvironment().failExternally(exception);
+ }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3103c2d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
new file mode 100644
index 0000000..f021b38
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the StreamTask termination.
+ */
+public class StreamTaskTerminationTest extends TestLogger {
+
+ public static final OneShotLatch RUN_LATCH = new OneShotLatch();
+ public static final OneShotLatch CHECKPOINTING_LATCH = new OneShotLatch();
+ private static final OneShotLatch CLEANUP_LATCH = new OneShotLatch();
+ private static final OneShotLatch HANDLE_ASYNC_EXCEPTION_LATCH = new OneShotLatch();
+
+ /**
+ * FLINK-6833
+ *
+ * <p>Tests that a finished stream task cannot be failed by an asynchronous checkpointing operation after
+ * the stream task has stopped running.
+ */
+ @Test
+ public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws Exception {
+ final Configuration taskConfiguration = new Configuration();
+ final StreamConfig streamConfig = new StreamConfig(taskConfiguration);
+ final NoOpStreamOperator<Long> noOpStreamOperator = new NoOpStreamOperator<>();
+
+ final AbstractStateBackend blockingStateBackend = new BlockingStateBackend();
+
+ streamConfig.setStreamOperator(noOpStreamOperator);
+ streamConfig.setStateBackend(blockingStateBackend);
+
+ final long checkpointId = 0L;
+ final long checkpointTimestamp = 0L;
+
+ final JobInformation jobInformation = new JobInformation(
+ new JobID(),
+ "Test Job",
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList());
+
+ final TaskInformation taskInformation = new TaskInformation(
+ new JobVertexID(),
+ "Test Task",
+ 1,
+ 1,
+ BlockingStreamTask.class.getName(),
+ taskConfiguration);
+
+ final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
+
+ final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class);
+ when(networkEnv.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mock(TaskKvStateRegistry.class));
+
+ final Task task = new Task(
+ jobInformation,
+ taskInformation,
+ new ExecutionAttemptID(),
+ new AllocationID(),
+ 0,
+ 0,
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ 0,
+ null,
+ new MemoryManager(32L * 1024L, 1),
+ new IOManagerAsync(),
+ networkEnv,
+ mock(BroadcastVariableManager.class),
+ mock(TaskManagerActions.class),
+ mock(InputSplitProvider.class),
+ mock(CheckpointResponder.class),
+ new FallbackLibraryCacheManager(),
+ mock(FileCache.class),
+ taskManagerRuntimeInfo,
+ new UnregisteredTaskMetricsGroup(),
+ mock(ResultPartitionConsumableNotifier.class),
+ mock(PartitionProducerStateChecker.class),
+ Executors.directExecutor());
+
+ Future<Void> taskRun = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ task.run();
+
+ return null;
+ }
+ }, TestingUtils.defaultExecutor());
+
+ // wait until the stream task started running
+ RUN_LATCH.await();
+
+ // trigger a checkpoint
+ task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, CheckpointOptions.forFullCheckpoint());
+
+ // wait until the task has completed execution
+ taskRun.get();
+
+ // check that no failure occurred
+ if (task.getFailureCause() != null) {
+ throw new Exception("Task failed", task.getFailureCause());
+ }
+
+ // check that we have entered the finished state
+ assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+ }
+
+ /**
+ * Blocking stream task which waits on and triggers a set of one shot latches to establish a certain
+ * interleaving with a concurrently running checkpoint operation.
+ */
+ public static class BlockingStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {
+
+ public BlockingStreamTask() {
+ }
+
+ @Override
+ protected void init() throws Exception {
+
+ }
+
+ @Override
+ protected void run() throws Exception {
+ RUN_LATCH.trigger();
+ // wait until we have started an asynchronous checkpoint
+ CHECKPOINTING_LATCH.await();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ // notify the asynchronous checkpoint operation that we have reached the cleanup stage --> the task
+ // has been stopped
+ CLEANUP_LATCH.trigger();
+
+ // wait until handle async exception has been called to proceed with the termination of the
+ // StreamTask
+ HANDLE_ASYNC_EXCEPTION_LATCH.await();
+ }
+
+ @Override
+ protected void cancelTask() throws Exception {
+ }
+
+ @Override
+ public void handleAsyncException(String message, Throwable exception) {
+ super.handleAsyncException(message, exception);
+
+ HANDLE_ASYNC_EXCEPTION_LATCH.trigger();
+ }
+ }
+
+ static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
+ private static final long serialVersionUID = 4517845269225218312L;
+ }
+
+ static class BlockingStateBackend extends AbstractStateBackend {
+
+ private static final long serialVersionUID = -5053068148933314100L;
+
+ @Override
+ public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+ return mock(CheckpointStreamFactory.class);
+ }
+
+ @Override
+ public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, @Nullable String targetLocation) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry) throws IOException {
+ return null;
+ }
+
+ @Override
+ public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
+ OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
+ when(operatorStateBackend.snapshot(anyLong(), anyLong(), any(CheckpointStreamFactory.class), any(CheckpointOptions.class)))
+ .thenReturn(new FutureTask<>(new BlockingCallable()));
+
+ return operatorStateBackend;
+ }
+ }
+
+ static class BlockingCallable implements Callable<OperatorStateHandle> {
+
+ @Override
+ public OperatorStateHandle call() throws Exception {
+ // notify that we have started the asynchronous checkpointint operation
+ CHECKPOINTING_LATCH.trigger();
+ // wait until we have reached the StreamTask#cleanup --> This will already cancel this FutureTask
+ CLEANUP_LATCH.await();
+
+ // now throw exception to fail the async checkpointing operation if it has not already been cancelled
+ // by the StreamTask in the meantime
+ throw new FlinkException("Checkpointing operation failed");
+ }
+ }
+}
[2/2] flink git commit: [FLINK-6899] [state] Create correctly sized
state array in NestedMapsStateTable
Posted by tr...@apache.org.
[FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable
This closes #4107.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1d70f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1d70f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1d70f57
Branch: refs/heads/release-1.3
Commit: a1d70f5730c7186dd4aa02ec5628a19d8c56f6e8
Parents: db97526
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jun 12 15:21:47 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 13 10:35:58 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/state/heap/NestedMapsStateTable.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1d70f57/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 75c31db..e31e58f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -68,7 +68,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();
@SuppressWarnings("unchecked")
- Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getNumberOfKeyGroups()];
+ Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
this.state = state;
}