You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2023/05/19 06:45:50 UTC
[ignite-3] branch main updated: IGNITE-19278 Multiple partition scan requests may return more rows than requested (#2079)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 144ffc4f92 IGNITE-19278 Multiple partition scan requests may return more rows than requested (#2079)
144ffc4f92 is described below
commit 144ffc4f922b18d8c69a414fb0ce42121569dbff
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Fri May 19 09:45:45 2023 +0300
IGNITE-19278 Multiple partition scan requests may return more rows than requested (#2079)
---
.../ignite/internal/table/ItTableScanTest.java | 30 ++++++++++++++++++++++
.../distributed/storage/InternalTableImpl.java | 8 ++++--
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 64fd61b178..9ee8498b47 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -76,6 +76,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
/**
* Tests to check a scan internal command.
@@ -580,6 +582,34 @@ public class ItTableScanTest extends ClusterPerClassIntegrationTest {
}
}
+ /**
+ * Ensures that two scan requests issued back-to-back for different numbers of rows
+ * yield, in total, as many scanned rows as the sum of the two requested amounts.
+ *
+ * @param requestAmount1 Number of rows demanded by the first request.
+ * @param requestAmount2 Number of rows demanded by the second request.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @CsvSource({"3, 1", "1, 3"})
+ public void testCompositeScanRequest(int requestAmount1, int requestAmount2) throws Exception {
+ List<BinaryRow> scannedRows = new ArrayList<>();
+ Publisher<BinaryRow> publisher = internalTable.scan(0, null, null, null, null, 0, null);
+ CompletableFuture<Void> scanned = new CompletableFuture<>();
+
+ Subscription subscription = subscribeToPublisher(scannedRows, publisher, scanned);
+
+ subscription.request(requestAmount1);
+ subscription.request(requestAmount2); // Issued right away, without waiting for the first request to be served.
+
+ int total = requestAmount1 + requestAmount2;
+ assertTrue(waitForCondition(() -> scannedRows.size() == total, 10_000),
+ "expected=" + total + ", actual=" + scannedRows.size());
+
+ subscription.cancel();
+ }
+
/**
* Represents a binary row as a string.
*
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index a3cf28d7f6..57a6774ca9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1408,8 +1408,12 @@ public class InternalTableImpl implements InternalTable {
if (binaryRows.size() < n) {
cancel();
- } else if (requestedItemsCnt.addAndGet(Math.negateExact(binaryRows.size())) > 0) {
- scanBatch(Math.min(n, INTERNAL_BATCH_SIZE));
+ } else {
+ long remaining = requestedItemsCnt.addAndGet(Math.negateExact(binaryRows.size()));
+
+ if (remaining > 0) {
+ scanBatch((int) Math.min(remaining, INTERNAL_BATCH_SIZE));
+ }
}
}).exceptionally(t -> {
cancel(t);