You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:49 UTC
[flink] 04/11: [hotfix][network] minor optimisations and
clarifications around BufferBuilder and BufferConsumer
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 51f53d8aeb7b9b4db8f3f4dde0c0f6ab6b626d2f
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:24:37 2018 +0200
[hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer
---
.../runtime/io/network/buffer/BufferBuilder.java | 25 +++++++++++-----------
.../runtime/io/network/buffer/BufferConsumer.java | 23 ++++++++++++++------
2 files changed, 28 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f184..6fb067e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public class BufferBuilder {
* @return number of written bytes.
*/
public int finish() {
- positionMarker.markFinished();
+ int writtenBytes = positionMarker.markFinished();
commit();
- return getWrittenBytes();
+ return writtenBytes;
}
public boolean isFinished() {
@@ -118,18 +118,10 @@ public class BufferBuilder {
return positionMarker.getCached() == getMaxCapacity();
}
- public boolean isEmpty() {
- return positionMarker.getCached() == 0;
- }
-
public int getMaxCapacity() {
return memorySegment.size();
}
- private int getWrittenBytes() {
- return positionMarker.getCached();
- }
-
/**
* Holds a reference to the current writer position. Negative values indicate that the writer ({@link BufferBuilder})
* has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer.
@@ -156,7 +148,7 @@ public class BufferBuilder {
* Cached writing implementation of {@link PositionMarker}.
*
* <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
- * of one another - for example the cached values can not accidentally leak from one to another.
+ * of one another - so that the cached values can not accidentally leak from one to another.
*
* <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible.
*/
@@ -181,12 +173,19 @@ public class BufferBuilder {
return PositionMarker.getAbsolute(cachedPosition);
}
- public void markFinished() {
- int newValue = -getCached();
+ /**
+ * Marks this position as finished and returns the current position.
+ *
+ * @return current position as of {@link #getCached()}
+ */
+ public int markFinished() {
+ int currentPosition = getCached();
+ int newValue = -currentPosition;
if (newValue == 0) {
newValue = FINISHED_EMPTY;
}
set(newValue);
+ return currentPosition;
}
public void move(int offset) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index f368ff0..3117fe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -42,7 +42,7 @@ public class BufferConsumer implements Closeable {
private final CachedPositionMarker writerPosition;
- private int currentReaderPosition = 0;
+ private int currentReaderPosition;
/**
* Constructs {@link BufferConsumer} instance with content that can be changed by {@link BufferBuilder}.
@@ -74,6 +74,14 @@ public class BufferConsumer implements Closeable {
this.currentReaderPosition = currentReaderPosition;
}
+ /**
+ * Checks whether the {@link BufferBuilder} has already been finished.
+ *
+ * <p>BEWARE: this method accesses the cached value of the position marker which is only updated
+ * after calls to {@link #build()}!
+ *
+ * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> otherwise
+ */
public boolean isFinished() {
return writerPosition.isFinished();
}
@@ -84,16 +92,17 @@ public class BufferConsumer implements Closeable {
*/
public Buffer build() {
writerPosition.update();
- Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition);
- currentReaderPosition = writerPosition.getCached();
+ int cachedWriterPosition = writerPosition.getCached();
+ Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+ currentReaderPosition = cachedWriterPosition;
return slice.retainBuffer();
}
/**
* Returns a retained copy with separate indexes. This allows reading from the same {@link MemorySegment} twice.
*
- * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In
- * other words, data already consumed before copying will not be visible to the returned copies.
+ * <p>WARNING: the newly returned {@link BufferConsumer} will have its reader index copied from the original buffer.
+ * In other words, data already consumed before copying will not be visible to the returned copies.
*
* @return a retained copy of self with separate indexes
*/
@@ -124,7 +133,7 @@ public class BufferConsumer implements Closeable {
* Cached reading wrapper around {@link PositionMarker}.
*
* <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
- * of one another - for example the cached values can not accidentally leak from one to another.
+ * of one another - so that the cached values can not accidentally leak from one to another.
*/
private static class CachedPositionMarker {
private final PositionMarker positionMarker;
@@ -134,7 +143,7 @@ public class BufferConsumer implements Closeable {
*/
private int cachedPosition;
- public CachedPositionMarker(PositionMarker positionMarker) {
+ CachedPositionMarker(PositionMarker positionMarker) {
this.positionMarker = checkNotNull(positionMarker);
update();
}