You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2023/05/19 06:45:50 UTC

[ignite-3] branch main updated: IGNITE-19278 Multiple partition scan requests may return more rows than requested (#2079)

This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 144ffc4f92 IGNITE-19278 Multiple partition scan requests may return more rows than requested (#2079)
144ffc4f92 is described below

commit 144ffc4f922b18d8c69a414fb0ce42121569dbff
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Fri May 19 09:45:45 2023 +0300

    IGNITE-19278 Multiple partition scan requests may return more rows than requested (#2079)
---
 .../ignite/internal/table/ItTableScanTest.java     | 30 ++++++++++++++++++++++
 .../distributed/storage/InternalTableImpl.java     |  8 ++++--
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 64fd61b178..9ee8498b47 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -76,6 +76,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 /**
  * Tests to check a scan internal command.
@@ -580,6 +582,34 @@ public class ItTableScanTest extends ClusterPerClassIntegrationTest {
         }
     }
 
+    /**
+     * Ensures that two consecutive scan requests issued on the same subscription, each asking
+     * for a different number of rows, yield the expected total number of rows (the sum of both).
+     *
+     * @param requestAmount1 Number of rows in the first request.
+     * @param requestAmount2 Number of rows in the second request.
+     *
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @CsvSource({"3, 1", "1, 3"})
+    public void testCompositeScanRequest(int requestAmount1, int requestAmount2) throws Exception {
+        List<BinaryRow> scannedRows = new ArrayList<>();
+        Publisher<BinaryRow> publisher = internalTable.scan(0, null, null, null, null, 0, null);
+        CompletableFuture<Void> scanned = new CompletableFuture<>();
+
+        Subscription subscription = subscribeToPublisher(scannedRows, publisher, scanned);
+
+        subscription.request(requestAmount1);
+        subscription.request(requestAmount2);
+
+        int total = requestAmount1 + requestAmount2;
+        assertTrue(waitForCondition(() -> scannedRows.size() == total, 10_000),
+                "expected=" + total + ", actual=" + scannedRows.size());
+
+        subscription.cancel();
+    }
+
     /**
      * Represents a binary row as a string.
      *
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index a3cf28d7f6..57a6774ca9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1408,8 +1408,12 @@ public class InternalTableImpl implements InternalTable {
 
                     if (binaryRows.size() < n) {
                         cancel();
-                    } else if (requestedItemsCnt.addAndGet(Math.negateExact(binaryRows.size())) > 0) {
-                        scanBatch(Math.min(n, INTERNAL_BATCH_SIZE));
+                    } else {
+                        long remaining = requestedItemsCnt.addAndGet(Math.negateExact(binaryRows.size()));
+
+                        if (remaining > 0) {
+                            scanBatch((int) Math.min(remaining, INTERNAL_BATCH_SIZE));
+                        }
                     }
                 }).exceptionally(t -> {
                     cancel(t);