You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:09 UTC
[flink] 09/10: [FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge with next?)
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3dacffe35709f9747923dad4c7028baec27e2651
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 7 15:59:26 2020 +0200
[FLINK-17547][task] Use RefCountedFile in SpanningWrapper (todo: merge with next?)
---
.../io/network/api/serialization/SpanningWrapper.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 18ea6cc..9cffde3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import org.apache.flink.core.fs.RefCountedFile;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -74,7 +75,7 @@ final class SpanningWrapper {
private int leftOverLimit;
- private File spillFile;
+ private RefCountedFile spillFile;
private DataInputViewStreamWrapper spillFileReader;
@@ -136,7 +137,7 @@ final class SpanningWrapper {
accumulatedRecordBytes += length;
if (hasFullRecord()) {
spillingChannel.close();
- spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
+ spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE));
}
}
@@ -220,7 +221,6 @@ final class SpanningWrapper {
return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position());
}
- @SuppressWarnings("ResultOfMethodCallIgnored")
public void clear() {
buffer = initialBuffer;
serializationReadBuffer.releaseArrays();
@@ -232,7 +232,7 @@ final class SpanningWrapper {
leftOverLimit = 0;
accumulatedRecordBytes = 0;
- closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete());
+ closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release());
spillingChannel = null;
spillFileReader = null;
spillFile = null;
@@ -260,9 +260,10 @@ final class SpanningWrapper {
int maxAttempts = 10;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
String directory = tempDirs[rnd.nextInt(tempDirs.length)];
- spillFile = new File(directory, randomString(rnd) + ".inputchannel");
- if (spillFile.createNewFile()) {
- return new RandomAccessFile(spillFile, "rw").getChannel();
+ File file = new File(directory, randomString(rnd) + ".inputchannel");
+ if (file.createNewFile()) {
+ spillFile = new RefCountedFile(file);
+ return new RandomAccessFile(file, "rw").getChannel();
}
}