You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/06/01 17:06:44 UTC

[ignite-3] branch main updated: IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics (#841)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new da2738b57 IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics (#841)
da2738b57 is described below

commit da2738b575c0278bbbb64b85fa89198931faa17f
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Wed Jun 1 20:06:40 2022 +0300

    IGNITE-17068 Sql: Fix AsyncResultSet.fetchNextPage semantics (#841)
---
 .../main/java/org/apache/ignite/sql/Session.java   |  2 +-
 .../main/java/org/apache/ignite/sql/Statement.java | 10 +++
 .../apache/ignite/sql/async/AsyncResultSet.java    | 23 +++++++
 .../internal/sql/api/ItSqlAsynchronousApiTest.java | 75 ++++------------------
 .../internal/sql/api/AsyncResultSetImpl.java       | 54 +++++++---------
 5 files changed, 69 insertions(+), 95 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index d53d712a0..315cbb199 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -239,7 +239,7 @@ public interface Session extends AutoCloseable {
     /**
      * Creates a new session builder from current session.
      *
-     * @return Session builder instance.
+     * @return Session builder based on the current session.
      */
     SessionBuilder toBuilder();
 
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Statement.java b/modules/api/src/main/java/org/apache/ignite/sql/Statement.java
index e97456330..3f73a6ecb 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Statement.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Statement.java
@@ -78,6 +78,8 @@ public interface Statement extends AutoCloseable {
 
     /**
      * Creates a new statement builder from current statement.
+     *
+     * @return Statement builder based on the current statement.
      */
     StatementBuilder toBuilder();
 
@@ -95,6 +97,9 @@ public interface Statement extends AutoCloseable {
 
         /**
          * Set SQL statement string.
+         *
+         * @param sql SQL query.
+         * @return {@code this} for chaining.
          */
         StatementBuilder query(String sql);
 
@@ -107,6 +112,9 @@ public interface Statement extends AutoCloseable {
 
         /**
          * Marks current statement as prepared.
+         *
+         * @param prepared If {@code true}, marks the current statement as prepared.
+         * @return {@code this} for chaining.
          */
         StatementBuilder prepared(boolean prepared);
 
@@ -123,6 +131,7 @@ public interface Statement extends AutoCloseable {
          *
          * @param timeout Query timeout value.
          * @param timeUnit Timeunit.
+         * @return {@code this} for chaining.
          */
         StatementBuilder queryTimeout(long timeout, @NotNull TimeUnit timeUnit);
 
@@ -137,6 +146,7 @@ public interface Statement extends AutoCloseable {
          * Sets default schema for the statement, which the queries will be executed with.
          *
          * @param schema Default schema.
+         * @return {@code this} for chaining.
          */
         StatementBuilder defaultSchema(@NotNull String schema);
 
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 58da53fe0..e5671f6c4 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -27,6 +27,25 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Asynchronous result set provides methods for query results processing in asynchronous way.
  *
+ * <p>Usage example:
+ * <pre><code>
+ *      private CompletionStage&lt;Void&gt; fetchAllRowsInto(
+ *          AsyncResultSet resultSet,
+ *          List&lt;SqlRow&gt; target
+ *      ) {
+ *          for (var row : resultSet.currentPage()) {
+ *              target.add(row);
+ *          }
+ *
+ *          if (!resultSet.hasMorePages()) {
+ *              return CompletableFuture.completedFuture(null);
+ *          }
+ *
+ *          return resultSet.fetchNextPage()
+ *              .thenCompose(res -&gt; fetchAllRowsInto(res, target));
+ *      }
+ * </code></pre>
+ *
  * @see ResultSet
  */
 public interface AsyncResultSet {
@@ -91,6 +110,10 @@ public interface AsyncResultSet {
 
     /**
      * Fetch the next page of results asynchronously.
+     * The returned future is completed with the same {@code AsyncResultSet} object.
+     * The current page is changed after the future completes.
+     * The methods {@link #currentPage()}, {@link #currentPageSize()}, {@link #hasMorePages()}
+     * use the current page and return consistent results between the completion of the last page future and the next call to {@code fetchNextPage}.
      *
      * @return Operation future.
      * @throws NoRowSetExpectedException if no row set is expected as a query result.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 3d48a8b66..56ce8b2c4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -29,9 +29,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -238,21 +236,27 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         Session ses = sql.sessionBuilder().defaultPageSize(1).build();
 
         AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID").get();
+        var p0 = ars0.currentPage();
         AsyncResultSet ars1 = ars0.fetchNextPage().toCompletableFuture().get();
+        var p1 = ars1.currentPage();
         AsyncResultSet ars2 = ars1.fetchNextPage().toCompletableFuture().get();
+        var p2 = ars2.currentPage();
         AsyncResultSet ars3 = ars1.fetchNextPage().toCompletableFuture().get();
+        var p3 = ars3.currentPage();
         AsyncResultSet ars4 = ars0.fetchNextPage().toCompletableFuture().get();
+        var p4 = ars4.currentPage();
 
-        assertSame(ars1, ars4);
-        assertSame(ars2, ars3);
+        assertSame(ars0, ars1);
+        assertSame(ars0, ars2);
+        assertSame(ars0, ars3);
+        assertSame(ars0, ars4);
 
-        List<SqlRow> res = Stream.of(ars0, ars1, ars2)
-                .map(AsyncResultSet::currentPage)
+        List<SqlRow> res = Stream.of(p0, p1, p2, p3, p4)
                 .flatMap(p -> StreamSupport.stream(p.spliterator(), false))
                 .collect(Collectors.toList());
 
         TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - res.size());
-        ars3.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
+        ars4.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
 
         res.addAll(pageProc.result());
 
@@ -261,63 +265,6 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         }
     }
 
-    @Test
-    public void fetchNextPageParallel() throws Exception {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        final List<SqlRow> res = new ArrayList<>();
-
-        IgniteSql sql = CLUSTER_NODES.get(0).sql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
-
-        AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST").get();
-        StreamSupport.stream(ars0.currentPage().spliterator(), false).forEach(res::add);
-
-        AtomicInteger cnt = new AtomicInteger();
-        ConcurrentHashMap<Integer, AsyncResultSet> results = new ConcurrentHashMap<>();
-
-        IgniteTestUtils.runMultiThreaded(
-                () -> {
-                    AsyncResultSet ars = ars0.fetchNextPage().toCompletableFuture().get();
-
-                    results.put(cnt.getAndIncrement(), ars);
-
-                    assertFalse(ars.hasMorePages());
-
-                    return null;
-                },
-                10,
-                "test-fetch");
-
-        AsyncResultSet ars1 = CollectionUtils.first(results.values());
-        StreamSupport.stream(ars1.currentPage().spliterator(), false).forEach(res::add);
-
-        // Check that all next page are same.
-        results.values().forEach(ars -> assertSame(ars1, ars));
-
-        assertThrowsWithCause(
-                () -> ars1.fetchNextPage().toCompletableFuture().get(),
-                IgniteSqlException.class,
-                "There are no more pages"
-        );
-
-        await(ars0.closeAsync());
-
-        // Check results
-        Set<Integer> rs = res.stream().map(r -> r.intValue(0)).collect(Collectors.toSet());
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            assertTrue(rs.remove(i), "Results invalid: " + res);
-        }
-
-        assertTrue(rs.isEmpty());
-
-        checkSession(ses);
-    }
-
     @Test
     public void errors() {
         IgniteSql sql = CLUSTER_NODES.get(0).sql();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 1a2ee8255..855edd243 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -50,20 +50,16 @@ import org.jetbrains.annotations.Nullable;
  */
 public class AsyncResultSetImpl implements AsyncResultSet {
     private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE =
-            CompletableFuture.failedFuture(new IgniteSqlException("There are no more pages."));
+            CompletableFuture.failedFuture(new IgniteSqlException("No more pages."));
 
     private final AsyncSqlCursor<List<Object>> cur;
 
-    private final BatchedResult<List<Object>> batchPage;
+    private volatile BatchedResult<List<Object>> curPage;
 
     private final int pageSize;
 
     private final Runnable closeRun;
 
-    private final Object mux = new Object();
-
-    private volatile CompletionStage<? extends AsyncResultSet> next;
-
     /**
      * Constructor.
      *
@@ -71,15 +67,15 @@ public class AsyncResultSetImpl implements AsyncResultSet {
      */
     public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur, BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) {
         this.cur = cur;
-        this.batchPage = page;
+        this.curPage = page;
         this.pageSize = pageSize;
         this.closeRun = closeRun;
 
         assert cur.queryType() == SqlQueryType.QUERY
                 || ((cur.queryType() == SqlQueryType.DML || cur.queryType() == SqlQueryType.DDL)
-                && batchPage.items().size() == 1
-                && batchPage.items().get(0).size() == 1
-                && !batchPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + batchPage + ']';
+                && curPage.items().size() == 1
+                && curPage.items().get(0).size() == 1
+                && !curPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + curPage + ']';
     }
 
     /** {@inheritDoc} */
@@ -119,9 +115,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
             return -1;
         }
 
-        assert batchPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + batchPage;
+        assert curPage.items().get(0).get(0) instanceof Long : "Invalid DML result: " + curPage;
 
-        return (long) batchPage.items().get(0).get(0);
+        return (long) curPage.items().get(0).get(0);
     }
 
     /** {@inheritDoc} */
@@ -131,9 +127,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
             return false;
         }
 
-        assert batchPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + batchPage;
+        assert curPage.items().get(0).get(0) instanceof Boolean : "Invalid DDL result: " + curPage;
 
-        return (boolean) batchPage.items().get(0).get(0);
+        return (boolean) curPage.items().get(0).get(0);
     }
 
     /** {@inheritDoc} */
@@ -141,7 +137,9 @@ public class AsyncResultSetImpl implements AsyncResultSet {
     public Iterable<SqlRow> currentPage() {
         requireResultSet();
 
-        return () -> new TransformingIterator<>(batchPage.items().iterator(), SqlRowImpl::new);
+        final Iterator<List<Object>> it0 = curPage.items().iterator();
+
+        return () -> new TransformingIterator<>(it0, SqlRowImpl::new);
     }
 
     /** {@inheritDoc} */
@@ -149,32 +147,28 @@ public class AsyncResultSetImpl implements AsyncResultSet {
     public int currentPageSize() {
         requireResultSet();
 
-        return batchPage.items().size();
+        return curPage.items().size();
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
-        if (next == null) {
-            synchronized (mux) {
-                if (next == null) {
-                    if (!hasMorePages()) {
-                        next = HAS_NO_MORE_PAGE_FUTURE;
-                    } else {
-                        next = cur.requestNextAsync(pageSize)
-                                .thenApply(batchRes -> new AsyncResultSetImpl(cur, batchRes, pageSize, closeRun));
-                    }
-                }
-            }
-        }
+        if (!hasMorePages()) {
+            return HAS_NO_MORE_PAGE_FUTURE;
+        } else {
+            return cur.requestNextAsync(pageSize)
+                    .thenApply(page -> {
+                        curPage = page;
 
-        return next;
+                        return AsyncResultSetImpl.this;
+                    });
+        }
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean hasMorePages() {
-        return batchPage.hasMore();
+        return curPage.hasMore();
     }
 
     /** {@inheritDoc} */