You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/06/01 15:51:32 UTC

[ignite-3] branch main updated: IGNITE-16965 SQL API: Implement synchronous SQL API (#830)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 36d8e99c0 IGNITE-16965 SQL API: Implement synchronous SQL API (#830)
36d8e99c0 is described below

commit 36d8e99c0b4882776a15d3130b2e204944425cc6
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Wed Jun 1 18:51:28 2022 +0300

    IGNITE-16965 SQL API: Implement synchronous SQL API (#830)
---
 .../main/java/org/apache/ignite/sql/ResultSet.java |  15 +-
 .../main/java/org/apache/ignite/sql/Session.java   |   4 +
 .../internal/sql/api/ItSqlSynchronousApiTest.java  | 240 +++++++++++++++++++++
 .../ignite/internal/sql/api/ResultSetImpl.java     | 146 +++++++++++++
 .../ignite/internal/sql/api/SessionImpl.java       |  22 +-
 .../internal/sql/engine/IgniteSqlApiTest.java      |  60 ++++--
 6 files changed, 461 insertions(+), 26 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
index f8f389f68..4fc75a7bd 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.sql;
 
+import java.util.Iterator;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -25,10 +26,13 @@ import org.jetbrains.annotations.Nullable;
  * <p>All the rows in result set have the same structure described in {@link ResultSetMetadata}.
  * ResultSet must be closed after usage to free resources.
  *
+ * <p>The class and his methods are not thread-safe. If more than one thread use the result set object
+ * please use external synchronization on iterator methods.
+ *
  * <p>Note: one and only one of following is possible: {@link #hasRowSet()} returns {@code true}, or {@link #wasApplied()} returns
  * {@code true}, or {@link #affectedRows()} return zero or higher value.
  */
-public interface ResultSet extends Iterable<SqlRow>, AutoCloseable {
+public interface ResultSet extends Iterator<SqlRow>, AutoCloseable {
     /**
      * Returns metadata for the results if the result contains rows ({@link #hasRowSet()} returns {@code true}).
      *
@@ -39,7 +43,8 @@ public interface ResultSet extends Iterable<SqlRow>, AutoCloseable {
     /**
      * Returns whether the result of the query execution is a collection of rows, or not.
      *
-     * <p>Note: when returns {@code false}, then calling {@link #iterator()} will failed, and either {@link #affectedRows()} return number
+     * <p>Note: when returns {@code false}, then calling {@link #hasNext()}, {@link #next()} will fail,
+     * and either {@link #affectedRows()} return number
      * of affected rows or {@link #wasApplied()} returns {@code true}.
      *
      * @return {@code True} if the query returns rows, {@code false} otherwise.
@@ -67,4 +72,10 @@ public interface ResultSet extends Iterable<SqlRow>, AutoCloseable {
      * @return {@code True} if conditional query applied, {@code false} otherwise.
      */
     boolean wasApplied();
+
+    /**
+     * Invalidates result set and cleanup remote resources.
+     */
+    @Override
+    void close();
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index ac60a45e3..d53d712a0 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -238,6 +238,8 @@ public interface Session extends AutoCloseable {
 
     /**
      * Creates a new session builder from current session.
+     *
+     * @return Session builder instance.
      */
     SessionBuilder toBuilder();
 
@@ -258,6 +260,7 @@ public interface Session extends AutoCloseable {
          *
          * @param timeout Query timeout value.
          * @param timeUnit Timeunit.
+         * @return {@code this} for chaining.
          */
         SessionBuilder defaultTimeout(long timeout, TimeUnit timeUnit);
 
@@ -275,6 +278,7 @@ public interface Session extends AutoCloseable {
          * text, to their canonical names.
          *
          * @param schema Default schema.
+         * @return {@code this} for chaining.
          */
         SessionBuilder defaultSchema(String schema);
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
new file mode 100644
index 000000000..6f4f75c46
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.Streams;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.lang.ColumnAlreadyExistsException;
+import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.TableAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests for synchronous SQL API.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
+public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
+    private static final int ROW_COUNT = 16;
+
+    /**
+     * Clear tables after each test.
+     *
+     * @param testInfo Test information oject.
+     * @throws Exception If failed.
+     */
+    @AfterEach
+    @Override
+    public void tearDown(TestInfo testInfo) throws Exception {
+        for (Table t : CLUSTER_NODES.get(0).tables().tables()) {
+            sql("DROP TABLE " + t.name());
+        }
+
+        super.tearDownBase(testInfo);
+    }
+
+    @Test
+    public void ddl() throws ExecutionException, InterruptedException {
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.createSession();
+
+        // CREATE TABLE
+        checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        checkError(
+                TableAlreadyExistsException.class,
+                "Table already exists [name=PUBLIC.TEST]",
+                ses,
+                "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"
+        );
+        checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY KEY, VAL VARCHAR)");
+
+        // ADD COLUMN
+        checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1 VARCHAR");
+        checkError(
+                TableNotFoundException.class,
+                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                ses,
+                "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
+        );
+        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR");
+        checkError(
+                ColumnAlreadyExistsException.class,
+                "Column already exists [name=VAL1]",
+                ses,
+                "ALTER TABLE TEST ADD COLUMN VAL1 INT"
+        );
+        checkDdl(false, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1 INT");
+
+        // CREATE INDEX
+        checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
+        checkError(
+                IndexAlreadyExistsException.class,
+                "Index already exists [name=TEST_IDX]",
+                ses,
+                "CREATE INDEX TEST_IDX ON TEST(VAL1)"
+        );
+        checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON TEST(VAL1)");
+
+        // DROP COLUMNS
+        checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
+        checkError(
+                TableNotFoundException.class,
+                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                ses,
+                "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
+        );
+        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP COLUMN VAL1");
+        checkError(
+                ColumnNotFoundException.class,
+                "Column 'VAL1' does not exist in table '\"PUBLIC\".\"TEST\"'",
+                ses,
+                "ALTER TABLE TEST DROP COLUMN VAL1"
+        );
+        checkDdl(false, ses, "ALTER TABLE TEST DROP COLUMN IF EXISTS VAL1");
+
+        // DROP TABLE
+        checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE");
+        checkDdl(true, ses, "DROP TABLE TEST");
+        checkError(
+                TableNotFoundException.class,
+                "Table does not exist [name=PUBLIC.TEST]",
+                ses,
+                "DROP TABLE TEST"
+        );
+    }
+
+    @Test
+    public void dml() throws ExecutionException, InterruptedException {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.createSession();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
+
+        checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
+    }
+
+    @Test
+    public void select() throws ExecutionException, InterruptedException {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build();
+
+        ResultSet rs = ses.execute(null, "SELECT ID FROM TEST");
+
+        Set<Integer> set = Streams.stream(rs).map(r -> r.intValue(0)).collect(Collectors.toSet());
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            assertTrue(set.remove(i), "Results invalid: " + rs);
+        }
+
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void errors() {
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+        // Parse error.
+        assertThrowsWithCause(
+                () -> ses.execute(null, "SELECT ID FROM"),
+                IgniteInternalException.class,
+                "Failed to parse query"
+        );
+
+        // Multiple statements error.
+        assertThrowsWithCause(
+                () -> ses.execute(null, "SELECT 1; SELECT 2"),
+                IgniteSqlException.class,
+                "Multiple statements aren't allowed"
+        );
+
+        // Planning error.
+        assertThrowsWithCause(
+                () -> ses.execute(null, "CREATE TABLE TEST (VAL INT)"),
+                IgniteException.class,
+                "Table without PRIMARY KEY is not supported"
+        );
+
+        // Execute error.
+        assertThrowsWithCause(
+                () -> ses.execute(null, "SELECT 1 / ?", 0),
+                ArithmeticException.class,
+                "/ by zero"
+        );
+    }
+
+    private void checkDdl(boolean expectedApplied, Session ses, String sql) {
+        ResultSet res = ses.execute(
+                null,
+                sql
+        );
+
+        assertEquals(expectedApplied, res.wasApplied());
+        assertFalse(res.hasRowSet());
+        assertEquals(-1, res.affectedRows());
+
+        res.close();
+    }
+
+    private void checkError(Class<? extends Throwable> expectedException, String msg, Session ses, String sql, Object... args) {
+        assertThrowsWithCause(() -> ses.execute(null, sql), expectedException, msg);
+    }
+
+    private void checkDml(int expectedAffectedRows, Session ses, String sql, Object... args) {
+        ResultSet res = ses.execute(
+                null,
+                sql,
+                args
+        );
+
+        assertFalse(res.wasApplied());
+        assertFalse(res.hasRowSet());
+        assertEquals(expectedAffectedRows, res.affectedRows());
+
+        res.close();
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/ResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/ResultSetImpl.java
new file mode 100644
index 000000000..17cec1959
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/ResultSetImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletionStage;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Synchronous result set implementation.
+ */
+public class ResultSetImpl implements ResultSet {
+    private final AsyncResultSet ars;
+
+    private final IteratorImpl it;
+
+    /**
+     * Constructor.
+     *
+     * @param ars Asynchronous result set.
+     */
+    public ResultSetImpl(AsyncResultSet ars) {
+        assert ars != null;
+
+        this.ars = ars;
+        it = ars.hasRowSet() ? new IteratorImpl(ars) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable ResultSetMetadata metadata() {
+        return ars.metadata();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasRowSet() {
+        return ars.hasRowSet();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long affectedRows() {
+        return ars.affectedRows();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean wasApplied() {
+        return ars.wasApplied();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() {
+        SessionImpl.await(ars.closeAsync().toCompletableFuture());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasNext() {
+        if (it == null) {
+            throw new IgniteSqlException("There are no results");
+        }
+
+        return it.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SqlRow next() {
+        if (it == null) {
+            throw new IgniteSqlException("There are no results");
+        }
+
+        return it.next();
+    }
+
+    private static class IteratorImpl implements Iterator<SqlRow> {
+        private AsyncResultSet curRes;
+
+        private CompletionStage<? extends AsyncResultSet> nextPageStage;
+
+        private Iterator<SqlRow> curPage;
+
+        IteratorImpl(AsyncResultSet ars) {
+            curRes = ars;
+
+            advance();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (curPage.hasNext()) {
+                return true;
+            } else if (nextPageStage != null) {
+                curRes = SessionImpl.await(nextPageStage);
+
+                advance();
+
+                return curPage.hasNext();
+            } else {
+                return false;
+            }
+        }
+
+        private void advance() {
+            curPage = curRes.currentPage().iterator();
+
+            if (curRes.hasMorePages()) {
+                nextPageStage = curRes.fetchNextPage();
+            } else {
+                nextPageStage = null;
+            }
+        }
+
+        @Override
+        public SqlRow next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            return curPage.next();
+        }
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 9e325992d..f58b10c68 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.QueryTimeout;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
@@ -87,7 +89,7 @@ public class SessionImpl implements Session {
     /** {@inheritDoc} */
     @Override
     public ResultSet execute(@Nullable Transaction transaction, String query, @Nullable Object... arguments) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return new ResultSetImpl(await(executeAsync(transaction, query, arguments)));
     }
 
     /** {@inheritDoc} */
@@ -269,7 +271,7 @@ public class SessionImpl implements Session {
     /** {@inheritDoc} */
     @Override
     public void close() {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        await(closeAsync());
     }
 
     /** {@inheritDoc} */
@@ -294,4 +296,20 @@ public class SessionImpl implements Session {
     public Publisher<Void> closeReactive() {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
+
+    /**
+     * Awaits completion of the given stage and returns its result.
+     *
+     * @param stage The stage.
+     * @param <T> Type of the result returned by the stage.
+     * @return A result of the stage.
+     */
+    @SuppressWarnings("UnusedReturnValue")
+    public static <T> T await(CompletionStage<T> stage) {
+        try {
+            return stage.toCompletableFuture().get();
+        } catch (Throwable e) {
+            throw new IgniteException(e);
+        }
+    }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index 0f54fa273..85f7d2d27 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -155,10 +155,11 @@ public class IgniteSqlApiTest {
         assertFalse(rs.wasApplied());
         assertEquals(-1L, rs.affectedRows());
 
-        assertTrue(rs.iterator().hasNext());
-        for (SqlRow r : rs) {
-            assertEquals("str" + r.intValue("id"), r.stringValue("val"));
-        }
+        assertTrue(rs.hasNext());
+
+        Streams.stream(rs).forEach(r ->
+                assertEquals("str" + r.intValue("id"), r.stringValue("val"))
+        );
 
         // Execute DML.
         rs = sess.execute(null, "DELETE FROM tbl");
@@ -190,15 +191,16 @@ public class IgniteSqlApiTest {
             // Execute query in TX.
             rs = sess.execute(tx, "SELECT id, val FROM tbl WHERE id > ?", 0);
 
-            assertTrue(rs.iterator().hasNext());
-            for (SqlRow r : rs) {
-                assertEquals("str" + r.intValue("id"), r.stringValue("val"));
-            }
+            assertTrue(rs.hasNext());
+
+            Streams.stream(rs).forEach(r ->
+                    assertEquals("str" + r.intValue("id"), r.stringValue("val"))
+            );
 
             // Execute query outside TX in the same session.
             rs = sess.execute(null, "SELECT id, val FROM tbl WHERE id > ?", 1);
 
-            assertFalse(rs.iterator().hasNext()); // No data found before TX is commited.
+            assertFalse(rs.hasNext()); // No data found before TX is commited.
 
             tx.commit();
 
@@ -227,14 +229,14 @@ public class IgniteSqlApiTest {
         }).join();
 
         ResultSet rs = sess.execute(null, "SELECT id, val FROM tbl WHERE id > ?", 1);
-        assertTrue(rs.iterator().hasNext());
+        assertTrue(rs.hasNext());
 
         igniteTx.beginAsync().thenAccept(tx -> {
             // Execute in TX.
             tbl.putAsync(tx, Tuple.create().set("id", 3), Tuple.create().set("val", "NewValue"))
                     .thenApply(f -> {
                         ResultSet rs0 = sess.execute(tx, "SELECT id, val FROM tbl WHERE id > ?", 2);
-                        assertTrue(rs0.iterator().hasNext());
+                        assertTrue(rs0.hasNext());
 
                         return f;
                     })
@@ -244,7 +246,7 @@ public class IgniteSqlApiTest {
         }).join();
 
         rs = sess.execute(null, "SELECT id, val FROM tbl WHERE id > ?", 2);
-        assertFalse(rs.iterator().hasNext());
+        assertFalse(rs.hasNext());
     }
 
     @Test
@@ -260,7 +262,12 @@ public class IgniteSqlApiTest {
 
         ResultSet rs = sess.execute(null, "SELECT id, val FROM tbl2");
 
-        assertEquals(2, Streams.stream(rs.iterator()).count());
+        int cnt = 0;
+        for (; rs.hasNext(); rs.next()) {
+            cnt++;
+        }
+
+        assertEquals(2, cnt);
     }
 
     @Test
@@ -344,7 +351,7 @@ public class IgniteSqlApiTest {
         ResultSet rs = igniteSql.createSession()
                 .execute(null, "SELECT id, val FROM tbl");
 
-        SqlRow row = rs.iterator().next();
+        SqlRow row = rs.next();
 
         ResultSetMetadata meta = rs.metadata();
 
@@ -448,7 +455,7 @@ public class IgniteSqlApiTest {
                     state(ans.getArgument(0)).put(ans.getArgument(2), ans.getArgument(3));
 
                     ResultSet res = Mockito.mock(ResultSet.class);
-                    Mockito.when(res.iterator()).thenThrow(AssertionError.class);
+                    Mockito.when(res).thenThrow(AssertionError.class);
                     Mockito.when(res.wasApplied()).thenReturn(false);
                     Mockito.when(res.hasRowSet()).thenReturn(false);
                     Mockito.when(res.affectedRows()).thenReturn(1L);
@@ -479,7 +486,10 @@ public class IgniteSqlApiTest {
                     Transaction txArg = ans.getArgument(0);
                     Integer filterArg = ans.getArgument(2);
 
-                    Mockito.when(res.iterator()).thenReturn(stateAsRowSet(txArg, filterArg).iterator());
+                    final var it = stateAsRowSet(txArg, filterArg).iterator();
+
+                    Mockito.when(res.hasNext()).thenAnswer(ans0 -> it.hasNext());
+                    Mockito.when(res.next()).thenAnswer(ans0 -> it.next());
                     return res;
                 });
 
@@ -498,21 +508,27 @@ public class IgniteSqlApiTest {
         Mockito.when(session.execute(Mockito.isNull(), Mockito.eq("SELECT id, val FROM tbl2")))
                 .thenAnswer(ans -> {
                     ResultSet res = Mockito.mock(ResultSet.class);
+
+                    var tbl2iter = List.of(
+                            createRow(2, "str2").build(),
+                            createRow(3, "str3").build()
+                    ).iterator();
+
                     Mockito.when(res.wasApplied()).thenReturn(false);
                     Mockito.when(res.hasRowSet()).thenReturn(true);
                     Mockito.when(res.affectedRows()).thenReturn(-1L);
-                    Mockito.when(res.iterator())
-                            .thenReturn(List.of(
-                                    createRow(2, "str2").build(),
-                                    createRow(3, "str3").build()
-                            ).iterator());
+                    Mockito.when(res.hasNext()).thenAnswer(ans0 -> {
+                        System.out.println();
+                        return tbl2iter.hasNext();
+                    });
+                    Mockito.when(res.next()).thenAnswer(ans1 -> tbl2iter.next());
                     return res;
                 });
 
         Mockito.when(session.execute(Mockito.isNull(), Mockito.eq("CREATE TABLE IF NOT EXITS tbl (id INT PRIMARY KEY, val VARCHAR)")))
                 .thenAnswer(ans -> {
                     ResultSet res = Mockito.mock(ResultSet.class);
-                    Mockito.when(res.iterator()).thenThrow(AssertionError.class);
+                    Mockito.when(res.hasNext()).thenThrow(AssertionError.class);
                     Mockito.when(res.wasApplied()).thenReturn(true);
                     Mockito.when(res.hasRowSet()).thenReturn(false);
                     Mockito.when(res.affectedRows()).thenReturn(-1L);