You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:21 UTC

[flink] 04/09: [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit adaab8ee68e37c091c4b4ed29dadbf296c4fefd9
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:24:37 2018 +0200

    [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer
---
 .../runtime/io/network/buffer/BufferBuilder.java   | 25 ++++++++++----------
 .../runtime/io/network/buffer/BufferConsumer.java  | 27 +++++++++++++++-------
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f184..6fb067e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public class BufferBuilder {
 	 * @return number of written bytes.
 	 */
 	public int finish() {
-		positionMarker.markFinished();
+		int writtenBytes = positionMarker.markFinished();
 		commit();
-		return getWrittenBytes();
+		return writtenBytes;
 	}
 
 	public boolean isFinished() {
@@ -118,18 +118,10 @@ public class BufferBuilder {
 		return positionMarker.getCached() == getMaxCapacity();
 	}
 
-	public boolean isEmpty() {
-		return positionMarker.getCached() == 0;
-	}
-
 	public int getMaxCapacity() {
 		return memorySegment.size();
 	}
 
-	private int getWrittenBytes() {
-		return positionMarker.getCached();
-	}
-
 	/**
 	 * Holds a reference to the current writer position. Negative values indicate that writer ({@link BufferBuilder}
 	 * has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer.
@@ -156,7 +148,7 @@ public class BufferBuilder {
 	 * Cached writing implementation of {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 *
 	 * <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible.
 	 */
@@ -181,12 +173,19 @@ public class BufferBuilder {
 			return PositionMarker.getAbsolute(cachedPosition);
 		}
 
-		public void markFinished() {
-			int newValue = -getCached();
+		/**
+		 * Marks this position as finished and returns the current position.
+		 *
+		 * @return current position as of {@link #getCached()}
+		 */
+		public int markFinished() {
+			int currentPosition = getCached();
+			int newValue = -currentPosition;
 			if (newValue == 0) {
 				newValue = FINISHED_EMPTY;
 			}
 			set(newValue);
+			return currentPosition;
 		}
 
 		public void move(int offset) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index 4bad92f..abde3ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -42,7 +42,7 @@ public class BufferConsumer implements Closeable {
 
 	private final CachedPositionMarker writerPosition;
 
-	private int currentReaderPosition = 0;
+	private int currentReaderPosition;
 
 	/**
 	 * Constructs {@link BufferConsumer} instance with content that can be changed by {@link BufferBuilder}.
@@ -74,6 +74,14 @@ public class BufferConsumer implements Closeable {
 		this.currentReaderPosition = currentReaderPosition;
 	}
 
+	/**
+	 * Checks whether the {@link BufferBuilder} has already been finished.
+	 *
+	 * <p>BEWARE: this method accesses the cached value of the position marker which is only updated
+	 * after calls to {@link #build()}!
+	 *
+	 * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> otherwise
+	 */
 	public boolean isFinished() {
 		return writerPosition.isFinished();
 	}
@@ -84,17 +92,20 @@ public class BufferConsumer implements Closeable {
 	 */
 	public Buffer build() {
 		writerPosition.update();
-		Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition);
-		currentReaderPosition = writerPosition.getCached();
+		int cachedWriterPosition = writerPosition.getCached();
+		Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+		currentReaderPosition = cachedWriterPosition;
 		return slice.retainBuffer();
 	}
 
 	/**
-	 * @return a retained copy of self with separate indexes - it allows two read from the same {@link MemorySegment}
+	 * Creates a retained copy of self with separate indexes which allows two read from the same {@link MemorySegment}
 	 * twice.
 	 *
-	 * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In
-	 * other words, data already consumed before copying will not be visible to the returned copies.
+	 * <p>WARNING: the newly returned {@link BufferConsumer} will have its reader index copied from the original buffer.
+	 * In other words, data already consumed before copying will not be visible to the returned copies.
+	 *
+	 * @return a retained copy of self with separate indexes
 	 */
 	public BufferConsumer copy() {
 		return new BufferConsumer(buffer.retainBuffer(), writerPosition.positionMarker, currentReaderPosition);
@@ -123,7 +134,7 @@ public class BufferConsumer implements Closeable {
 	 * Cached reading wrapper around {@link PositionMarker}.
 	 *
 	 * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-	 * of one another - for example the cached values can not accidentally leak from one to another.
+	 * of one another - so that the cached values can not accidentally leak from one to another.
 	 */
 	private static class CachedPositionMarker {
 		private final PositionMarker positionMarker;
@@ -133,7 +144,7 @@ public class BufferConsumer implements Closeable {
 		 */
 		private int cachedPosition;
 
-		public CachedPositionMarker(PositionMarker positionMarker) {
+		CachedPositionMarker(PositionMarker positionMarker) {
 			this.positionMarker = checkNotNull(positionMarker);
 			update();
 		}