Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/09 16:13:18 UTC

[GitHub] NicoK closed pull request #6788: [FLINK-10469][core] make sure to always write the whole buffer to FileChannel

NicoK closed pull request #6788: [FLINK-10469][core] make sure to always write the whole buffer to FileChannel
URL: https://github.com/apache/flink/pull/6788
 
 
   

This is a PR merged from a forked repository. Since GitHub hides the original diff once such a foreign pull request is merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 23af2e8cf84..8f322626116 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -28,6 +28,8 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
@@ -56,6 +58,14 @@
 
 	// ------------------------------------------------------------------------
 
+	public static void writeCompletely(WritableByteChannel channel, ByteBuffer src) throws IOException {
+		while (src.hasRemaining()) {
+			channel.write(src);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Constructs a random filename with the given prefix and
 	 * a random part generated from hex characters.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 0e575d3021a..ddb0c4e839d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -21,6 +21,7 @@
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.util.FileUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -341,7 +342,7 @@ protected SegmentWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequ
 	@Override
 	public void write() throws IOException {
 		try {
-			this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
+			FileUtils.writeCompletely(this.channel.fileChannel, this.segment.wrap(0, this.segment.size()));
 		}
 		catch (NullPointerException npex) {
 			throw new IOException("Memory segment has been released.");
@@ -375,8 +376,8 @@ public void write() throws IOException {
 		header.putInt(nioBufferReadable.remaining());
 		header.flip();
 
-		channel.fileChannel.write(header);
-		channel.fileChannel.write(nioBufferReadable);
+		FileUtils.writeCompletely(channel.fileChannel, header);
+		FileUtils.writeCompletely(channel.fileChannel, nioBufferReadable);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 8630acee9a8..a78cb4dd708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,6 +24,7 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -481,7 +482,7 @@ private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRec
 				this.spillingChannel = createSpillingChannel();
 
 				ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
-				this.spillingChannel.write(toWrite);
+				FileUtils.writeCompletely(this.spillingChannel, toWrite);
 			}
 			else {
 				// collect in memory
@@ -528,7 +529,7 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
 			if (spillingChannel != null) {
 				// spill to file
 				ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
-				this.spillingChannel.write(toWrite);
+				FileUtils.writeCompletely(this.spillingChannel, toWrite);
 			} else {
 				segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
 			}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 4b690d1f750..ae95408af44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.File;
@@ -75,9 +76,6 @@
 	/** The buffer that encodes the spilled header. */
 	private final ByteBuffer headBuffer;
 
-	/** The reusable array that holds header and contents buffers. */
-	private final ByteBuffer[] sources;
-
 	/** The file that we currently spill to. */
 	private File currentSpillFile;
 
@@ -109,8 +107,6 @@ public BufferSpiller(IOManager ioManager, int pageSize) throws IOException {
 		this.headBuffer = ByteBuffer.allocateDirect(16);
 		this.headBuffer.order(ByteOrder.LITTLE_ENDIAN);
 
-		this.sources = new ByteBuffer[] { this.headBuffer, null };
-
 		File[] tempDirs = ioManager.getSpillingDirectories();
 		this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
 
@@ -148,8 +144,8 @@ public void add(BufferOrEvent boe) throws IOException {
 
 			bytesWritten += (headBuffer.remaining() + contents.remaining());
 
-			sources[1] = contents;
-			currentChannel.write(sources);
+			FileUtils.writeCompletely(currentChannel, headBuffer);
+			FileUtils.writeCompletely(currentChannel, contents);
 		}
 		finally {
 			if (boe.isBuffer()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
index adbe2405184..c1ff79faf32 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.streaming.runtime.io.BufferSpiller.SpilledBufferOrEventSequence;
+import org.apache.flink.util.FileUtils;
 
 import org.junit.After;
 import org.junit.Before;
@@ -107,7 +108,7 @@ public void testIncompleteHeaderOnFirstElement() {
 			ByteBuffer buf = ByteBuffer.allocate(7);
 			buf.order(ByteOrder.LITTLE_ENDIAN);
 
-			fileChannel.write(buf);
+			FileUtils.writeCompletely(fileChannel, buf);
 			fileChannel.position(0);
 
 			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
@@ -175,7 +176,7 @@ public void testBufferSequenceWithIncompleteBuffer() {
 			data.put((byte) 0);
 			data.position(0);
 			data.limit(312);
-			fileChannel.write(data);
+			FileUtils.writeCompletely(fileChannel, data);
 			fileChannel.position(0L);
 
 			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
@@ -414,7 +415,7 @@ public void testCleanup() {
 			ByteBuffer data = ByteBuffer.allocate(157);
 			data.order(ByteOrder.LITTLE_ENDIAN);
 
-			fileChannel.write(data);
+			FileUtils.writeCompletely(fileChannel, data);
 			fileChannel.position(54);
 
 			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
@@ -451,8 +452,8 @@ private static BufferOrEvent generateAndWriteEvent(FileChannel fileChannel, Rand
 		header.put((byte) 1);
 		header.flip();
 
-		fileChannel.write(header);
-		fileChannel.write(serializedEvent);
+		FileUtils.writeCompletely(fileChannel, header);
+		FileUtils.writeCompletely(fileChannel, serializedEvent);
 		return new BufferOrEvent(evt, channelIndex);
 	}
 
@@ -467,7 +468,7 @@ private static void writeBuffer(FileChannel fileChannel, int size, int channelIn
 			data.put((byte) i);
 		}
 		data.flip();
-		fileChannel.write(data);
+		FileUtils.writeCompletely(fileChannel, data);
 	}
 
 	private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
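
Background on the change, for readers of this archive: WritableByteChannel.write(ByteBuffer) is allowed to perform a partial write, i.e. it may return after writing fewer bytes than src.remaining(). A single write() call therefore does not guarantee that the whole buffer reaches the file, which is why every call site above now goes through FileUtils.writeCompletely. Below is a minimal, self-contained sketch of that pattern (the class and file names are illustrative only and not part of the PR):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public final class WriteCompletelyExample {

	// Same looping idiom as the writeCompletely helper added by this PR:
	// keep calling write() until the buffer has no remaining bytes.
	static void writeCompletely(FileChannel channel, ByteBuffer src) throws IOException {
		while (src.hasRemaining()) {
			channel.write(src);
		}
	}

	public static void main(String[] args) throws IOException {
		Path tmp = Files.createTempFile("write-completely", ".bin");
		ByteBuffer data = ByteBuffer.wrap("some spilled bytes".getBytes(StandardCharsets.UTF_8));

		try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
			// A single channel.write(data) call may return before all bytes are
			// written (a partial write); the loop guarantees the whole buffer
			// ends up in the file.
			writeCompletely(channel, data);
		}
		Files.delete(tmp);
	}
}

Centralizing this loop in flink-core's FileUtils avoids repeating it at every spill/write site, which is exactly what the hunks above do for the I/O manager, the spanning record deserializer, and the buffer spiller.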


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services