Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/22 08:41:33 UTC

[GitHub] [flink] zhijiangW commented on a change in pull request #13595: [FLINK-19582][network] Introduce sort-merge based blocking shuffle to Flink

zhijiangW commented on a change in pull request #13595:
URL: https://github.com/apache/flink/pull/13595#discussion_r509983577



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import static org.apache.flink.runtime.io.network.partition.SortBuffer.BufferWithChannel;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SortMergeResultPartition} appends records and events to a {@link SortBuffer}. Once the {@link SortBuffer}
+ * is full, all of its data is copied and spilled to a {@link PartitionedFile} sequentially, in subpartition index
+ * order. Large records that cannot fit into an empty {@link SortBuffer} are spilled to the
+ * {@link PartitionedFile} separately.
+ */
+@NotThreadSafe
+public class SortMergeResultPartition extends ResultPartition {
+
+	private final Object lock = new Object();
+
+	/** All active readers currently consuming data from this result partition. */
+	@GuardedBy("lock")
+	private final Set<SortMergeSubpartitionReader> readers = new HashSet<>();
+
+	/** {@link PartitionedFile} produced by this result partition. */
+	@GuardedBy("lock")
+	private PartitionedFile resultFile;
+
+	/** Used to generate random file channel ID. */
+	private final FileChannelManager channelManager;
+
+	/** Number of data buffers (excluding events) written for each subpartition. */
+	private final int[] numDataBuffers;
+
+	/** A piece of unmanaged memory for data writing. */
+	private final MemorySegment writeBuffer;
+
+	/** Size of network buffer and write buffer. */
+	private final int networkBufferSize;
+
+	/** Current {@link SortBuffer} to append records to. */
+	private SortBuffer currentSortBuffer;
+
+	/** File writer for this result partition. */
+	private PartitionedFileWriter fileWriter;
+
+	public SortMergeResultPartition(
+			String owningTaskName,
+			int partitionIndex,
+			ResultPartitionID partitionId,
+			ResultPartitionType partitionType,
+			int numSubpartitions,
+			int numTargetKeyGroups,
+			int networkBufferSize,
+			ResultPartitionManager partitionManager,
+			FileChannelManager channelManager,
+			@Nullable BufferCompressor bufferCompressor,
+			SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
+
+		super(
+			owningTaskName,
+			partitionIndex,
+			partitionId,
+			partitionType,
+			numSubpartitions,
+			numTargetKeyGroups,
+			partitionManager,
+			bufferCompressor,
+			bufferPoolFactory);
+
+		this.channelManager = checkNotNull(channelManager);
+		this.networkBufferSize = networkBufferSize;
+		this.numDataBuffers = new int[numSubpartitions];
+		this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(networkBufferSize);
+	}
+
+	@Override
+	protected void releaseInternal() {
+		synchronized (lock) {
+			isFinished = true; // to fail writing faster
+
+			// delete the produced file only when no reader is reading now
+			if (readers.isEmpty()) {
+				if (resultFile != null) {
+					resultFile.deleteQuietly();
+					resultFile = null;
+				}
+			}
+		}
+	}
+
+	@Override
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+		emit(record, targetSubpartition, DataType.DATA_BUFFER);
+	}
+
+	@Override
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+		broadcast(record, DataType.DATA_BUFFER);
+	}
+
+	@Override
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+		try {
+			ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+			broadcast(serializedEvent, buffer.getDataType());
+		} finally {
+			buffer.recycleBuffer();
+		}
+	}
+
+	private void broadcast(ByteBuffer record, DataType dataType) throws IOException {
+		for (int channelIndex = 0; channelIndex < numSubpartitions; ++channelIndex) {
+			record.rewind();
+			emit(record, channelIndex, dataType);
+		}
+	}
+
+	private void emit(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+		checkInProduceState();
+
+		SortBuffer sortBuffer = getSortBuffer();
+		if (sortBuffer.append(record, targetSubpartition, dataType)) {
+			return;
+		}
+
+		if (!sortBuffer.hasRemaining()) {
+			// the record cannot be appended even to an empty sort buffer because it is too large
+			releaseCurrentSortBuffer();
+			writeLargeRecord(record, targetSubpartition, dataType);
+			return;
+		}
+
+		flushCurrentSortBuffer();
+		emit(record, targetSubpartition, dataType);
+	}
+
+	private void releaseCurrentSortBuffer() {
+		if (currentSortBuffer != null) {
+			currentSortBuffer.release();
+			currentSortBuffer = null;
+		}
+	}
+
+	private SortBuffer getSortBuffer() {
+		if (currentSortBuffer != null) {
+			return currentSortBuffer;
+		}
+
+		currentSortBuffer = new PartitionSortedBuffer(bufferPool, numSubpartitions, networkBufferSize);
+		return currentSortBuffer;
+	}
+
+	private void flushCurrentSortBuffer() throws IOException {
+		if (currentSortBuffer == null || !currentSortBuffer.hasRemaining()) {
+			releaseCurrentSortBuffer();
+			return;
+		}
+
+		currentSortBuffer.finish();
+		PartitionedFileWriter fileWriter = getPartitionedFileWriter();
+
+		while (currentSortBuffer.hasRemaining()) {
+			BufferWithChannel bufferWithChannel = currentSortBuffer.copyData(writeBuffer);
+			Buffer buffer = bufferWithChannel.getBuffer();
+			int subpartitionIndex = bufferWithChannel.getChannelIndex();
+
+			writeCompressedBufferIfPossible(buffer, fileWriter, subpartitionIndex);
+			updateStatistics(buffer, subpartitionIndex);
+		}
+
+		releaseCurrentSortBuffer();
+	}
+
+	private PartitionedFileWriter getPartitionedFileWriter() throws IOException {
+		if (fileWriter == null) {
+			String basePath = channelManager.createChannel().getPath();
+			fileWriter = new PartitionedFileWriter(basePath, numSubpartitions);
+			fileWriter.open();
+		}
+
+		fileWriter.startNewRegion();
+		return fileWriter;
+	}
+
+	private void writeCompressedBufferIfPossible(
+			Buffer buffer,
+			PartitionedFileWriter fileWriter,
+			int targetSubpartition) throws IOException {
+		if (canBeCompressed(buffer)) {
+			buffer = bufferCompressor.compressToIntermediateBuffer(buffer);
+		}
+		fileWriter.writeBuffer(buffer, targetSubpartition);
+		buffer.recycleBuffer();
+	}
+
+	private void updateStatistics(Buffer buffer, int subpartitionIndex) {
+		numBuffersOut.inc();
+		numBytesOut.inc(buffer.readableBytes());
+		if (buffer.isBuffer()) {
+			++numDataBuffers[subpartitionIndex];
+		}
+	}
+
+	/**
+	 * Spills the large record into the target {@link PartitionedFile} as a separate data region.
+	 */
+	private void writeLargeRecord(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+		PartitionedFileWriter fileWriter = getPartitionedFileWriter();
+
+		while (record.hasRemaining()) {
+			int toCopy = Math.min(record.remaining(), writeBuffer.size());
+			writeBuffer.put(0, record, toCopy);
+			NetworkBuffer buffer = new NetworkBuffer(writeBuffer, (buf) -> {}, dataType, toCopy);
+
+			writeCompressedBufferIfPossible(buffer, fileWriter, targetSubpartition);
+			updateStatistics(buffer, targetSubpartition);
+		}
+	}
+
+	void releaseReader(SortMergeSubpartitionReader reader) {
+		synchronized (lock) {
+			readers.remove(reader);
+
+			// release the result partition if it has been marked as released
+			if (readers.isEmpty() && isReleased()) {
+				releaseInternal();
+			}
+		}
+	}
+
+	@Override
+	public void finish() throws IOException {
+		checkInProduceState();
+
+		broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+		flushCurrentSortBuffer();
+
+		synchronized (lock) {
+			checkState(!isReleased());
+
+			resultFile = fileWriter.finish();
+			fileWriter = null;
+
+			LOG.info("New partitioned file produced: {}.", resultFile);
+		}
+
+		super.finish();
+	}
+
+	@Override
+	public void close() {
+		releaseCurrentSortBuffer();

Review comment:
       `#close` could be called by the canceller thread if I remember correctly, so it might cause a race condition here.
   Maybe we can release only the `fileWriter` here, as sketched below, to interrupt the task thread while it is writing data. The other resource cleanup can then move into `#releaseInternal()` before the task exits.
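   A minimal sketch of that shape (hedged: `releaseQuietly()` is only a placeholder name here, not an existing method on `PartitionedFileWriter`, and the exact guard against the field changing concurrently may need to differ):

	@Override
	public void close() {
		// Release only the file writer here so that a canceller thread
		// calling #close can interrupt the task thread while it is writing
		// data. releaseQuietly() is a placeholder for a method that closes
		// and deletes the underlying channel without throwing.
		PartitionedFileWriter writer = fileWriter;
		if (writer != null) {
			writer.releaseQuietly();
		}
		super.close();
	}

   The remaining cleanup, for example `releaseCurrentSortBuffer()`, could then move into `#releaseInternal()`, which is already guarded by `lock`, so it happens before the task exits.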




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org