You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/24 16:05:12 UTC

[flink] 01/07: [FLINK-19320][task] Remove RecordWriter#clearBuffers

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82f524baec6b7b5ce9ce4a4940ece71b6e2da1e2
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Tue Sep 1 10:56:53 2020 +0800

    [FLINK-19320][task] Remove RecordWriter#clearBuffers
    
    Previously, RecordWriter#clearBuffers was used to recycle the partially filled buffer in the serializer. However, the serializer no longer contains any network buffers. The method is now used to finish the current BufferBuilders, and only some tests and BatchTask use it. These usages should be replaced by RecordWriter#close, which does the same thing. So this patch removes RecordWriter#clearBuffers and the corresponding test cases.
---
 .../network/api/writer/BroadcastRecordWriter.java  | 11 +--
 .../api/writer/ChannelSelectorRecordWriter.java    | 17 ++---
 .../io/network/api/writer/RecordWriter.java        | 11 +--
 .../apache/flink/runtime/operators/BatchTask.java  |  2 +-
 .../operators/shipping/OutputCollector.java        |  2 +-
 .../io/network/api/writer/RecordWriterTest.java    | 86 ----------------------
 .../SlotCountExceedingParallelismTest.java         |  2 +-
 .../scheduler/ScheduleOrUpdateConsumersTest.java   |  2 +-
 .../jobmaster/TestingAbstractInvokables.java       |  2 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala    |  4 +-
 .../flink/test/runtime/FileBufferReaderITCase.java | 17 +++--
 .../test/runtime/NetworkStackThroughputITCase.java |  4 +-
 .../test/runtime/ShuffleCompressionITCase.java     |  2 +-
 .../PipelinedRegionSchedulingITCase.java           |  2 +-
 14 files changed, 31 insertions(+), 133 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 132fefa..b834738 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -163,16 +163,7 @@ public final class BroadcastRecordWriter<T extends IOReadableWritable> extends R
 	}
 
 	@Override
