Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 09:06:36 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #20371: [FLINK-27908][network] Introduce HsResultPartition and HsSubpartitionView

xintongsong commented on code in PR #20371:
URL: https://github.com/apache/flink/pull/20371#discussion_r930690641


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java:
##########
@@ -97,24 +97,40 @@ public Optional<CompletableFuture<Void>> getSpilledFuture() {
         return Optional.ofNullable(spilledFuture);
     }
 
-    public void release() {
-        checkState(!released, "Release buffer repeatedly is unexpected.");
+    /**
+     * Mark buffer status to release.
+     *
+     * @return Whether the status has been modified successfully. If it has been released, false
+     *     will be returned.
+     */
+    public boolean release() {

Review Comment:
   Looking into how this method is used, I don't think we need this boolean return value.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java:
##########
@@ -97,24 +97,40 @@ public Optional<CompletableFuture<Void>> getSpilledFuture() {
         return Optional.ofNullable(spilledFuture);
     }
 
-    public void release() {
-        checkState(!released, "Release buffer repeatedly is unexpected.");
+    /**
+     * Mark buffer status to release.
+     *
+     * @return Whether the status has been modified successfully. If it has been released, false
+     *     will be returned.
+     */
+    public boolean release() {
+        if (isReleased()) {
+            return false;
+        }
         released = true;
         // decrease ref count when buffer is released from memory.
         buffer.recycleBuffer();
+        return true;
     }
 
-    public void startSpilling(CompletableFuture<Void> spilledFuture) {
-        checkState(!released, "Buffer is already released.");
-        checkState(
-                !spillStarted && this.spilledFuture == null,
-                "Spill buffer repeatedly is unexpected.");
+    /**
+     * Mark buffer status to startSpilling.
+     *
+     * @param spilledFuture completable future of this buffer's spilling operation.
+     * @return Whether the status has been modified successfully. If it has been released or is in
+     *     startSpilling status, false will be returned.

Review Comment:
   I'd suggest being more straightforward.
   ```
   @return true, if spilling of the buffer had not yet been started; false, otherwise.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java:
##########
@@ -97,24 +97,40 @@ public Optional<CompletableFuture<Void>> getSpilledFuture() {
         return Optional.ofNullable(spilledFuture);
     }
 
-    public void release() {
-        checkState(!released, "Release buffer repeatedly is unexpected.");
+    /**
+     * Mark buffer status to release.
+     *
+     * @return Whether the status has been modified successfully. If it has been released, false
+     *     will be returned.
+     */
+    public boolean release() {
+        if (isReleased()) {
+            return false;
+        }
         released = true;
         // decrease ref count when buffer is released from memory.
         buffer.recycleBuffer();
+        return true;
     }
 
-    public void startSpilling(CompletableFuture<Void> spilledFuture) {
-        checkState(!released, "Buffer is already released.");
-        checkState(
-                !spillStarted && this.spilledFuture == null,
-                "Spill buffer repeatedly is unexpected.");
+    /**
+     * Mark buffer status to startSpilling.
+     *
+     * @param spilledFuture completable future of this buffer's spilling operation.
+     * @return Whether the status has been modified successfully. If it has been released or is in
+     *     startSpilling status, false will be returned.
+     */
+    public boolean startSpilling(CompletableFuture<Void> spilledFuture) {
+        if (isReleased() || isSpillStarted()) {

Review Comment:
   Shall we still use `checkState` for `isReleased`?
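
One way to read this suggestion, as a minimal sketch: treat a released buffer as an invariant violation and keep the boolean result only for the benign "already spilling" case. `BufferContextSketch` is a hypothetical stand-in for `HsBufferContext`, and a plain `IllegalStateException` stands in for Flink's `checkState`; neither is the PR's actual code.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for HsBufferContext; illustrative only. */
final class BufferContextSketch {
    private boolean released;
    private boolean spillStarted;
    private CompletableFuture<Void> spilledFuture;

    /** Marks the buffer as released. */
    void release() {
        released = true;
    }

    /**
     * @return true, if spilling of the buffer had not yet been started; false, otherwise.
     * @throws IllegalStateException if the buffer has already been released.
     */
    boolean startSpilling(CompletableFuture<Void> spilledFuture) {
        // Assumption: spilling a released buffer is a bug, so fail loudly.
        if (released) {
            throw new IllegalStateException("Buffer is already released.");
        }
        // A repeated spill request is expected and reported via the return value.
        if (spillStarted) {
            return false;
        }
        spillStarted = true;
        this.spilledFuture = spilledFuture;
        return true;
    }
}
```

With this split, callers can ignore a `false` result safely, while a call on a released buffer surfaces immediately instead of being silently skipped.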



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java:
##########
@@ -62,6 +62,9 @@ public interface HsFileDataIndex {
      */
     void markBufferReadable(int subpartitionId, int bufferIndex);
 
+    /** Clean-up the {@link HsFileDataIndex} to free memory as soon as possible. */
+    default void cleanUp() {}

Review Comment:
   Why is this `default` while the only implementation of this interface overrides it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java:
##########
@@ -211,12 +211,17 @@ public List<BufferWithIdentity> spillSubpartitionBuffers(
         return callWithLock(
                 () ->
                         toSpill.stream()
+                                .filter(
+                                        indexAndChannel -> {
+                                            int bufferIndex = indexAndChannel.getBufferIndex();
+                                            return startSpillingBuffer(

Review Comment:
   It's counterintuitive to perform a state-changing action like this in a filter.
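
A minimal sketch of the alternative, assuming the state change can move into an explicit loop: the mutation is then visible at the call site rather than hidden in a stream predicate. `SpillSelectionSketch` and `tryStartSpilling` are hypothetical names, and plain integers stand in for the buffer index type.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; not the PR's HsSubpartitionMemoryDataManager. */
final class SpillSelectionSketch {
    private final Set<Integer> spillStarted = new HashSet<>();

    /** State transition: returns true only the first time an index is marked. */
    private boolean tryStartSpilling(int bufferIndex) {
        return spillStarted.add(bufferIndex);
    }

    /** Marks each candidate as spilling and returns those that were newly marked. */
    List<Integer> selectBuffersToSpill(List<Integer> candidates) {
        List<Integer> selected = new ArrayList<>();
        for (int bufferIndex : candidates) {
            // The side effect happens here, in plain sight, not inside filter().
            if (tryStartSpilling(bufferIndex)) {
                selected.add(bufferIndex);
            }
        }
        return selected;
    }
}
```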



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDiskDataView.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl.BufferIndexOrError;
+
+import java.util.Queue;
+
+/**
+ * A view for {@link HsSubpartitionView} to find out what data exists in disk and polling the data.
+ */
+public class HsDiskDataView {

Review Comment:
   I think `HsDiskDataView` and `HsMemoryDataView` can be two interfaces (or even one) implemented by `HsFileDataManager` respectively, rather than two classes. 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java:
##########
@@ -132,7 +133,9 @@ public HsResultPartitionReadScheduler(
         this.dataFilePath = checkNotNull(dataFilePath);
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
-        this.maxRequestedBuffers = hybridShuffleConfiguration.getMaxRequestedBuffers();
+        this.maxRequestedBuffers =
+                hybridShuffleConfiguration.getMaxRequestedBuffers(
+                        numSubpartitions, bufferPool.getNumBuffersPerRequest());

Review Comment:
   I'm not sure about deciding this dynamically when initializing the scheduler. I think it should be decided when the hybrid shuffle configuration is generated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDiskDataView.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl.BufferIndexOrError;
+
+import java.util.Queue;
+
+/**
+ * A view for {@link HsSubpartitionView} to find out what data exists in disk and polling the data.
+ */
+public class HsDiskDataView {

Review Comment:
   We may also lack a layer of `HsFileDataManager` here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java:
##########
@@ -83,6 +83,14 @@ public void markBufferReadable(int subpartitionId, int bufferIndex) {
         }
     }
 
+    @Override
+    public void cleanUp() {
+        synchronized (lock) {
+            subpartitionFirstBufferIndexInternalRegions.forEach(TreeMap::clear);
+            subpartitionFirstBufferIndexInternalRegions.clear();

Review Comment:
   Why do we need this cleanup? Shouldn't these be GC-ed automatically?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java:
##########
@@ -135,8 +146,23 @@ public float getFullStrategyReleaseBufferRatio() {
         return fullStrategyReleaseBufferRatio;
     }
 
+    /** Get {@link HsSelectiveSpillingStrategy} for hybrid shuffle mode. */
+    public HsSpillingStrategy getSpillingStrategy() {
+        switch (spillingStrategyName.toLowerCase()) {
+            case FULL_SPILLING_STRATEGY_NAME:
+                return new HsFullSpillingStrategy(this);
+
+            case SELECTIVE_SPILLING_STRATEGY_NAME:
+                return new HsSelectiveSpillingStrategy(this);
+
+            default:
+                throw new IllegalConfigurationException(
+                        "Unknown spilling strategy: " + spillingStrategyName);
+        }
+    }

Review Comment:
   Why not use an enum, given that arbitrary strategies are not supported anyway?
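
A rough sketch of what an enum-based variant could look like. `SpillingStrategyType` and `fromName` are hypothetical names, and `IllegalArgumentException` is used here only to keep the example self-contained; the PR itself throws `IllegalConfigurationException`.

```java
import java.util.Locale;

/** Hypothetical enum of the supported spilling strategies. */
enum SpillingStrategyType {
    FULL,
    SELECTIVE;

    /** Parses a configured name case-insensitively and rejects unknown values. */
    static SpillingStrategyType fromName(String name) {
        try {
            return valueOf(name.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown spilling strategy: " + name, e);
        }
    }
}
```

A `switch` over the enum then no longer needs string constants.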



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java:
##########
@@ -252,6 +252,20 @@ public void releaseSubpartitionBuffers(List<BufferIndexAndChannel> toRelease) {
                                 }));
     }
 
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Note that: runWithLock ensure that code block guarded by resultPartitionReadLock and
+    // subpartitionLock.
+    public void release() {
+        runWithLock(
+                () -> {
+                    // clear all buffer of this subpartition to help gc, memorySegments should
+                    // recycle to buffer pool after spilled.
+                    unfinishedBuffers.clear();
+                    allBuffers.clear();
+                    unConsumedBuffers.clear();

Review Comment:
   We should not need these.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -61,23 +61,30 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
+    private HsMemoryDataSpiller spiller;
+
     public HsMemoryDataManager(
             int numSubpartitions,
             int bufferSize,
             BufferPool bufferPool,
             HsSpillingStrategy spillStrategy,
             HsFileDataIndex fileDataIndex,
-            FileChannel dataFileChannel) {
+            Path dataFilePath) {
         this.numSubpartitions = numSubpartitions;
         this.bufferPool = bufferPool;
-        this.spiller = new HsMemoryDataSpiller(dataFileChannel);
         this.spillStrategy = spillStrategy;
         this.fileDataIndex = fileDataIndex;
         this.subpartitionMemoryDataManagers = new HsSubpartitionMemoryDataManager[numSubpartitions];
 
         ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
         this.lock = readWriteLock.writeLock();
 
+        try {
+            this.spiller = new HsMemoryDataSpiller(dataFilePath);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        }

Review Comment:
   You can keep `spiller` `final` and let the constructor throw the `IOException`. `ResultPartition#setup` throws `IOException` anyway.
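
The shape of that change, sketched with hypothetical stand-in classes (`SpillerSketch` and `MemoryDataManagerSketch` are not the PR's classes): declaring `throws IOException` on the constructor lets the field stay `final` and removes the try/catch.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical stand-in for HsMemoryDataSpiller. */
final class SpillerSketch implements AutoCloseable {
    private final FileChannel channel;

    SpillerSketch(Path dataFilePath) throws IOException {
        // CREATE_NEW fails if the file already exists, as in the diff above.
        this.channel =
                FileChannel.open(
                        dataFilePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    boolean isOpen() {
        return channel.isOpen();
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}

/** Hypothetical stand-in for HsMemoryDataManager. */
final class MemoryDataManagerSketch {
    // The field can stay final because the constructor propagates the exception.
    private final SpillerSketch spiller;

    MemoryDataManagerSketch(Path dataFilePath) throws IOException {
        this.spiller = new SpillerSketch(dataFilePath);
    }

    SpillerSketch spiller() {
        return spiller;
    }
}
```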



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java:
##########
@@ -145,8 +149,22 @@ private void setBufferWithHeader(Buffer buffer, ByteBuffer[] bufferWithHeaders,
         bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();
     }
 
-    @Override
-    public void close() throws Exception {
+    private FileChannel openFileChannel(Path path) throws IOException {
+        return FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+    }
+
+    public void close() {
         ioExecutor.shutdown();
     }
+
+    public void release() {

Review Comment:
   I think we need to document the semantics of these two methods.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java:
##########
@@ -384,19 +389,23 @@ private void trimHeadingReleasedBuffers(Deque<HsBufferContext> bufferQueue) {
     }
 
     @GuardedBy("subpartitionLock")
-    private void releaseBuffer(int bufferIndex) {
-        HsBufferContext bufferContext = checkNotNull(bufferIndexToContexts.remove(bufferIndex));
-        bufferContext.release();
+    private boolean releaseBuffer(int bufferIndex) {
+        HsBufferContext bufferContext = bufferIndexToContexts.remove(bufferIndex);
+        if (bufferContext == null || !bufferContext.release()) {
+            return false;
+        }
         // remove released buffers from head lazy.
         trimHeadingReleasedBuffers(allBuffers);
+        return true;
     }
 
     @GuardedBy("subpartitionLock")
-    private HsBufferContext startSpillingBuffer(
-            int bufferIndex, CompletableFuture<Void> spillFuture) {
-        HsBufferContext bufferContext = checkNotNull(bufferIndexToContexts.get(bufferIndex));
-        bufferContext.startSpilling(spillFuture);
-        return bufferContext;
+    private boolean startSpillingBuffer(int bufferIndex, CompletableFuture<Void> spillFuture) {

Review Comment:
   We can return `Optional<HsBufferContext>` here.
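
For illustration, a minimal sketch of the `Optional`-returning shape, using hypothetical stand-ins (`BufferRegistrySketch` and its nested `Context` are not the PR's classes). The caller gets the context directly when the transition succeeds and an empty result when the buffer is unknown or already spilling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a per-subpartition buffer registry. */
final class BufferRegistrySketch {

    /** Hypothetical stand-in for HsBufferContext. */
    static final class Context {
        final int bufferIndex;
        private boolean spillStarted;

        Context(int bufferIndex) {
            this.bufferIndex = bufferIndex;
        }

        /** Returns true only for the first call. */
        boolean startSpilling() {
            if (spillStarted) {
                return false;
            }
            spillStarted = true;
            return true;
        }
    }

    private final Map<Integer, Context> contexts = new HashMap<>();

    void register(int bufferIndex) {
        contexts.put(bufferIndex, new Context(bufferIndex));
    }

    /** Returns the context if spilling was newly started, otherwise empty. */
    Optional<Context> startSpillingBuffer(int bufferIndex) {
        Context context = contexts.get(bufferIndex);
        if (context == null || !context.startSpilling()) {
            return Optional.empty();
        }
        return Optional.of(context);
    }
}
```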


