You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/09 15:13:55 UTC

[4/6] flink git commit: [FLINK-8375][network] Remove unnecessary synchronization

[FLINK-8375][network] Remove unnecessary synchronization

Synchronized blocks in ResultPartition could affect only:
1. totalNumberOfBuffers and totalNumberOfBytes counters
2. subpartition add(), finish() and release() calls.

However:
1. the counters were not used anywhere, so this commit removes them;
2a. the add(), finish() and release() methods of PipelinedSubpartition were already thread-safe;
2b. the add(), finish() and release() methods of SpillableSubpartition are made thread-safe in
this commit by pushing the synchronized sections down one level, into SpillableSubpartition itself.

This closes #5260.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3345aa7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3345aa7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3345aa7e

Branch: refs/heads/master
Commit: 3345aa7e1adb58b3845bb72e9ee6e0ee9d8fe32b
Parents: 718a2ba
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Jan 5 15:28:40 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 9 16:11:36 2018 +0100

----------------------------------------------------------------------
 .../io/network/partition/ResultPartition.java   | 42 ++------------------
 .../partition/SpillableSubpartition.java        | 12 ++++--
 .../network/partition/ResultPartitionTest.java  |  2 -
 3 files changed, 12 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3345aa7e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ea2cca5..01c8bfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -120,14 +120,6 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private volatile Throwable cause;
 
-	// - Statistics ----------------------------------------------------------
-
-	/** The total number of buffers (both data and event buffers) */
-	private int totalNumberOfBuffers;
-
-	/** The total number of bytes (both data and event buffers) */
-	private long totalNumberOfBytes;
-
 	public ResultPartition(
 		String owningTaskName,
 		TaskActions taskActions, // actions on the owning task
@@ -224,24 +216,6 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		return bufferPool;
 	}
 
-	/**
-	 * Returns the total number of processed network buffers since initialization.
-	 *
-	 * @return overall number of processed network buffers
-	 */
-	public int getTotalNumberOfBuffers() {
-		return totalNumberOfBuffers;
-	}
-
-	/**
-	 * Returns the total size of processed network buffers since initialization.
-	 *
-	 * @return overall size of processed network buffers
-	 */
-	public long getTotalNumberOfBytes() {
-		return totalNumberOfBytes;
-	}
-
 	public int getNumberOfQueuedBuffers() {
 		int totalBuffers = 0;
 
@@ -275,13 +249,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 			// retain for buffer use after add() but also to have a simple path for recycle()
 			buffer.retain();
-			synchronized (subpartition) {
-				success = subpartition.add(buffer);
-
-				// Update statistics
-				totalNumberOfBuffers++;
-				totalNumberOfBytes += buffer.getSize();
-			}
+			success = subpartition.add(buffer);
 		} finally {
 			if (success) {
 				notifyPipelinedConsumers();
@@ -304,9 +272,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 			checkInProduceState();
 
 			for (ResultSubpartition subpartition : subpartitions) {
-				synchronized (subpartition) {
-					subpartition.finish();
-				}
+				subpartition.finish();
 			}
 
 			success = true;
@@ -339,9 +305,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 			// Release all subpartitions
 			for (ResultSubpartition subpartition : subpartitions) {
 				try {
-					synchronized (subpartition) {
-						subpartition.release();
-					}
+					subpartition.release();
 				}
 				// Catch this in order to ensure that release is called on all subpartitions
 				catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3345aa7e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index e977f60..9047e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -59,6 +59,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, and
  * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, most spillable partitions
  * will be spilled for real-world data sets.
+ *
+ * <p>Note on thread safety: synchronizing on {@code buffers} coordinates writes
+ * and reads. Synchronizing on {@code this} serializes {@link #add(Buffer)} and the
+ * clean-up methods {@link #release()} / {@link #finish()}, all of which also
+ * touch {@code spillWriter}. Since we do not want to block reads during spilling,
+ * both kinds of synchronization are needed. This model could probably be simplified.
  */
 class SpillableSubpartition extends ResultSubpartition {
 
@@ -93,7 +99,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public boolean add(Buffer buffer) throws IOException {
+	public synchronized boolean add(Buffer buffer) throws IOException {
 		checkNotNull(buffer);
 
 		synchronized (buffers) {
@@ -131,7 +137,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public void finish() throws IOException {
+	public synchronized void finish() throws IOException {
 		synchronized (buffers) {
 			if (add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE))) {
 				isFinished = true;
@@ -145,7 +151,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public void release() throws IOException {
+	public synchronized void release() throws IOException {
 		final ResultSubpartitionView view;
 
 		synchronized (buffers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3345aa7e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 5d24b4a..4512625 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -192,8 +192,6 @@ public class ResultPartitionTest {
 
 		partition.writeBufferToAllSubpartitions(buffer);
 
-		// Verify added to all queues, i.e. two buffers in total
-		assertEquals(2, partition.getTotalNumberOfBuffers());
 		// release the buffers in the partition
 		partition.release();