Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 13:00:54 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #13614: [FLINK-19547][runtime] Clean up partial record when reconnecting for Approximate Local Recovery

pnowojski commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r504622425



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
##########
@@ -159,6 +184,14 @@ int getCurrentReaderPosition() {
 		return currentReaderPosition;
 	}
 
+	boolean startOfDataBuffer() {

Review comment:
       nit: `isStartOfDataBuffer()`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -128,26 +128,40 @@ protected void flushAllSubpartitions(boolean finishProducers) {
 
 	@Override
 	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
-		do {
-			final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
-			bufferBuilder.appendAndCommit(record);
+		BufferBuilder buffer = subpartitionBufferBuilders[targetSubpartition];
 
-			if (bufferBuilder.isFull()) {
-				finishSubpartitionBufferBuilder(targetSubpartition);
-			}
-		} while (record.hasRemaining());
+		if (buffer == null) {
+			buffer = getNewEmptySubpartitionBufferBuilderForNewRecord(targetSubpartition);
+		}
+		buffer.appendAndCommit(record);

Review comment:
       nit: those names are a bit lengthy; it's also a bit confusing that `getNewEmptySubpartitionBufferBuilderForNewRecord` does not append the data, while `getNewEmptySubpartitionBufferBuilderForRecordContinuation` does.
   
    Maybe you could extract those lines above into a method:
   ```
   BufferBuilder buffer = appendDataForSubpartitionNewRecord(record, targetSubpartition);
   ```
   and for the broadcast version
   ```
    BufferBuilder buffer = appendDataForBroadcastNewRecord(record);
   ```
   ...
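
    For illustration, a minimal sketch of what the extracted "new record" helper could look like (the name follows the suggestion above; the body merely merges the lookup-or-create and the append from the current `emitRecord`, so treat it as a sketch, not the PR's implementation):
    ```
    private BufferBuilder appendDataForSubpartitionNewRecord(
            final ByteBuffer record,
            final int targetSubpartition) throws IOException {
        BufferBuilder buffer = subpartitionBufferBuilders[targetSubpartition];
        if (buffer == null) {
            buffer = getNewEmptySubpartitionBufferBuilderForNewRecord(targetSubpartition);
        }
        // appending inside the helper makes the "new record" and the
        // "continuation" variants behave consistently
        buffer.appendAndCommit(record);
        return buffer;
    }
    ```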

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -305,6 +308,10 @@ BufferAndBacklog pollBuffer() {
 		}
 	}
 
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		return buffer.build();
+	}

Review comment:
       inline?
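
    i.e. a sketch of the inlined call site:
    ```
    // in pollBuffer(), instead of buildSliceBuffer(bufferConsumerWithPartialRecordLength),
    // call the consumer directly:
    Buffer buffer = bufferConsumerWithPartialRecordLength.build();
    ```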

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumerWithPartialRecordLength.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link BufferConsumer} together with the length of the partial record at the beginning of the buffer,
+ * for the case that a record spans multiple buffers.
+ *
+ * <p>`partialRecordLength` is the number of bytes that have to be skipped, starting from position index 0
+ * of the underlying MemorySegment, in order to start at a complete record. `partialRecordLength` is used in
+ * approximate local recovery to find the start position of a complete record in a BufferConsumer, the
+ * so-called `partial record clean-up`.
+ *
+ * <p>Partial records occur when a record does not fit into one buffer; the remaining part of the same record
+ * is then put into the next buffer. Hence partial records can only exist at the beginning of a buffer.
+ * Partial record clean-up is needed in approximate local recovery mode: if a record spans multiple buffers
+ * and the first buffer(s) have been lost due to a failure of the receiver task, the remaining in-flight data
+ * belonging to the same record has to be cleaned up.
+ *
+ * <p>If partialRecordLength == 0, the buffer starts with a complete record.
+ * <p>If partialRecordLength > 0, the buffer starts with a partial record of length partialRecordLength.
+ * <p>If partialRecordLength < 0, partialRecordLength is undefined; this case is currently used in
+ * 									{@link ResultSubpartitionRecoveredStateHandler#recover}.
+ */
+@NotThreadSafe
+public class BufferConsumerWithPartialRecordLength {
+	private final BufferConsumer bufferConsumer;
+	private final int partialRecordLength;
+
+	public BufferConsumerWithPartialRecordLength(BufferConsumer bufferConsumer, int partialRecordLength) {
+		this.bufferConsumer = checkNotNull(bufferConsumer);
+		this.partialRecordLength = partialRecordLength;
+	}
+
+	public BufferConsumer getBufferConsumer() {
+		return bufferConsumer;
+	}
+
+	public int getPartialRecordLength() {
+		return partialRecordLength;
+	}
+
+	public Buffer build() {
+		return bufferConsumer.build();
+	}
+
+	public PartialRecordCleanupResult cleanupPartialRecord() {

Review comment:
       Do you really need to return the buffer here? I have a feeling that the code would be simpler if:
   
    - this method returned just `true`/`false`, indicating whether the clean-up has finished
    - `skipBuild(...)` were replaced with `skip(...)`, which would just move the offset without returning the buffer
    - a `.build()` call would be required afterwards to get the remaining data
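
    A sketch of that shape (the `skip(...)` method and its return value are hypothetical, per the bullets above; this is not the PR's actual code):
    ```
    public boolean cleanupPartialRecord() {
        // hypothetical BufferConsumer#skip(int): advances currentReaderPosition only
        // and returns how many bytes it could actually skip
        int skipped = bufferConsumer.skip(partialRecordLength);
        return skipped == partialRecordLength;
    }

    // caller: fetch the remaining data separately, once clean-up has finished
    // if (consumerWithLength.cleanupPartialRecord()) {
    //     Buffer remaining = consumerWithLength.build();
    // }
    ```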

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +225,62 @@ protected void releaseInternal() {
 		}
 	}
 
-	private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
-		final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
-		if (bufferBuilder != null) {
-			return bufferBuilder;
-		}
+	private BufferBuilder getNewEmptySubpartitionBufferBuilderForNewRecord(int targetSubpartition) throws IOException {
+		final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition);
+		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), 0);
 
-		return getNewSubpartitionBufferBuilder(targetSubpartition);
+		return bufferBuilder;
 	}
 
-	private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+	private BufferBuilder getNewEmptySubpartitionBufferBuilderForRecordContinuation(
+			final ByteBuffer remainingRecordBytes,
+			final int targetSubpartition) throws IOException {
+		final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition);
+		final int partialRecordBytes = bufferBuilder.appendAndCommit(remainingRecordBytes);
+		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), partialRecordBytes);
+
+		return bufferBuilder;
+	}
+
+	private BufferBuilder requestNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
 		checkInProduceState();
 		ensureUnicastMode();
-
 		final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
-		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
 		subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+
 		return bufferBuilder;
 	}
 
-	private BufferBuilder getBroadcastBufferBuilder() throws IOException {
-		if (broadcastBufferBuilder != null) {
-			return broadcastBufferBuilder;
+	private BufferBuilder getNewEmptyBroadcastBufferBuilderForNewRecord() throws IOException {
+		final BufferBuilder bufferBuilder = requestNewBroadcastBufferBuilder();
+		try (final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning()) {
+			for (ResultSubpartition subpartition : subpartitions) {
+				subpartition.add(consumer.copy(), 0);
+			}
 		}
 
-		return getNewBroadcastBufferBuilder();
+		return bufferBuilder;

Review comment:
       if you moved appending data into this method as I suggested above, you could deduplicate this code with:
   ```
   return getNewEmptyBroadcastBufferBuilderForRecordContinuation(record, 0);
   ```
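
    A sketch of how that deduplication could look, assuming the continuation variant gains an explicit partial-length parameter (hypothetical signature):
    ```
    private BufferBuilder getNewEmptyBroadcastBufferBuilderForNewRecord(
            final ByteBuffer record) throws IOException {
        // a new record has no partial prefix, so registering length 0 lets the
        // "continuation" path cover the "new record" case as well
        return getNewEmptyBroadcastBufferBuilderForRecordContinuation(record, 0);
    }
    ```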

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
##########
@@ -108,6 +109,30 @@ public Buffer build() {
 		return slice.retainBuffer();
 	}
 
+	/**
+	 * @param bytesToSkip number of bytes to skip from currentReaderPosition
+	 * @return sliced {@link Buffer} containing the not-yet-consumed data. The returned {@link Buffer} shares the
+	 * reference counter with the parent {@link BufferConsumer}; in order to recycle memory, both of them must be
+	 * recycled/closed.
+	 */
+	Buffer skipBuild(int bytesToSkip) {
+		writerPosition.update();
+		int cachedWriterPosition = writerPosition.getCached();
+		Buffer slice;
+
+		int bytesReadable = cachedWriterPosition - currentReaderPosition;
+		checkState(bytesToSkip <= bytesReadable, "bytes to skip beyond readable range");
+
+		if (bytesToSkip < buffer.getMaxCapacity()) {
+			slice = buffer.readOnlySlice(currentReaderPosition + bytesToSkip, bytesReadable - bytesToSkip);
+		} else {
+			// return an empty buffer if beyond buffer max capacity
+			slice = buffer.readOnlySlice(currentReaderPosition, 0);
+		}

Review comment:
       How could it be that `bytesToSkip > buffer.getMaxCapacity()` if we already asserted `bytesToSkip <= bytesReadable`? If that's impossible, we could replace this if with just
   ```
   slice = buffer.readOnlySlice(currentReaderPosition + bytesToSkip, bytesReadable - bytesToSkip);
   ```
   as that would also return an empty buffer if `bytesToSkip == bytesReadable`?
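
    With that simplification, the quoted part of `skipBuild(...)` could read (sketch covering only the lines shown above):
    ```
    Buffer skipBuild(int bytesToSkip) {
        writerPosition.update();
        int cachedWriterPosition = writerPosition.getCached();

        int bytesReadable = cachedWriterPosition - currentReaderPosition;
        checkState(bytesToSkip <= bytesReadable, "bytes to skip beyond readable range");

        // bytesToSkip == bytesReadable naturally yields an empty (zero-length) slice
        Buffer slice = buffer.readOnlySlice(currentReaderPosition + bytesToSkip, bytesReadable - bytesToSkip);
        // ... remainder of the method as in the PR
    }
    ```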

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -128,26 +128,40 @@ protected void flushAllSubpartitions(boolean finishProducers) {
 
 	@Override
 	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
-		do {
-			final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
-			bufferBuilder.appendAndCommit(record);
+		BufferBuilder buffer = subpartitionBufferBuilders[targetSubpartition];
 
-			if (bufferBuilder.isFull()) {
-				finishSubpartitionBufferBuilder(targetSubpartition);
-			}
-		} while (record.hasRemaining());
+		if (buffer == null) {
+			buffer = getNewEmptySubpartitionBufferBuilderForNewRecord(targetSubpartition);
+		}
+		buffer.appendAndCommit(record);
+
+		while (record.hasRemaining()) {
+			finishSubpartitionBufferBuilder(targetSubpartition);
+			buffer = getNewEmptySubpartitionBufferBuilderForRecordContinuation(record, targetSubpartition);

Review comment:
       ...
   ```
   buffer = appendDataForSubpartitionRecordContinuation(record, targetSubpartition);
   ```
   and for the broadcast version
   ```
    buffer = appendDataForBroadcastRecordContinuation(record);
   ```
   ?
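
    Putting both extractions together, `emitRecord(...)` might then read (sketch, using the hypothetical names from these comments):
    ```
    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        // appends as much of the record as fits into the current buffer
        BufferBuilder buffer = appendDataForSubpartitionNewRecord(record, targetSubpartition);

        while (record.hasRemaining()) {
            // the buffer is full: finish it and spill the rest into a fresh one
            finishSubpartitionBufferBuilder(targetSubpartition);
            buffer = appendDataForSubpartitionRecordContinuation(record, targetSubpartition);
        }
        // ... rest of the method as in the PR
    }
    ```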
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +225,62 @@ protected void releaseInternal() {
 		}
 	}
 
-	private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
-		final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
-		if (bufferBuilder != null) {
-			return bufferBuilder;
-		}
+	private BufferBuilder getNewEmptySubpartitionBufferBuilderForNewRecord(int targetSubpartition) throws IOException {
+		final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition);
+		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), 0);
 
-		return getNewSubpartitionBufferBuilder(targetSubpartition);
+		return bufferBuilder;
 	}
 
-	private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+	private BufferBuilder getNewEmptySubpartitionBufferBuilderForRecordContinuation(
+			final ByteBuffer remainingRecordBytes,
+			final int targetSubpartition) throws IOException {
+		final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition);
+		final int partialRecordBytes = bufferBuilder.appendAndCommit(remainingRecordBytes);
+		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), partialRecordBytes);
+
+		return bufferBuilder;
+	}
+
+	private BufferBuilder requestNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
 		checkInProduceState();
 		ensureUnicastMode();
-
 		final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
-		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
 		subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+
 		return bufferBuilder;
 	}
 
-	private BufferBuilder getBroadcastBufferBuilder() throws IOException {
-		if (broadcastBufferBuilder != null) {
-			return broadcastBufferBuilder;
+	private BufferBuilder getNewEmptyBroadcastBufferBuilderForNewRecord() throws IOException {
+		final BufferBuilder bufferBuilder = requestNewBroadcastBufferBuilder();
+		try (final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning()) {
+			for (ResultSubpartition subpartition : subpartitions) {
+				subpartition.add(consumer.copy(), 0);
+			}
 		}
 
-		return getNewBroadcastBufferBuilder();
+		return bufferBuilder;
 	}
 
-	private BufferBuilder getNewBroadcastBufferBuilder() throws IOException {
+	private BufferBuilder getNewEmptyBroadcastBufferBuilderForRecordContinuation(
+			final ByteBuffer remainingRecordBytes) throws IOException {
+		final BufferBuilder bufferBuilder = requestNewBroadcastBufferBuilder();
+		final int partialRecordBytes = bufferBuilder.appendAndCommit(remainingRecordBytes);
+		try (final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning()) {
+			for (ResultSubpartition subpartition : subpartitions) {
+				subpartition.add(consumer.copy(), partialRecordBytes);

Review comment:
       hmmm, maybe it would be easier to deduplicate/generalise some of the code if you replaced `partialRecordBytes` (`partialRecordLength`) with `remainingRecordBytes.remaining()` (`remainingRecordLength`). `remainingRecordLength` could be larger than the size of a single `Buffer` and would simply mean how much of the record is still outstanding; if `remainingRecordLength > buffer.size()`, it means the record clean-up cannot be completed in one call.

    This would slightly change the implementation of `skipBuild(...)`, but maybe it would make the code overall a bit simpler?
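
    As an illustrative sketch of that generalisation (not the PR's code), note that the full remaining length has to be captured before `appendAndCommit(...)` consumes bytes from the `ByteBuffer`:
    ```
    private BufferBuilder getNewEmptyBroadcastBufferBuilderForRecordContinuation(
            final ByteBuffer remainingRecordBytes) throws IOException {
        final BufferBuilder bufferBuilder = requestNewBroadcastBufferBuilder();
        // capture the remaining record length *before* the append consumes bytes;
        // a value larger than the buffer size means clean-up spans several buffers
        final int remainingRecordLength = remainingRecordBytes.remaining();
        bufferBuilder.appendAndCommit(remainingRecordBytes);
        try (final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning()) {
            for (ResultSubpartition subpartition : subpartitions) {
                subpartition.add(consumer.copy(), remainingRecordLength);
            }
        }
        return bufferBuilder;
    }
    ```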



