You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/06/01 10:26:07 UTC
[ignite-3] 01/01: IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch ignite-17068
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 319e41272d9c1f0c9176c77865626bd39bc62244
Author: tledkov <tl...@gridgain.com>
AuthorDate: Wed Jun 1 13:25:43 2022 +0300
IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics
---
.../apache/ignite/sql/async/AsyncResultSet.java | 23 +++++++
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 75 ++++------------------
.../internal/sql/api/AsyncResultSetImpl.java | 54 +++++++---------
3 files changed, 58 insertions(+), 94 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 58da53fe0..2eacedad4 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -27,6 +27,25 @@ import org.jetbrains.annotations.Nullable;
/**
* Asynchronous result set provides methods for query results processing in asynchronous way.
*
+ * <p>Usage example:
+ * <pre><code>
+ * private CompletionStage<Void> fetchAllRowsInto(
+ * AsyncResultSet resultSet,
+ * List<SqlRow> target
+ * ) {
+ * for (var row : resultSet.currentPage()) {
+ * target.add(row);
+ * }
+ *
+ * if (!resultSet.hasMorePages()) {
+ * return CompletableFuture.completedFuture(null);
+ * }
+ *
+ * return resultSet.fetchNextPage()
+ * .thenCompose(res -> fetchAllRowsInto(res, target));
+ * }
+ * </code></pre>
+ *
* @see ResultSet
*/
public interface AsyncResultSet {
@@ -91,6 +110,10 @@ public interface AsyncResultSet {
/**
* Fetch the next page of results asynchronously.
+ * The future that is completed with the same {@code AsyncResultSet} object.
+ * The current page is changed after the future complete.
+ * The methods {@link #currentPage()}, {@link #currentPageSize()}, {@link #hasMorePages()}
+ * use current page and return consistent results between complete last page future and call {@code fetchNextPage}.
*
* @return Operation future.
* @throws NoRowSetExpectedException if no row set is expected as a query result.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 3d48a8b66..56ce8b2c4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -29,9 +29,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -238,21 +236,27 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
Session ses = sql.sessionBuilder().defaultPageSize(1).build();
AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID").get();
+ var p0 = ars0.currentPage();
AsyncResultSet ars1 = ars0.fetchNextPage().toCompletableFuture().get();
+ var p1 = ars1.currentPage();
AsyncResultSet ars2 = ars1.fetchNextPage().toCompletableFuture().get();
+ var p2 = ars2.currentPage();
AsyncResultSet ars3 = ars1.fetchNextPage().toCompletableFuture().get();
+ var p3 = ars3.currentPage();
AsyncResultSet ars4 = ars0.fetchNextPage().toCompletableFuture().get();
+ var p4 = ars4.currentPage();
- assertSame(ars1, ars4);
- assertSame(ars2, ars3);
+ assertSame(ars0, ars1);
+ assertSame(ars0, ars2);
+ assertSame(ars0, ars3);
+ assertSame(ars0, ars4);
- List<SqlRow> res = Stream.of(ars0, ars1, ars2)
- .map(AsyncResultSet::currentPage)
+ List<SqlRow> res = Stream.of(p0, p1, p2, p3, p4)
.flatMap(p -> StreamSupport.stream(p.spliterator(), false))
.collect(Collectors.toList());
TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - res.size());
- ars3.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
+ ars4.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
res.addAll(pageProc.result());
@@ -261,63 +265,6 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
}
}
- @Test
- public void fetchNextPageParallel() throws Exception {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- final List<SqlRow> res = new ArrayList<>();
-
- IgniteSql sql = CLUSTER_NODES.get(0).sql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
-
- AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST").get();
- StreamSupport.stream(ars0.currentPage().spliterator(), false).forEach(res::add);
-
- AtomicInteger cnt = new AtomicInteger();
- ConcurrentHashMap<Integer, AsyncResultSet> results = new ConcurrentHashMap<>();
-
- IgniteTestUtils.runMultiThreaded(
- () -> {
- AsyncResultSet ars = ars0.fetchNextPage().toCompletableFuture().get();
-
- results.put(cnt.getAndIncrement(), ars);
-
- assertFalse(ars.hasMorePages());
-
- return null;
- },
- 10,
- "test-fetch");
-
- AsyncResultSet ars1 = CollectionUtils.first(results.values());
- StreamSupport.stream(ars1.currentPage().spliterator(), false).forEach(res::add);
-
- // Check that all next page are same.
- results.values().forEach(ars -> assertSame(ars1, ars));
-
- assertThrowsWithCause(
- () -> ars1.fetchNextPage().toCompletableFuture().get(),
- IgniteSqlException.class,
- "There are no more pages"
- );
-
- await(ars0.closeAsync());
-
- // Check results
- Set<Integer> rs = res.stream().map(r -> r.intValue(0)).collect(Collectors.toSet());
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- assertTrue(rs.remove(i), "Results invalid: " + res);
- }
-
- assertTrue(rs.isEmpty());
-
- checkSession(ses);
- }
-
@Test
public void errors() {
IgniteSql sql = CLUSTER_NODES.get(0).sql();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 1a2ee8255..855edd243 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -50,20 +50,16 @@ import org.jetbrains.annotations.Nullable;
*/
public class AsyncResultSetImpl implements AsyncResultSet {
private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE =
- CompletableFuture.failedFuture(new IgniteSqlException("There are no more pages."));
+ CompletableFuture.failedFuture(new IgniteSqlException("No more pages."));
private final AsyncSqlCursor<List<Object>> cur;
- private final BatchedResult<List<Object>> batchPage;
+ private volatile BatchedResult<List<Object>> curPage;
private final int pageSize;
private final Runnable closeRun;
- private final Object mux = new Object();
-
- private volatile CompletionStage<? extends AsyncResultSet> next;
-
/**
* Constructor.
*
@@ -71,15 +67,15 @@ public class AsyncResultSetImpl implements AsyncResultSet {
*/
public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur, BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) {
this.cur = cur;
- this.batchPage = page;
+ this.curPage = page;
this.pageSize = pageSize;
this.closeRun = closeRun;
assert cur.queryType() == SqlQueryType.QUERY
|| ((cur.queryType() == SqlQueryType.DML || cur.queryType() == SqlQueryType.DDL)
- && batchPage.items().size() == 1
- && batchPage.items().get(0).size() == 1
- && !batchPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + batchPage + ']';
+ && curPage.items().size() == 1
+ && curPage.items().get(0).size() == 1
+ && !curPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + curPage + ']';
}
/** {@inheritDoc} */
@@ -119,9 +115,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
return -1;
}
- assert batchPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + batchPage;
+ assert curPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + curPage;
- return (long) batchPage.items().get(0).get(0);
+ return (long) curPage.items().get(0).get(0);
}
/** {@inheritDoc} */
@@ -131,9 +127,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
return false;
}
- assert batchPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + batchPage;
+ assert curPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + curPage;
- return (boolean) batchPage.items().get(0).get(0);
+ return (boolean) curPage.items().get(0).get(0);
}
/** {@inheritDoc} */
@@ -141,7 +137,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
public Iterable<SqlRow> currentPage() {
requireResultSet();
- return () -> new TransformingIterator<>(batchPage.items().iterator(), SqlRowImpl::new);
+ final Iterator<List<Object>> it0 = curPage.items().iterator();
+
+ return () -> new TransformingIterator<>(it0, SqlRowImpl::new);
}
/** {@inheritDoc} */
@@ -149,32 +147,28 @@ public class AsyncResultSetImpl implements AsyncResultSet {
public int currentPageSize() {
requireResultSet();
- return batchPage.items().size();
+ return curPage.items().size();
}
/** {@inheritDoc} */
@Override
public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
- if (next == null) {
- synchronized (mux) {
- if (next == null) {
- if (!hasMorePages()) {
- next = HAS_NO_MORE_PAGE_FUTURE;
- } else {
- next = cur.requestNextAsync(pageSize)
- .thenApply(batchRes -> new AsyncResultSetImpl(cur, batchRes, pageSize, closeRun));
- }
- }
- }
- }
+ if (!hasMorePages()) {
+ return HAS_NO_MORE_PAGE_FUTURE;
+ } else {
+ return cur.requestNextAsync(pageSize)
+ .thenApply(page -> {
+ curPage = page;
- return next;
+ return AsyncResultSetImpl.this;
+ });
+ }
}
/** {@inheritDoc} */
@Override
public boolean hasMorePages() {
- return batchPage.hasMore();
+ return curPage.hasMore();
}
/** {@inheritDoc} */