Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/13 10:59:08 UTC

[1/2] flink git commit: [FLINK-5326] [network] Check release flag of parent in reader

Repository: flink
Updated Branches:
  refs/heads/master 38ab7164a -> 369837971


[FLINK-5326] [network] Check release flag of parent in reader

In PipelinedSubpartitionView, there is a possible race between
releasing the parent subpartition and querying the view for a
buffer.

The parent partition release clears all buffers in locked
scope and releases the view outside of the lock. If the view
is concurrently queried for a buffer, it might get null, which
is only allowed if the view has been released.

Because the release is only forwarded outside of the locked
scope, such a query can happen before the release has
propagated to the view.

As a solution, the view now checks the parent's release status
as well. This is how the spilled views handle it, too.

This surfaced with the recent refactorings, because the previous
consumption model required multiple rounds of get, registerListener,
and isReleased calls, which hid the problem.
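
For illustration, here is a minimal sketch of the race using
hypothetical stand-in classes (not the actual Flink types, whose
logic is more involved):

import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the parent subpartition.
class SketchSubpartition {
    private final ArrayDeque<Object> buffers = new ArrayDeque<>();
    private volatile boolean isReleased;

    void release(SketchView view) {
        synchronized (buffers) {
            buffers.clear();          // (1) buffers cleared in locked scope
            isReleased = true;
        }
        view.releaseAllResources();   // (2) view released outside the lock
    }

    boolean isReleased() {
        return isReleased;
    }

    Object pollBuffer() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }
}

// Hypothetical stand-in for the view.
class SketchView {
    private final SketchSubpartition parent;
    private final AtomicBoolean isReleased = new AtomicBoolean();

    SketchView(SketchSubpartition parent) {
        this.parent = parent;
    }

    void releaseAllResources() {
        isReleased.set(true);
    }

    // Between (1) and (2) this returns null although the view's own
    // release flag is still false.
    Object getNextBuffer() {
        return parent.pollBuffer();
    }

    boolean isReleased() {
        // The fix: consult the parent as well, so a null buffer observed
        // between (1) and (2) is correctly attributed to the release.
        return isReleased.get() || parent.isReleased();
    }
}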


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d965d5ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d965d5ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d965d5ab

Branch: refs/heads/master
Commit: d965d5abdc389e9b65fd35a69bb16bfb71008504
Parents: 38ab716
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Dec 13 11:26:47 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 13 11:26:48 2016 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartitionView.java    |  2 +-
 .../partition/PipelinedSubpartitionTest.java    | 28 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d965d5ab/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 52c78ea..fda2135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -70,7 +70,7 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
 
 	@Override
 	public boolean isReleased() {
-		return isReleased.get();
+		return isReleased.get() || parent.isReleased();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d965d5ab/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index a56177e..a97e306 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
@@ -31,19 +34,25 @@ import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -132,6 +141,25 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		testProduceConsume(true, true);
 	}
 
+	/**
+	 * Verifies that the view's isReleased() also checks the release
+	 * status of the parent subpartition.
+	 */
+	@Test
+	public void testIsReleasedChecksParent() throws Exception {
+		PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
+
+		PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
+				subpartition, mock(BufferAvailabilityListener.class));
+
+		assertFalse(reader.isReleased());
+		verify(subpartition, times(1)).isReleased();
+
+		when(subpartition.isReleased()).thenReturn(true);
+		assertTrue(reader.isReleased());
+		verify(subpartition, times(2)).isReleased();
+	}
+
 	private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
 		// Config
 		final int producerBufferPoolSize = 8;


[2/2] flink git commit: [docs] Add docs about externalized checkpoints

Posted by uc...@apache.org.
[docs] Add docs about externalized checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36983797
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36983797
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36983797

Branch: refs/heads/master
Commit: 3698379715a9373b3de59bd4cd12b6cb69154361
Parents: d965d5a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Dec 13 11:32:43 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 13 11:58:33 2016 +0100

----------------------------------------------------------------------
 docs/setup/cli.md             |  4 ++--
 docs/setup/config.md          |  2 ++
 docs/setup/fault_tolerance.md | 25 +++++++++++++++++++++++++
 3 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36983797/docs/setup/cli.md
----------------------------------------------------------------------
diff --git a/docs/setup/cli.md b/docs/setup/cli.md
index 855757f..ef5adb3 100644
--- a/docs/setup/cli.md
+++ b/docs/setup/cli.md
@@ -146,7 +146,7 @@ This allows the job to finish processing all inflight data.
 
 Returns the path of the created savepoint. You need this path to restore and dispose savepoints.
 
-You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]).
+You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [Savepoints](savepoints.html#configuration)).
 
 ##### Cancel with a savepoint
 
@@ -156,7 +156,7 @@ You can atomically trigger a savepoint and cancel a job.
 ./bin/flink cancel -s  [savepointDirectory] <jobID>
 {% endhighlight %}
 
-If no savepoint directory is configured, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]).
+If no savepoint directory is configured, you need to configure a default savepoint directory for the Flink installation (see [Savepoints](savepoints.html#configuration)).
 
 The job will only be cancelled if the savepoint succeeds.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/36983797/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 51ef41c..c24065a 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -155,6 +155,8 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`)
 
+- `state.checkpoints.dir`: The target directory for the metadata of [externalized checkpoints]({{ site.baseurl }}/setup/fault_tolerance.html#externalized-checkpoints).
+
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/36983797/docs/setup/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/setup/fault_tolerance.md b/docs/setup/fault_tolerance.md
index 7fb3df4..cef746e 100644
--- a/docs/setup/fault_tolerance.md
+++ b/docs/setup/fault_tolerance.md
@@ -95,6 +95,31 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 
 {% top %}
 
+### Externalized Checkpoints
+
+You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their metadata out to persistent storage and are *not* automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails.
+
+```java
+CheckpointConfig config = env.getCheckpointConfig();
+config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+```
+
+The `ExternalizedCheckpointCleanup` mode configures what happens with externalized checkpoints when you cancel the job:
+
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the externalized checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.
+
+- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the externalized checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
+
+The **target directory** for the checkpoint is determined by the configuration key `state.checkpoints.dir`, which should point to the desired target directory:
+
+```
+state.checkpoints.dir: hdfs:///checkpoints/
+```
+
+This directory will then contain the checkpoint metadata required to restore the checkpoint. The actual checkpoint files will still be available in their configured directory. Currently, this can only be set via the configuration files.
+
+Follow the [savepoint guide]({{ site.baseurl }}/setup/cli.html#savepoints) when you want to resume from a specific checkpoint.
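+
+For example, a minimal sketch of resuming from an externalized checkpoint (restoring uses the same CLI option as savepoints; the metadata path below is hypothetical and depends on your setup):
+
+```
+# restore a job from the externalized checkpoint's metadata path
+./bin/flink run -s hdfs:///checkpoints/<checkpoint-metadata> <jarFile>
+```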
+
 ### Fault Tolerance Guarantees of Data Sources and Sinks
 
 Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the