-	public void closeBufferBuilder(int targetChannel) {
-		closeBufferBuilder();
-	}
-
-	@Override
-	public void clearBuffers() {
-		closeBufferBuilder();
-	}
-
-	private void closeBufferBuilder() {
+	public void closeBufferBuilders() {
 		if (bufferBuilder != null) {
 			bufferBuilder.finish();
 			bufferBuilder = null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 2e14988..5e32056 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -116,17 +116,12 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
 	}
 
 	@Override
-	public void closeBufferBuilder(int targetChannel) {
-		if (bufferBuilders[targetChannel] != null) {
-			bufferBuilders[targetChannel].finish();
-			bufferBuilders[targetChannel] = null;
-		}
-	}
-
-	@Override
-	public void clearBuffers() {
-		for (int index = 0; index < numberOfChannels; index++) {
-			closeBufferBuilder(index);
+	public void closeBufferBuilders() {
+		for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
+			if (bufferBuilders[targetChannel] != null) {
+				bufferBuilders[targetChannel].finish();
+				bufferBuilders[targetChannel] = null;
+			}
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 2c34993..d5d47c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -236,20 +236,15 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 	abstract void emptyCurrentBufferBuilder(int targetChannel);
 
 	/**
-	 * Marks the current {@link BufferBuilder} as finished and releases the resources for the target channel.
+	 * Marks the current {@link BufferBuilder}s as finished and releases the resources.
 	 */
-	abstract void closeBufferBuilder(int targetChannel);
-
-	/**
-	 * Closes the {@link BufferBuilder}s for all the channels.
-	 */
-	public abstract void clearBuffers();
+	abstract void closeBufferBuilders();
 
 	/**
 	 * Closes the writer. This stops the flushing thread (if there is one).
 	 */
 	public void close() {
-		clearBuffers();
+		closeBufferBuilders();
 		// make sure we terminate the thread in any case
 		if (outputFlusher != null) {
 			outputFlusher.terminate();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index b336fcf..f43359c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1484,7 +1484,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 	public static void clearWriters(List<RecordWriter<?>> writers) {
 		for (RecordWriter<?> writer : writers) {
-			writer.clearBuffers();
+			writer.close();
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index 7fc8942..d8cd9ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -81,7 +81,7 @@ public class OutputCollector<T> implements Collector<T> {
 	@Override
 	public void close() {
 		for (RecordWriter<?> writer : writers) {
-			writer.clearBuffers();
+			writer.close();
 			writer.flushAll();
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index bf939c4..6cad1d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -79,11 +79,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
@@ -92,7 +88,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.spy;
 
 /**
  * Tests for the {@link RecordWriter}.
@@ -117,87 +112,6 @@ public class RecordWriterTest {
 	// ---------------------------------------------------------------------------------------------
 
 	/**
-	 * Tests a fix for FLINK-2089.
-	 *
-	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-2089">FLINK-2089</a>
-	 */
-	@Test
-	public void testClearBuffersAfterInterruptDuringBlockingBufferRequest() throws Exception {
-		ExecutorService executor = null;
-
-		try {
-			executor = Executors.newSingleThreadExecutor();
-
-			TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1);
-
-			KeepingPartitionWriter partitionWriter = new KeepingPartitionWriter(bufferProvider);
-
-			final RecordWriter<IntValue> recordWriter = createRecordWriter(partitionWriter);
-
-			CountDownLatch waitLock = new CountDownLatch(1);
-			Future<?> result = executor.submit(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					IntValue val = new IntValue(0);
-
-					try {
-						recordWriter.emit(val);
-						recordWriter.flushAll();
-						waitLock.countDown();
-						recordWriter.emit(val);
-					}
-					catch (InterruptedException e) {
-						recordWriter.clearBuffers();
-					}
-
-					return null;
-				}
-			});
-
-			waitLock.await();
-
-			// Interrupt the Thread.
-			//
-			// The second emit call requests a new buffer and blocks the thread.
-			// When interrupting the thread at this point, clearing the buffers
-			// should not recycle any buffer.
-			result.cancel(true);
-
-			recordWriter.clearBuffers();
-
-			// Verify that the written out buffer has only been recycled once
-			// (by the partition writer), so no buffer recycled.
-			assertEquals(0, bufferProvider.getNumberOfAvailableBuffers());
-
-			partitionWriter.close();
-			assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
-		}
-		finally {
-			if (executor != null) {
-				executor.shutdown();
-			}
-		}
-	}
-
-	@Test
-	public void testSerializerClearedAfterClearBuffers() throws Exception {
-		ResultPartitionWriter partitionWriter =
-			spy(new RecyclingPartitionWriter(new TestPooledBufferProvider(1, 16)));
-
-		RecordWriter<IntValue> recordWriter = createRecordWriter(partitionWriter);
-
-		// Fill a buffer, but don't write it out.
-		recordWriter.emit(new IntValue(0));
-
-		// Clear all buffers.
-		recordWriter.clearBuffers();
-
-		// This should not throw an Exception iff the serializer state
-		// has been cleared as expected.
-		recordWriter.flushAll();
-	}
-
-	/**
 	 * Tests broadcasting events when no records have been emitted yet.
 	 */
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 138d5fd..3f80ea1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -147,7 +147,7 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
 				writer.flushAll();
 			}
 			finally {
-				writer.clearBuffers();
+				writer.close();
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index f267e5f..ece1c80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -166,7 +166,7 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger {
 					writer.flushAll();
 				}
 				finally {
-					writer.clearBuffers();
+					writer.close();
 				}
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
index 23e5dce..788e26c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
@@ -54,7 +54,7 @@ public class TestingAbstractInvokables {
 				writer.emit(new IntValue(1337));
 				writer.flushAll();
 			} finally {
-				writer.clearBuffers();
+				writer.close();
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 90ee282..84f166b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import org.apache.flink.runtime.execution.Environment
 import org.apache.flink.runtime.io.network.api.reader.RecordReader
-import org.apache.flink.runtime.io.network.api.writer.{RecordWriter, RecordWriterBuilder}
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.types.IntValue
 
@@ -51,7 +51,7 @@ object Tasks {
 
         writer.flushAll()
       } finally {
-        writer.clearBuffers()
+        writer.close()
       }
     }
   }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
index f6d6ad9..eae3e0e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
@@ -71,7 +71,13 @@ public class FileBufferReaderITCase extends TestLogger {
 
 	private static final int numRecords = 100_000;
 
-	private static final byte[] dataSource = new byte[1024];
+	private static final int bufferSize = 4096;
+
+	private static final int headerSize = 8;
+
+	private static final int recordSize = bufferSize - headerSize;
+
+	private static final byte[] dataSource = new byte[recordSize];
 
 	@BeforeClass
 	public static void setup() {
@@ -87,6 +93,7 @@ public class FileBufferReaderITCase extends TestLogger {
 		configuration.setString(RestOptions.BIND_PORT, "0");
 		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
 		configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1g"));
+		configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(bufferSize + "b"));
 
 		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
 			.setConfiguration(configuration)
@@ -153,12 +160,8 @@ public class FileBufferReaderITCase extends TestLogger {
 			final ByteArrayType bytes = new ByteArrayType(dataSource);
 			int counter = 0;
 			while (counter++ < numRecords) {
-				try {
-					writer.emit(bytes);
-					writer.flushAll();
-				} finally {
-					writer.clearBuffers();
-				}
+				writer.emit(bytes);
+				writer.flushAll();
 			}
 		}
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 479ae57..721ecda 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -105,7 +105,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 				}
 			}
 			finally {
-				writer.clearBuffers();
+				writer.close();
 				writer.flushAll();
 			}
 		}
@@ -139,7 +139,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 			}
 			finally {
 				reader.clearBuffers();
-				writer.clearBuffers();
+				writer.close();
 				writer.flushAll();
 			}
 		}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index 3f7c85d..c483eba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -167,7 +167,7 @@ public class ShuffleCompressionITCase {
 				writer.broadcastEmit(RECORD_TO_SEND);
 			}
 			writer.flushAll();
-			writer.clearBuffers();
+			writer.close();
 		}
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
index df2c1ec..cfd03ca 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
@@ -163,7 +163,7 @@ public class PipelinedRegionSchedulingITCase extends TestLogger {
 				writer.emit(new IntValue(42));
 				writer.flushAll();
 			} finally {
-				writer.clearBuffers();
+				writer.close();
 			}
 
 			if (getIndexInSubtaskGroup() == 0) {