Posted to commits@flink.apache.org by se...@apache.org on 2020/02/05 11:59:52 UTC

[flink] branch release-1.10 updated (bf722e7 -> 4f1d9b0)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from bf722e7  [FLINK-15863][docs] Fix docs stating that savepoints are relocatable
     new c305784  [FLINK-15905][runtime] Fix race condition between allocation and release of OpaqueMemoryResource
     new 4f1d9b0  [FLINK-15916][docs] Remove outdated sections for Asnc Snapshots and Network buffers from Tuning Checkpoints and Large State

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/ops/state/large_state_tuning.md               | 21 -------------
 docs/ops/state/large_state_tuning.zh.md            | 21 -------------
 .../apache/flink/runtime/memory/MemoryManager.java | 21 ++++++++-----
 .../flink/runtime/memory/SharedResources.java      | 36 ++++++++++++++--------
 .../memory/MemoryManagerSharedResourcesTest.java   |  4 +++
 .../flink/runtime/memory/SharedResourcesTest.java  | 35 +++++++++++++++++++++
 6 files changed, 77 insertions(+), 61 deletions(-)


[flink] 01/02: [FLINK-15905][runtime] Fix race condition between allocation and release of OpaqueMemoryResource

Posted by se...@apache.org.

sewen pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c305784f868d2795b433534764847edfec05a969
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Feb 4 20:54:55 2020 +0100

    [FLINK-15905][runtime] Fix race condition between allocation and release of OpaqueMemoryResource
---
 .../apache/flink/runtime/memory/MemoryManager.java | 21 ++++++++-----
 .../flink/runtime/memory/SharedResources.java      | 36 ++++++++++++++--------
 .../memory/MemoryManagerSharedResourcesTest.java   |  4 +++
 .../flink/runtime/memory/SharedResourcesTest.java  | 35 +++++++++++++++++++++
 4 files changed, 77 insertions(+), 19 deletions(-)
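
Note for readers of the patch below: the change moves the "un-reserve memory" step
into the same critical section that removes the last lease, by handing it to
SharedResources as a release hook. The following is a minimal sketch of that
pattern, not the Flink code itself. The class LeaseRegistrySketch and its methods
acquire/release are hypothetical names made up for illustration; only the idea of
running the hook in a finally block while the lock is held is taken from the patch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Hypothetical, simplified lease-counted registry (illustration only). */
final class LeaseRegistrySketch {

    private static final class Entry {
        final AutoCloseable handle;
        final long size;
        final Set<Object> leaseHolders = new HashSet<>();

        Entry(AutoCloseable handle, long size) {
            this.handle = handle;
            this.size = size;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Entry> entries = new HashMap<>();

    /** Adds a lease; the handle and size are only used if no entry exists yet for the type. */
    void acquire(String type, Object leaseHolder, AutoCloseable handle, long size) {
        lock.lock();
        try {
            entries.computeIfAbsent(type, k -> new Entry(handle, size))
                    .leaseHolders.add(leaseHolder);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a lease. When the last lease goes away, the handle is closed and the
     * hook runs while the lock is still held, so a concurrent acquire for the same
     * type cannot slip in between "entry removed" and "accounting released".
     */
    void release(String type, Object leaseHolder, Consumer<Long> releaseHook) throws Exception {
        lock.lock();
        try {
            final Entry entry = entries.get(type);
            if (entry == null) {
                return;
            }
            entry.leaseHolders.remove(leaseHolder);
            if (entry.leaseHolders.isEmpty()) {
                try {
                    entries.remove(type);
                    entry.handle.close();
                } finally {
                    // runs even if close() throws, keeping the accounting clean
                    releaseHook.accept(entry.size);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        final LeaseRegistrySketch registry = new LeaseRegistrySketch();
        final Object first = new Object();
        final Object second = new Object();
        final long[] released = {0L};

        registry.acquire("demo", first, () -> System.out.println("handle closed"), 100L);
        registry.acquire("demo", second, () -> { }, 100L);

        registry.release("demo", first, size -> released[0] += size);
        System.out.println("released after first lease: " + released[0]);

        registry.release("demo", second, size -> released[0] += size);
        System.out.println("released after last lease: " + released[0]);
    }
}
```

In the pre-patch shape, release() returned a boolean and the caller un-reserved the
memory after the lock had already been dropped; the sketch keeps both steps under
one lock instead.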

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 46fe3ba..8e7c2dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -50,6 +50,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import static org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory;
 import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
@@ -567,6 +568,13 @@ public class MemoryManager {
 	 *
 	 * <p>The OpaqueMemoryResource object returned from this method must be closed once not used any further.
 	 * Once all acquisitions have closed the object, the resource itself is closed.
+	 *
+	 * <p><b>Important:</b> The failure semantics are as follows: If the memory manager fails to reserve
+	 * the memory, the external resource initializer will not be called. If an exception is thrown when the
+	 * opaque resource is closed (last lease is released), the memory manager will still un-reserve the
+	 * memory to make sure its own accounting is clean. The exception will need to be handled by the caller of
+	 * {@link OpaqueMemoryResource#close()}. For example, if this indicates that native memory was not released
+	 * and the process might thus have a memory leak, the caller can decide to kill the process as a result.
 	 */
 	public <T extends AutoCloseable> OpaqueMemoryResource<T> getSharedMemoryResourceForManagedMemory(
 			String type,
@@ -576,7 +584,9 @@ public class MemoryManager {
 		// if we need to allocate the resource (no shared resource allocated, yet), this would be the size to use
 		final long numBytes = computeMemorySize(fractionToInitializeWith);
 
-		// the initializer attempt to reserve the memory before actual initialization
+		// initializer and releaser as functions that are pushed into the SharedResources,
+		// so that the SharedResources can decide in (thread-safely execute) when initialization
+		// and release should happen
 		final LongFunctionWithException<T, Exception> reserveAndInitialize = (size) -> {
 			try {
 				reserveMemory(type, MemoryType.OFF_HEAP, size);
@@ -588,6 +598,8 @@ public class MemoryManager {
 			return initializer.apply(size);
 		};
 
+		final Consumer<Long> releaser = (size) -> releaseMemory(type, MemoryType.OFF_HEAP, size);
+
 		// This object identifies the lease in this request. It is used only to identify the release operation.
 		// Using the object to represent the lease is a bit nicer safer than just using a reference counter.
 		final Object leaseHolder = new Object();
@@ -599,12 +611,7 @@ public class MemoryManager {
 		// someone else before with a different value for fraction (should not happen in practice, though).
 		final long size = resource.size();
 
-		final ThrowingRunnable<Exception> disposer = () -> {
-				final boolean allDisposed = sharedResources.release(type, leaseHolder);
-				if (allDisposed) {
-					releaseMemory(type, MemoryType.OFF_HEAP, size);
-				}
-			};
+		final ThrowingRunnable<Exception> disposer = () -> sharedResources.release(type, leaseHolder, releaser);
 
 		return new OpaqueMemoryResource<>(resource.resourceHandle(), size, disposer);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java
index 44eaa02..7ab0ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java
@@ -26,6 +26,7 @@ import javax.annotation.concurrent.GuardedBy;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -83,24 +84,34 @@ final class SharedResources {
 	/**
 	 * Releases a lease (identified by the lease holder object) for the given type.
 	 * If no further leases exist, the resource is disposed.
+	 */
+	void release(String type, Object leaseHolder) throws Exception {
+		release(type, leaseHolder, (value) -> {});
+	}
+
+	/**
+	 * Releases a lease (identified by the lease holder object) for the given type.
+	 * If no further leases exist, the resource is disposed.
 	 *
-	 * @return True, if this was the last lease holder and the resource was disposed.
+	 * <p>This method takes an additional hook that is called when the resource is disposed.
 	 */
-	boolean release(String type, Object leaseHolder) throws Exception {
+	void release(String type, Object leaseHolder, Consumer<Long> releaser) throws Exception {
 		lock.lock();
 		try {
-			final LeasedResource resource = reservedResources.get(type);
+			final LeasedResource<?> resource = reservedResources.get(type);
 			if (resource == null) {
-				return false;
+				return;
 			}
 
 			if (resource.removeLeaseHolder(leaseHolder)) {
-				reservedResources.remove(type);
-				resource.dispose();
-				return true;
+				try {
+					reservedResources.remove(type);
+					resource.dispose();
+				}
+				finally {
+					releaser.accept(resource.size());
+				}
 			}
-
-			return false;
 		}
 		finally {
 			lock.unlock();
@@ -169,9 +180,10 @@ final class SharedResources {
 		}
 
 		void dispose() throws Exception {
-			checkState(!disposed);
-			disposed = true;
-			resourceHandle.close();
+			if (!disposed) {
+				disposed = true;
+				resourceHandle.close();
+			}
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
index 5167f2c..9f49fe2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
@@ -117,6 +117,7 @@ public class MemoryManagerSharedResourcesTest {
 		resource1.close();
 
 		assertFalse(resource1.getResourceHandle().closed);
+		assertFalse(memoryManager.verifyEmpty());
 	}
 
 	@Test
@@ -132,6 +133,7 @@ public class MemoryManagerSharedResourcesTest {
 		resource2.close();
 
 		assertTrue(resource1.getResourceHandle().closed);
+		assertTrue(memoryManager.verifyEmpty());
 	}
 
 	@Test
@@ -145,6 +147,7 @@ public class MemoryManagerSharedResourcesTest {
 		resource1.close();
 
 		assertFalse(resource1.getResourceHandle().closed);
+		assertFalse(memoryManager.verifyEmpty());
 	}
 
 	@Test
@@ -160,6 +163,7 @@ public class MemoryManagerSharedResourcesTest {
 
 		assertTrue(resource1.getResourceHandle().closed);
 		assertTrue(resource2.getResourceHandle().closed);
+		assertTrue(memoryManager.verifyEmpty());
 	}
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java
index 92557a2..5862b43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.memory;
 
 import org.junit.Test;
 
+import java.util.function.Consumer;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -100,6 +102,20 @@ public class SharedResourcesTest {
 	}
 
 	@Test
+	public void testLastReleaseCallsReleaseHook() throws Exception {
+		final String type = "theType";
+		final long size = 100;
+		final SharedResources resources = new SharedResources();
+		final Object leaseHolder = new Object();
+		final TestReleaseHook hook = new TestReleaseHook(size);
+
+		resources.getOrAllocateSharedResource(type, leaseHolder, TestResource::new, size);
+		resources.release(type, leaseHolder, hook);
+
+		assertTrue(hook.wasCalled);
+	}
+
+	@Test
 	public void testReleaseNoneExistingLease() throws Exception {
 		final SharedResources resources = new SharedResources();
 
@@ -124,4 +140,23 @@ public class SharedResourcesTest {
 			closed = true;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestReleaseHook implements Consumer<Long> {
+
+		private final long expectedValue;
+
+		boolean wasCalled;
+
+		TestReleaseHook(long expectedValue) {
+			this.expectedValue = expectedValue;
+		}
+
+		@Override
+		public void accept(Long value) {
+			wasCalled = true;
+			assertEquals(expectedValue, value.longValue());
+		}
+	}
 }
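
Note on the failure semantics described in the new MemoryManager Javadoc above: if
reservation fails, the initializer is not called, and if closing the resource
throws, the memory is still un-reserved and the exception reaches the caller of
close(). The sketch below illustrates those two rules from the caller's side. It is
an illustration under assumptions, not Flink code: FailureSemanticsSketch, Budget,
Handle and open are hypothetical names, and un-reserving when the initializer
itself fails is this sketch's own choice, which the patch does not show.

```java
import java.util.function.LongFunction;

/** Hypothetical illustration of reserve-then-initialize and release-on-failed-close. */
final class FailureSemanticsSketch {

    /** Stand-in for a memory manager's byte accounting. */
    static final class Budget {
        private long available;

        Budget(long total) {
            this.available = total;
        }

        synchronized void reserve(long bytes) {
            if (bytes > available) {
                throw new IllegalStateException("cannot reserve " + bytes + " bytes");
            }
            available -= bytes;
        }

        synchronized void unreserve(long bytes) {
            available += bytes;
        }

        synchronized long available() {
            return available;
        }
    }

    /** Test handle that can be told to fail on close. */
    static final class Handle implements AutoCloseable {
        private final boolean failOnClose;

        Handle(boolean failOnClose) {
            this.failOnClose = failOnClose;
        }

        @Override
        public void close() throws Exception {
            if (failOnClose) {
                throw new Exception("native memory could not be released");
            }
        }
    }

    /** Reserves first and only then runs the initializer; returns a lease to close. */
    static AutoCloseable open(
            Budget budget, long bytes, LongFunction<? extends AutoCloseable> initializer) {
        budget.reserve(bytes); // throws before the initializer is ever invoked
        final AutoCloseable handle;
        try {
            handle = initializer.apply(bytes);
        } catch (RuntimeException e) {
            budget.unreserve(bytes); // sketch's own choice, see lead-in
            throw e;
        }
        return () -> {
            try {
                handle.close();
            } finally {
                budget.unreserve(bytes); // accounting is cleaned even if close() throws
            }
        };
    }

    public static void main(String[] args) {
        final Budget budget = new Budget(100L);
        final AutoCloseable lease = open(budget, 60L, size -> new Handle(true));
        try {
            lease.close();
        } catch (Exception e) {
            // The caller decides what a failed close means, for example
            // treating it as a possible native leak and terminating the process.
            System.out.println("close failed: " + e.getMessage());
        }
        System.out.println("bytes available after close: " + budget.available());
    }
}
```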


[flink] 02/02: [FLINK-15916][docs] Remove outdated sections for Asnc Snapshots and Network buffers from Tuning Checkpoints and Large State

Posted by se...@apache.org.

sewen pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f1d9b0be8fb3d3b678dbf44440bfc637b3a8a00
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 11:23:22 2020 +0100

    [FLINK-15916][docs] Remove outdated sections for Asnc Snapshots and Network buffers from Tuning Checkpoints and Large State
---
 docs/ops/state/large_state_tuning.md    | 21 ---------------------
 docs/ops/state/large_state_tuning.zh.md | 21 ---------------------
 2 files changed, 42 deletions(-)

diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
index ba98338..b2149b6 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -92,27 +92,6 @@ the same time. For applications with large state in Flink, this often ties up to
 When a savepoint is manually triggered, it may be in process concurrently with an ongoing checkpoint.
 
 
-## Tuning Network Buffers
-
-Before Flink 1.3, an increased number of network buffers also caused increased checkpointing times since
-keeping more in-flight data meant that checkpoint barriers got delayed. Since Flink 1.3, the
-number of network buffers used per outgoing/incoming channel is limited and thus network buffers
-may be configured without affecting checkpoint times
-(see [network buffer configuration](../config.html#configuring-the-network-buffers)).
-
-## Asynchronous Checkpointing
-
-When state is *asynchronously* snapshotted, the checkpoints scale better than when the state is *synchronously* snapshotted.
-Especially in more complex streaming applications with multiple joins, co-functions, or windows, this may have a profound
-impact.
-
-For state to be snapshotted asynchronsously, you need to use a state backend which supports asynchronous snapshotting.
-Starting from Flink 1.3, both RocksDB-based as well as heap-based state backends (`filesystem`) support asynchronous
-snapshotting and use it by default. This applies to both managed operator state as well as managed keyed state (incl. timers state).
-
-<span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
-Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
-
 ## Tuning RocksDB
 
 The state storage workhorse of many large scale Flink streaming applications is the *RocksDB State Backend*.
diff --git a/docs/ops/state/large_state_tuning.zh.md b/docs/ops/state/large_state_tuning.zh.md
index a78dc5a..7faea13 100644
--- a/docs/ops/state/large_state_tuning.zh.md
+++ b/docs/ops/state/large_state_tuning.zh.md
@@ -92,27 +92,6 @@ the same time. For applications with large state in Flink, this often ties up to
 When a savepoint is manually triggered, it may be in process concurrently with an ongoing checkpoint.
 
 
-## Tuning Network Buffers
-
-Before Flink 1.3, an increased number of network buffers also caused increased checkpointing times since
-keeping more in-flight data meant that checkpoint barriers got delayed. Since Flink 1.3, the
-number of network buffers used per outgoing/incoming channel is limited and thus network buffers
-may be configured without affecting checkpoint times
-(see [network buffer configuration](../config.html#configuring-the-network-buffers)).
-
-## Asynchronous Checkpointing
-
-When state is *asynchronously* snapshotted, the checkpoints scale better than when the state is *synchronously* snapshotted.
-Especially in more complex streaming applications with multiple joins, co-functions, or windows, this may have a profound
-impact.
-
-For state to be snapshotted asynchronsously, you need to use a state backend which supports asynchronous snapshotting.
-Starting from Flink 1.3, both RocksDB-based as well as heap-based state backends (`filesystem`) support asynchronous
-snapshotting and use it by default. This applies to both managed operator state as well as managed keyed state (incl. timers state).
-
-<span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
-Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
-
 ## Tuning RocksDB
 
 The state storage workhorse of many large scale Flink streaming applications is the *RocksDB State Backend*.