You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/06/01 10:26:06 UTC

[ignite-3] branch ignite-17068 created (now 319e41272)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a change to branch ignite-17068
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 319e41272 IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics

This branch includes the following new commits:

     new 319e41272 IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/01: IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics

Posted by tl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch ignite-17068
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 319e41272d9c1f0c9176c77865626bd39bc62244
Author: tledkov <tl...@gridgain.com>
AuthorDate: Wed Jun 1 13:25:43 2022 +0300

    IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics
---
 .../apache/ignite/sql/async/AsyncResultSet.java    | 23 +++++++
 .../internal/sql/api/ItSqlAsynchronousApiTest.java | 75 ++++------------------
 .../internal/sql/api/AsyncResultSetImpl.java       | 54 +++++++---------
 3 files changed, 58 insertions(+), 94 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 58da53fe0..2eacedad4 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -27,6 +27,25 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Asynchronous result set provides methods for query results processing in asynchronous way.
  *
+ * <p>Usage example:
+ * <pre><code>
+ *      private CompletionStage&lt;Void&gt; fetchAllRowsInto(
+ *          AsyncResultSet resultSet,
+ *          List&lt;SqlRow&gt; target
+ *      ) {
+ *          for (var row : resultSet.currentPage()) {
+ *              target.add(row);
+ *          }
+ *
+ *          if (!resultSet.hasMorePages()) {
+ *              return CompletableFuture.completedFuture(null);
+ *          }
+ *
+ *          return resultSet.fetchNextPage()
+ *              .thenCompose(res -> fetchAllRowsInto(res, target));
+ *      }
+ * </code></pre>
+ *
  * @see ResultSet
  */
 public interface AsyncResultSet {
@@ -91,6 +110,10 @@ public interface AsyncResultSet {
 
     /**
      * Fetch the next page of results asynchronously.
+     * The returned future is completed with this same {@code AsyncResultSet} object.
+     * The current page changes once that future completes.
+     * The methods {@link #currentPage()}, {@link #currentPageSize()}, {@link #hasMorePages()}
+     * operate on the current page and return consistent results between the completion of the previous page's future and the next call to {@code fetchNextPage}.
      *
      * @return Operation future.
      * @throws NoRowSetExpectedException if no row set is expected as a query result.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 3d48a8b66..56ce8b2c4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -29,9 +29,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -238,21 +236,27 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         Session ses = sql.sessionBuilder().defaultPageSize(1).build();
 
         AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID").get();
+        var p0 = ars0.currentPage();
         AsyncResultSet ars1 = ars0.fetchNextPage().toCompletableFuture().get();
+        var p1 = ars1.currentPage();
         AsyncResultSet ars2 = ars1.fetchNextPage().toCompletableFuture().get();
+        var p2 = ars2.currentPage();
         AsyncResultSet ars3 = ars1.fetchNextPage().toCompletableFuture().get();
+        var p3 = ars3.currentPage();
         AsyncResultSet ars4 = ars0.fetchNextPage().toCompletableFuture().get();
+        var p4 = ars4.currentPage();
 
-        assertSame(ars1, ars4);
-        assertSame(ars2, ars3);
+        assertSame(ars0, ars1);
+        assertSame(ars0, ars2);
+        assertSame(ars0, ars3);
+        assertSame(ars0, ars4);
 
-        List<SqlRow> res = Stream.of(ars0, ars1, ars2)
-                .map(AsyncResultSet::currentPage)
+        List<SqlRow> res = Stream.of(p0, p1, p2, p3, p4)
                 .flatMap(p -> StreamSupport.stream(p.spliterator(), false))
                 .collect(Collectors.toList());
 
         TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - res.size());
-        ars3.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
+        ars4.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
 
         res.addAll(pageProc.result());
 
@@ -261,63 +265,6 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         }
     }
 
-    @Test
-    public void fetchNextPageParallel() throws Exception {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        final List<SqlRow> res = new ArrayList<>();
-
-        IgniteSql sql = CLUSTER_NODES.get(0).sql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
-
-        AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST").get();
-        StreamSupport.stream(ars0.currentPage().spliterator(), false).forEach(res::add);
-
-        AtomicInteger cnt = new AtomicInteger();
-        ConcurrentHashMap<Integer, AsyncResultSet> results = new ConcurrentHashMap<>();
-
-        IgniteTestUtils.runMultiThreaded(
-                () -> {
-                    AsyncResultSet ars = ars0.fetchNextPage().toCompletableFuture().get();
-
-                    results.put(cnt.getAndIncrement(), ars);
-
-                    assertFalse(ars.hasMorePages());
-
-                    return null;
-                },
-                10,
-                "test-fetch");
-
-        AsyncResultSet ars1 = CollectionUtils.first(results.values());
-        StreamSupport.stream(ars1.currentPage().spliterator(), false).forEach(res::add);
-
-        // Check that all next page are same.
-        results.values().forEach(ars -> assertSame(ars1, ars));
-
-        assertThrowsWithCause(
-                () -> ars1.fetchNextPage().toCompletableFuture().get(),
-                IgniteSqlException.class,
-                "There are no more pages"
-        );
-
-        await(ars0.closeAsync());
-
-        // Check results
-        Set<Integer> rs = res.stream().map(r -> r.intValue(0)).collect(Collectors.toSet());
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            assertTrue(rs.remove(i), "Results invalid: " + res);
-        }
-
-        assertTrue(rs.isEmpty());
-
-        checkSession(ses);
-    }
-
     @Test
     public void errors() {
         IgniteSql sql = CLUSTER_NODES.get(0).sql();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 1a2ee8255..855edd243 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -50,20 +50,16 @@ import org.jetbrains.annotations.Nullable;
  */
 public class AsyncResultSetImpl implements AsyncResultSet {
     private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE =
-            CompletableFuture.failedFuture(new IgniteSqlException("There are no more pages."));
+            CompletableFuture.failedFuture(new IgniteSqlException("No more pages."));
 
     private final AsyncSqlCursor<List<Object>> cur;
 
-    private final BatchedResult<List<Object>> batchPage;
+    private volatile BatchedResult<List<Object>> curPage;
 
     private final int pageSize;
 
     private final Runnable closeRun;
 
-    private final Object mux = new Object();
-
-    private volatile CompletionStage<? extends AsyncResultSet> next;
-
     /**
      * Constructor.
      *
@@ -71,15 +67,15 @@ public class AsyncResultSetImpl implements AsyncResultSet {
      */
     public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur, BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) {
         this.cur = cur;
-        this.batchPage = page;
+        this.curPage = page;
         this.pageSize = pageSize;
         this.closeRun = closeRun;
 
         assert cur.queryType() == SqlQueryType.QUERY
                 || ((cur.queryType() == SqlQueryType.DML || cur.queryType() == SqlQueryType.DDL)
-                && batchPage.items().size() == 1
-                && batchPage.items().get(0).size() == 1
-                && !batchPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + batchPage + ']';
+                && curPage.items().size() == 1
+                && curPage.items().get(0).size() == 1
+                && !curPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + curPage + ']';
     }
 
     /** {@inheritDoc} */
@@ -119,9 +115,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
             return -1;
         }
 
-        assert batchPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + batchPage;
+        assert curPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + curPage;
 
-        return (long) batchPage.items().get(0).get(0);
+        return (long) curPage.items().get(0).get(0);
     }
 
     /** {@inheritDoc} */
@@ -131,9 +127,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
             return false;
         }
 
