Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/28 08:34:38 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #20031: [FLINK-27906][runtime] Introduce HsDataIndex.

xintongsong commented on code in PR #20031:
URL: https://github.com/apache/flink/pull/20031#discussion_r908182989


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {

Review Comment:
   ```suggestion
       void testGetReadableRegionEmptySubpartition() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-4, 0-5), (0-7, 0-8)
+        // note that: 0-4 indicate buffer within subpartition 0 and buffer index is 4.
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(4, 5, 7, 8));
+
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        assertThat(hsDataIndex.getReadableRegion(targetSubpartition, 6)).isNotPresent();
+    }
+
+    /** If target buffer is not released, {@link Optional#empty()} should be eventually returned. */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotReleased() {

Review Comment:
   ```suggestion
       void testGetReadableRegionNotReadable() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;

Review Comment:
   Declaring it as the interface helps ensure the tests are written against its contract rather than against the implementation.
   ```suggestion
       private HsFileDataIndex hsDataIndex;
   ```
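
   For illustration only (outside the review thread): a minimal sketch, assuming `HsFileDataIndex` is the interface implemented by `HsFileDataIndexImpl`, of why typing the field as the interface matters; only the setup line would need to change to exercise a different implementation.
   ```java
   // Hypothetical sketch: the field is typed as the interface, so the test body
   // only depends on the HsFileDataIndex contract.
   private HsFileDataIndex hsDataIndex;

   @BeforeEach
   void before() {
       // Only this line knows about the concrete implementation under test;
       // swapping it would let the same assertions run against another implementation.
       hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
   }
   ```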



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();

Review Comment:
   Shouldn't this be good enough to serve the purpose of this test case?
   ```suggestion
           assertThat(hsDataIndex.getReadableRegion(0, 0)).isNotPresent();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-4, 0-5), (0-7, 0-8)
+        // note that: 0-4 indicate buffer within subpartition 0 and buffer index is 4.
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(4, 5, 7, 8));
+
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        assertThat(hsDataIndex.getReadableRegion(targetSubpartition, 6)).isNotPresent();
+    }
+
+    /** If target buffer is not released, {@link Optional#empty()} should be eventually returned. */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotReleased() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-3, 0-4, 0-5, 0-6)
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(3, 4, 5, 6));
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        Optional<ReadableRegion> regionOpt = hsDataIndex.getReadableRegion(targetSubpartition, 4);
+        assertThat(regionOpt).isNotPresent();
+    }
+
+    /**
+     * If target buffer is already released, a not null {@link ReadableRegion} starts with the given
+     * buffer index should be returned.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsReleased() {
+        final int targetChannel = 0;
+        // spilled buffers for subpartition 0: (0-0, 0-1, 0-2, 0-4, 0-5, 0-6)
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(0, Arrays.asList(0, 1, 2, 4, 5, 6));
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        // only release buffer with index 2, 4, 5.
+        hsDataIndex.markBufferReadable(targetChannel, 2);
+        hsDataIndex.markBufferReadable(targetChannel, 4);
+        hsDataIndex.markBufferReadable(targetChannel, 5);
+
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 1)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 3)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 6)).isNotPresent();
+
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 2))
+                .hasValueSatisfying(
+                        readableRegion -> {
+                            assertRegionStartWithTargetBufferIndex(readableRegion, 2);
+                            // Readable region will not include discontinuous buffer.
+                            assertThat(readableRegion.numReadable).isEqualTo(1);
+                        });
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 4))
+                .hasValueSatisfying(
+                        readableRegion -> {
+                            assertRegionStartWithTargetBufferIndex(readableRegion, 4);
+                            assertThat(readableRegion.numReadable)
+                                    .isGreaterThanOrEqualTo(1)
+                                    .isLessThanOrEqualTo(2);
+                        });
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 5))
+                .hasValueSatisfying(
+                        readableRegion -> {
+                            assertRegionStartWithTargetBufferIndex(readableRegion, 5);
+                            assertThat(readableRegion.numReadable).isEqualTo(1);
+                        });
+    }
+
+    /**
+     * Verify that the offset of the first buffer of the region is the offset of the target buffer.
+     */
+    private void assertRegionStartWithTargetBufferIndex(

Review Comment:
   ```suggestion
       private static void assertRegionStartWithTargetBufferIndex(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-4, 0-5), (0-7, 0-8)
+        // note that: 0-4 indicate buffer within subpartition 0 and buffer index is 4.
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(4, 5, 7, 8));
+
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        assertThat(hsDataIndex.getReadableRegion(targetSubpartition, 6)).isNotPresent();
+    }
+
+    /** If target buffer is not released, {@link Optional#empty()} should be eventually returned. */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotReleased() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-3, 0-4, 0-5, 0-6)
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(3, 4, 5, 6));
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        Optional<ReadableRegion> regionOpt = hsDataIndex.getReadableRegion(targetSubpartition, 4);
+        assertThat(regionOpt).isNotPresent();
+    }
+
+    /**
+     * If target buffer is already released, a not null {@link ReadableRegion} starts with the given
+     * buffer index should be returned.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsReleased() {
+        final int targetChannel = 0;
+        // spilled buffers for subpartition 0: (0-0, 0-1, 0-2, 0-4, 0-5, 0-6)
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(0, Arrays.asList(0, 1, 2, 4, 5, 6));
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        // only release buffer with index 2, 4, 5.
+        hsDataIndex.markBufferReadable(targetChannel, 2);
+        hsDataIndex.markBufferReadable(targetChannel, 4);
+        hsDataIndex.markBufferReadable(targetChannel, 5);
+
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 1)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 3)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 6)).isNotPresent();
+
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 2))
+                .hasValueSatisfying(
+                        readableRegion -> {
+                            assertRegionStartWithTargetBufferIndex(readableRegion, 2);
+                            // Readable region will not include discontinuous buffer.
+                            assertThat(readableRegion.numReadable).isEqualTo(1);
+                        });
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 4))
+                .hasValueSatisfying(
+                        readableRegion -> {
+                            assertRegionStartWithTargetBufferIndex(readableRegion, 4);
+                            assertThat(readableRegion.numReadable)
+                                    .isGreaterThanOrEqualTo(1)
+                                    .isLessThanOrEqualTo(2);
+                        });
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 5))
+                .hasValueSatisfying(
+                        readableRegion -> {
+                            assertRegionStartWithTargetBufferIndex(readableRegion, 5);
+                            assertThat(readableRegion.numReadable).isEqualTo(1);
+                        });
+    }
+
+    /**
+     * Verify that the offset of the first buffer of the region is the offset of the target buffer.
+     */
+    private void assertRegionStartWithTargetBufferIndex(
+            ReadableRegion readableRegion, int targetBufferIndex) {
+        assertThat(targetBufferIndex).isEqualTo(readableRegion.offset + readableRegion.numSkip);
+    }
+
+    /** Note that: To facilitate testing, offset are set to be equal to buffer index. */
+    private List<SpilledBuffer> createSpilledBuffers(

Review Comment:
   ```suggestion
       private static List<SpilledBuffer> createSpilledBuffers(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the

Review Comment:
   ```suggestion
        * If the buffer index with the corresponding subpartition does not exist in the data index, the
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {

Review Comment:
   ```suggestion
       void testGetReadableRegionBufferNotExists() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-4, 0-5), (0-7, 0-8)
+        // note that: 0-4 indicate buffer within subpartition 0 and buffer index is 4.
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(4, 5, 7, 8));
+
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        assertThat(hsDataIndex.getReadableRegion(targetSubpartition, 6)).isNotPresent();
+    }
+
+    /** If target buffer is not released, {@link Optional#empty()} should be eventually returned. */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotReleased() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-3, 0-4, 0-5, 0-6)
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(3, 4, 5, 6));
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        Optional<ReadableRegion> regionOpt = hsDataIndex.getReadableRegion(targetSubpartition, 4);
+        assertThat(regionOpt).isNotPresent();
+    }
+
+    /**
+     * If target buffer is already released, a not null {@link ReadableRegion} starts with the given
+     * buffer index should be returned.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsReleased() {

Review Comment:
   ```suggestion
       void testGetReadableRegion() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-4, 0-5), (0-7, 0-8)
+        // note that: 0-4 indicate buffer within subpartition 0 and buffer index is 4.
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(4, 5, 7, 8));
+
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        assertThat(hsDataIndex.getReadableRegion(targetSubpartition, 6)).isNotPresent();
+    }

Review Comment:
   It seems `testGetReadableRegionForEmptySubpartition` is a special case of `testGetReadableRegionWhenBufferIsNotExists`. They could probably be combined into one case; a hedged sketch of such a merge follows below.
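
   For illustration only (outside the review thread): a hedged sketch of how the two cases might be merged, reusing the helpers from the quoted test file; the exact indices and setup are assumptions, not the reviewer's code.
   ```java
   // Hypothetical merged test: both an unknown buffer index and a completely empty
   // subpartition should yield Optional.empty() from getReadableRegion.
   @Test
   void testGetReadableRegionBufferNotExists() {
       // Spill buffers 4, 5, 7, 8 for subpartition 0 only.
       hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(4, 5, 7, 8)));

       // Buffer index 6 was never added to subpartition 0.
       assertThat(hsDataIndex.getReadableRegion(0, 6)).isNotPresent();
       // Subpartition 1 has no spilled buffers at all.
       assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
   }
   ```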



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-4, 0-5), (0-7, 0-8)
+        // note that: 0-4 indicate buffer within subpartition 0 and buffer index is 4.
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(4, 5, 7, 8));
+
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        assertThat(hsDataIndex.getReadableRegion(targetSubpartition, 6)).isNotPresent();
+    }
+
+    /** If target buffer is not released, {@link Optional#empty()} should be eventually returned. */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotReleased() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-3, 0-4, 0-5, 0-6)
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(3, 4, 5, 6));
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        Optional<ReadableRegion> regionOpt = hsDataIndex.getReadableRegion(targetSubpartition, 4);
+        assertThat(regionOpt).isNotPresent();
+    }
+
+    /**
+     * If target buffer is already released, a not null {@link ReadableRegion} starts with the given
+     * buffer index should be returned.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsReleased() {
+        final int targetChannel = 0;
+        // spilled buffers for subpartition 0: (0-0, 0-1, 0-2, 0-4, 0-5, 0-6)
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(0, Arrays.asList(0, 1, 2, 4, 5, 6));
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        // only release buffer with index 2, 4, 5.
+        hsDataIndex.markBufferReadable(targetChannel, 2);
+        hsDataIndex.markBufferReadable(targetChannel, 4);
+        hsDataIndex.markBufferReadable(targetChannel, 5);
+
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 1)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 3)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(targetChannel, 6)).isNotPresent();

Review Comment:
   These are already verified by other cases.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.ReadableRegion;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexImpl}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsFileDataIndexImplTest {
+    private static final int NUM_SUBPARTITIONS = 3;
+
+    private HsFileDataIndexImpl hsDataIndex;
+
+    @BeforeEach
+    void before() {
+        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    }
+
+    /**
+     * If a subpartition has no spilled buffer added to the index, all the query results are {@link
+     * Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionForEmptySubpartition() {
+        // Only add buffers for subpartition 0.
+        hsDataIndex.addBuffers(createSpilledBuffers(0, Arrays.asList(0, 10)));
+        hsDataIndex.markBufferReadable(0, 0);
+        hsDataIndex.markBufferReadable(0, 10);
+        // Different subpartitions are independent and will not affect each other.
+        assertThat(hsDataIndex.getReadableRegion(1, 0)).isNotPresent();
+        assertThat(hsDataIndex.getReadableRegion(2, 10)).isNotPresent();
+    }
+
+    /**
+     * If the buffer index with the corresponding subpartition is not exists in the data index, the
+     * return value should be {@link Optional#empty()}.
+     */
+    @Test
+    void testGetReadableRegionWhenBufferIsNotExists() {
+        final int targetSubpartition = 0;
+        // spilled buffers for subpartition 0: (0-4, 0-5), (0-7, 0-8)
+        // note that: 0-4 indicate buffer within subpartition 0 and buffer index is 4.
+        List<SpilledBuffer> spilledBuffers =
+                createSpilledBuffers(targetSubpartition, Arrays.asList(4, 5, 7, 8));
+
+        hsDataIndex.addBuffers(spilledBuffers);
+
+        assertThat(hsDataIndex.getReadableRegion(targetSubpartition, 6)).isNotPresent();
+    }
+
+    /** If target buffer is not released, {@link Optional#empty()} should be eventually returned. */

Review Comment:
   ```suggestion
       /** If target buffer is not readable, {@link Optional#empty()} should be eventually returned. */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org