You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:49 UTC

[flink] 04/11: [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51f53d8aeb7b9b4db8f3f4dde0c0f6ab6b626d2f
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:24:37 2018 +0200

    [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer
---
 .../runtime/io/network/buffer/BufferBuilder.java   | 25 +++++++++++-----------
 .../runtime/io/network/buffer/BufferConsumer.java  | 23 ++++++++++++++------
 2 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f184..6fb067e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public class BufferBuilder {
 	 * @return number of written bytes.
 	 */
 	public int finish() {
-		positionMarker.markFinished();
+		int writtenBytes = positionMarker.markFinished();
 		commit();
-		return getWrittenBytes();
+		return writtenBytes;
 	}
 
 	public boolean isFinished() {
@@ -118,18 +118,10 @@ public class BufferBuilder {
 		return positionMarker.getCached() == getMaxCapacity();
 	}
 
-	public boolean isEmpty() {
-		return positionMarker.getCached() == 0;
-	}
-
 	public int getMaxCapacity() {
 		return memorySegment.size();
 	}
 
-	private int getWrittenBytes() {
-		return positionMarker.getCached();
-	}
-
 	/**
 	 * Holds a reference to the current writer position. Negative values indicate that writer ({@link BufferBuilder}
 	 * has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer.
@@ -156,7 +148,7 @@ public class BufferBuilder {
 	 * Cached writing implementation of {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 *
 	 * <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible.
 	 */
@@ -181,12 +173,19 @@ public class BufferBuilder {
 			return PositionMarker.getAbsolute(cachedPosition);
 		}
 
-		public void markFinished() {
-			int newValue = -getCached();
+		/**
+		 * Marks this position as finished and returns the current position.
+		 *
+		 * @return current position as of {@link #getCached()}
+		 */
+		public int markFinished() {
+			int currentPosition = getCached();
+			int newValue = -currentPosition;
 			if (newValue == 0) {
 				newValue = FINISHED_EMPTY;
 			}
 			set(newValue);
+			return currentPosition;
 		}
 
 		public void move(int offset) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index f368ff0..3117fe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -42,7 +42,7 @@ public class BufferConsumer implements Closeable {
 
 	private final CachedPositionMarker writerPosition;
 
-	private int currentReaderPosition = 0;
+	private int currentReaderPosition;
 
 	/**
 	 * Constructs {@link BufferConsumer} instance with content that can be changed by {@link BufferBuilder}.
@@ -74,6 +74,14 @@ public class BufferConsumer implements Closeable {
 		this.currentReaderPosition = currentReaderPosition;
 	}
 
+	/**
+	 * Checks whether the {@link BufferBuilder} has already been finished.
+	 *
+	 * <p>BEWARE: this method accesses the cached value of the position marker which is only updated
+	 * after calls to {@link #build()}!
+	 *
+	 * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> otherwise
+	 */
 	public boolean isFinished() {
 		return writerPosition.isFinished();
 	}
@@ -84,16 +92,17 @@ public class BufferConsumer implements Closeable {
 	 */
 	public Buffer build() {
 		writerPosition.update();
-		Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition);
-		currentReaderPosition = writerPosition.getCached();
+		int cachedWriterPosition = writerPosition.getCached();
+		Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+		currentReaderPosition = cachedWriterPosition;
 		return slice.retainBuffer();
 	}
 
 	/**
 	 * Returns a retained copy with separate indexes. This allows to read from the same {@link MemorySegment} twice.
 	 *
-	 * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In
-	 * other words, data already consumed before copying will not be visible to the returned copies.
+	 * <p>WARNING: the newly returned {@link BufferConsumer} will have its reader index copied from the original buffer.
+	 * In other words, data already consumed before copying will not be visible to the returned copies.
 	 *
 	 * @return a retained copy of self with separate indexes
 	 */
@@ -124,7 +133,7 @@ public class BufferConsumer implements Closeable {
 	 * Cached reading wrapper around {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 */
 	private static class CachedPositionMarker {
 		private final PositionMarker positionMarker;
@@ -134,7 +143,7 @@ public class BufferConsumer implements Closeable {
 		 */
 		private int cachedPosition;
 
-		public CachedPositionMarker(PositionMarker positionMarker) {
+		CachedPositionMarker(PositionMarker positionMarker) {
 			this.positionMarker = checkNotNull(positionMarker);
 			update();
 		}