-        assert batchPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + batchPage;
+        assert curPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + curPage;
 
-        return (boolean) batchPage.items().get(0).get(0);
+        return (boolean) curPage.items().get(0).get(0);
     }
 
     /** {@inheritDoc} */
@@ -141,7 +137,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
     public Iterable<SqlRow> currentPage() {
         requireResultSet();
 
-        return () -> new TransformingIterator<>(batchPage.items().iterator(), SqlRowImpl::new);
+        final Iterator<List<Object>> it0 = curPage.items().iterator();
+
+        return () -> new TransformingIterator<>(it0, SqlRowImpl::new);
     }
 
     /** {@inheritDoc} */
@@ -149,32 +147,28 @@ public class AsyncResultSetImpl implements AsyncResultSet {
     public int currentPageSize() {
         requireResultSet();
 
-        return batchPage.items().size();
+        return curPage.items().size();
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
-        if (next == null) {
-            synchronized (mux) {
-                if (next == null) {
-                    if (!hasMorePages()) {
-                        next = HAS_NO_MORE_PAGE_FUTURE;
-                    } else {
-                        next = cur.requestNextAsync(pageSize)
-                                .thenApply(batchRes -> new AsyncResultSetImpl(cur, batchRes, pageSize, closeRun));
-                    }
-                }
-            }
-        }
+        if (!hasMorePages()) {
+            return HAS_NO_MORE_PAGE_FUTURE;
+        } else {
+            return cur.requestNextAsync(pageSize)
+                    .thenApply(page -> {
+                        curPage = page;
 
-        return next;
+                        return AsyncResultSetImpl.this;
+                    });
+        }
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean hasMorePages() {
-        return batchPage.hasMore();
+        return curPage.hasMore();
     }
 
     /** {@inheritDoc} */