You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:09 UTC

[flink] 09/10: [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge with next?)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3dacffe35709f9747923dad4c7028baec27e2651
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 15:59:26 2020 +0200

    [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge
    with next?)
---
 .../io/network/api/serialization/SpanningWrapper.java     | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 18ea6cc..9cffde3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.fs.RefCountedFile;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -74,7 +75,7 @@ final class SpanningWrapper {
 
 	private int leftOverLimit;
 
-	private File spillFile;
+	private RefCountedFile spillFile;
 
 	private DataInputViewStreamWrapper spillFileReader;
 
@@ -136,7 +137,7 @@ final class SpanningWrapper {
 		accumulatedRecordBytes += length;
 		if (hasFullRecord()) {
 			spillingChannel.close();
-			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
+			spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE));
 		}
 	}
 
@@ -220,7 +221,6 @@ final class SpanningWrapper {
 		return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position());
 	}
 
-	@SuppressWarnings("ResultOfMethodCallIgnored")
 	public void clear() {
 		buffer = initialBuffer;
 		serializationReadBuffer.releaseArrays();
@@ -232,7 +232,7 @@ final class SpanningWrapper {
 		leftOverLimit = 0;
 		accumulatedRecordBytes = 0;
 
-		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete());
+		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release());
 		spillingChannel = null;
 		spillFileReader = null;
 		spillFile = null;
@@ -260,9 +260,10 @@ final class SpanningWrapper {
 		int maxAttempts = 10;
 		for (int attempt = 0; attempt < maxAttempts; attempt++) {
 			String directory = tempDirs[rnd.nextInt(tempDirs.length)];
-			spillFile = new File(directory, randomString(rnd) + ".inputchannel");
-			if (spillFile.createNewFile()) {
-				return new RandomAccessFile(spillFile, "rw").getChannel();
+			File file = new File(directory, randomString(rnd) + ".inputchannel");
+			if (file.createNewFile()) {
+				spillFile = new RefCountedFile(file);
+				return new RandomAccessFile(file, "rw").getChannel();
 			}
 		}