You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/06/09 09:02:06 UTC

[ignite-3] branch main updated: IGNITE-16963 SQL API: Add batched DML queries support (#843)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 45619b1c3 IGNITE-16963 SQL API: Add batched DML queries support (#843)
45619b1c3 is described below

commit 45619b1c3bfe1344a04f4c76842a187aba194677
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Thu Jun 9 12:02:02 2022 +0300

    IGNITE-16963 SQL API: Add batched DML queries support (#843)
---
 .../apache/ignite/example/sql/SqlApiExample.java   |   2 +-
 .../apache/ignite/internal/sql/ResultSetImpl.java  |   6 +-
 .../org/apache/ignite/sql/BatchedArguments.java    |   9 ++
 .../main/java/org/apache/ignite/sql/Session.java   |  29 ++++--
 .../{SqlException.java => SqlBatchException.java}  |  35 ++++---
 .../java/org/apache/ignite/sql/SqlException.java   |   7 ++
 .../ignite/internal/client/sql/ClientSession.java  |  17 +---
 .../apache/ignite/client/fakes/FakeSession.java    |  12 +--
 .../internal/testframework/IgniteTestUtils.java    |  54 +++++++++++
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  76 ++++++++++++++-
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  75 +++++++++++++-
 .../internal/sql/api/AsyncResultSetImpl.java       |   3 +-
 .../internal/sql/api/IgniteSqlException.java       |  70 -------------
 .../ignite/internal/sql/api/SessionImpl.java       | 108 +++++++++++++++++----
 .../ignite/internal/sql/engine/QueryContext.java   |   2 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |   4 +-
 .../internal/sql/engine/IgniteSqlApiTest.java      |   8 +-
 17 files changed, 369 insertions(+), 148 deletions(-)

diff --git a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
index feeb79189..502de66e6 100644
--- a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
+++ b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
@@ -113,7 +113,7 @@ public class SqlApiExample {
                                         .add(2, 1, "Jane", "Roe", 2000.0d)
                                         .add(3, 1, "Mary", "Major", 1500.0d)
                                         .add(4, 1, "Richard", "Miles", 1450.0d)))
-                        .asLongStream().sum();
+                        .sum();
 
                 System.out.println("\nAdded accounts: " + rowsAdded);
 
diff --git a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
index 28ae4cd4b..6c774ffe9 100644
--- a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
+++ b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.sql;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletionStage;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.jetbrains.annotations.Nullable;
@@ -83,7 +83,7 @@ public class ResultSetImpl implements ResultSet {
     @Override
     public boolean hasNext() {
         if (it == null) {
-            throw new IgniteException("There are no results");
+            throw new SqlException("There are no results");
         }
 
         return it.hasNext();
@@ -93,7 +93,7 @@ public class ResultSetImpl implements ResultSet {
     @Override
     public SqlRow next() {
         if (it == null) {
-            throw new IgniteException("There are no results");
+            throw new SqlException("There are no results");
         }
 
         return it.next();
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
index 20949084a..e991c49db 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
@@ -29,6 +29,15 @@ import java.util.List;
  * TODO: add named arguments support.
  */
 public class BatchedArguments extends ArrayList<List<Object>> implements List<List<Object>> {
+    /**
+     * Creates batched arguments.
+     *
+     * @return Batch query arguments.
+     */
+    public static BatchedArguments create() {
+        return new BatchedArguments();
+    }
+
     /**
      * Creates batched arguments.
      *
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index fc3753839..fb4a612a6 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -131,8 +131,16 @@ public interface Session extends AutoCloseable {
      * @param dmlQuery DML query template.
      * @param batch Batch of query arguments.
      * @return Number of rows affected by each query in the batch.
+     * @throws SqlBatchException If the batch fails.
      */
-    int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch);
+    default long[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
+        // TODO: IGNITE-17135 fix exception handling.
+        try {
+            return executeBatchAsync(transaction, dmlQuery, batch).join();
+        } catch (CompletionException e) {
+            throw new SqlException(e);
+        }
+    }
 
     /**
      * Executes batched SQL query. Only DML queries are supported.
@@ -141,8 +149,9 @@ public interface Session extends AutoCloseable {
      * @param dmlStatement DML statement to execute.
      * @param batch Batch of query arguments.
      * @return Number of rows affected by each query in the batch.
+     * @throws SqlBatchException If the batch fails.
      */
-    int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch);
+    long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch);
 
     /**
      * Executes batched SQL query in an asynchronous way.
@@ -150,10 +159,10 @@ public interface Session extends AutoCloseable {
      * @param transaction Transaction to execute the statement within or {@code null}.
      * @param query SQL query template.
      * @param batch List of batch rows, where each row is a list of statement arguments.
-     * @return Operation future.
-     * @throws SqlException If failed.
+     * @return Operation future that completes with the number of rows affected by each query in the batch on success;
+     *      if the batch fails, the future completes exceptionally with {@link SqlBatchException}.
      */
-    CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch);
+    CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch);
 
     /**
      * Executes batched SQL query in an asynchronous way.
@@ -161,10 +170,10 @@ public interface Session extends AutoCloseable {
      * @param transaction Transaction to execute the statement within or {@code null}.
      * @param statement SQL statement to execute.
      * @param batch List of batch rows, where each row is a list of statement arguments.
-     * @return Operation future.
-     * @throws SqlException If failed.
+     * @return Operation future that completes with the number of rows affected by each query in the batch on success;
+     *      if the batch fails, the future completes exceptionally with {@link SqlBatchException}.
      */
-    CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
+    CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
 
     /**
      * Executes batched SQL query in a reactive way.
@@ -175,7 +184,7 @@ public interface Session extends AutoCloseable {
      * @return Publisher for the number of rows affected by the query.
      * @throws SqlException If failed.
      */
-    Flow.Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch);
+    Flow.Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch);
 
     /**
      * Executes batched SQL query in a reactive way.
@@ -186,7 +195,7 @@ public interface Session extends AutoCloseable {
      * @return Publisher for the number of rows affected by the query.
      * @throws SqlException If failed.
      */
-    Flow.Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
+    Flow.Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
 
     /**
      * Executes multi-statement SQL query.
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
similarity index 50%
copy from modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
copy to modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
index 85c801481..3ebf0ef08 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
@@ -17,38 +17,37 @@
 
 package org.apache.ignite.sql;
 
-import org.apache.ignite.lang.IgniteException;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.util.ArrayUtils;
 
 /**
- * SQL exception base class.
+ * The subclass of {@link SqlException} thrown when an error occurs during a batch update operation. In addition to the
+ * information provided by {@link SqlException}, a <code>SqlBatchException</code> provides the update
+ * counts for all commands that were executed successfully during the batch update, that is,
+ * all commands that were executed before the error occurred. The order of elements in an array of update counts
+ * corresponds to the order in which commands were added to the batch.
+ *
  */
-public class SqlException extends IgniteException {
-    /**
-     * Creates a new exception with the given error message.
-     *
-     * @param msg Error message.
-     */
-    public SqlException(String msg) {
-        super(msg);
-    }
+public class SqlBatchException extends SqlException {
+    private final long[] updCntrs;
 
     /**
      * Creates a new grid exception with the given throwable as a cause and source of error message.
      *
+     * @param updCntrs Array that describes the outcome of a batch execution.
      * @param cause Non-null throwable cause.
      */
-    public SqlException(Throwable cause) {
+    public SqlBatchException(long[] updCntrs, Throwable cause) {
         super(cause);
+
+        this.updCntrs = updCntrs != null ? updCntrs : ArrayUtils.LONG_EMPTY_ARRAY;
     }
 
     /**
-     * Creates a new exception with the given error message and optional nested exception.
+     * Returns the array that describes the outcome of a batch execution.
      *
-     * @param msg Error message.
-     * @param cause Optional nested exception (can be {@code null}).
+     * @return Array that describes the outcome of a batch execution.
      */
-    public SqlException(String msg, @Nullable Throwable cause) {
-        super(msg, cause);
+    public long[] updateCounters() {
+        return updCntrs;
     }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
index 85c801481..76e360b07 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
@@ -24,6 +24,13 @@ import org.jetbrains.annotations.Nullable;
  * SQL exception base class.
  */
 public class SqlException extends IgniteException {
+    /**
+     * Empty constructor for subclasses.
+     */
+    protected SqlException() {
+        // No-op.
+    }
+
     /**
      * Creates a new exception with the given error message.
      *
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 455b9dcb1..c0de66705 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -142,42 +142,35 @@ public class ClientSession implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
+    public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
         // TODO IGNITE-17059.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
     @Override
-    public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
         // TODO IGNITE-17059.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
         // TODO IGNITE-17059.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
-        // TODO IGNITE-17059.
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
         // TODO IGNITE-17058.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
     @Override
-    public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
         // TODO IGNITE-17058.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
index bed8390a2..809f6a8d2 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
@@ -99,37 +99,37 @@ public class FakeSession implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
+    public long[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override
-    public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
+    public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override
-    public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override
-    public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
         throw new UnsupportedOperationException();
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 53f261f5b..46f9ecc83 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -330,6 +330,60 @@ public final class IgniteTestUtils {
         return false;
     }
 
+    /**
+     * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy
+     * <b>including</b> that throwable itself.
+     *
+     * <p>Note that this method only follows the {@link Throwable#getCause()} chain; suppressed exceptions are not checked.
+     *
+     * @param t   Throwable to check.
+     * @param cls Cause classes to check.
+     * @return reference to the cause error if found, otherwise returns {@code null}.
+     */
+    public static <T extends Throwable> T cause(
+            @NotNull Throwable t,
+            @NotNull Class<T> cls
+    ) {
+        return cause(t, cls, null);
+    }
+
+    /**
+     * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy
+     * <b>including</b> that throwable itself.
+     *
+     * <p>Note that this method only follows the {@link Throwable#getCause()} chain; suppressed exceptions are not checked.
+     *
+     * @param t   Throwable to check.
+     * @param cls Cause classes to check.
+     * @param msg Message text that should be in cause (if {@code null}, message won't be checked).
+     * @return reference to the cause error if found, otherwise returns {@code null}.
+     */
+    public static <T extends Throwable> T cause(
+            @NotNull Throwable t,
+            @NotNull Class<T> cls,
+            @Nullable String msg
+    ) {
+        for (Throwable th = t; th != null; th = th.getCause()) {
+            if (cls.isAssignableFrom(th.getClass())) {
+                if (msg != null) {
+                    if (th.getMessage() != null && th.getMessage().contains(msg)) {
+                        return (T) th;
+                    } else {
+                        continue;
+                    }
+                }
+
+                return (T) th;
+            }
+
+            if (th.getCause() == th) {
+                break;
+            }
+        }
+
+        return null;
+    }
+
     /**
      * Runs runnable task asyncronously.
      *
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index af0edc6a4..6ddc88719 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.sql.api;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -28,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
@@ -36,6 +39,7 @@ import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
@@ -50,10 +54,13 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlBatchException;
 import org.apache.ignite.sql.SqlColumnType;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.table.Table;
@@ -330,7 +337,7 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         // Multiple statements error.
         {
             CompletableFuture<AsyncResultSet> f = ses.executeAsync(null, "SELECT 1; SELECT 2");
-            assertThrowsWithCause(f::get, IgniteSqlException.class, "Multiple statements aren't allowed");
+            assertThrowsWithCause(f::get, SqlException.class, "Multiple statements aren't allowed");
         }
 
         // Planning error.
@@ -372,13 +379,78 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
 
         assertThrowsWithCause(
                 () -> ses.executeAsync(null, "SELECT ID FROM TEST").get(),
-                IgniteSqlException.class,
+                SqlException.class,
                 "Session is closed"
         );
 
         checkSession(ses);
     }
 
+    @Test
+    public void batch() {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+        BatchedArguments args = BatchedArguments.of(0, 0);
+
+        for (int i = 1; i < ROW_COUNT; ++i) {
+            args.add(i, i);
+        }
+
+        long[] batchRes = ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join();
+
+        Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+
+        // Check that data are inserted OK
+        List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
+        IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0)));
+
+        // Check invalid query type
+        assertThrowsWithCause(
+                () -> ses.executeBatchAsync(null, "SELECT * FROM TEST", args).get(),
+                SqlException.class,
+                "Invalid SQL statement type in the batch"
+        );
+
+        assertThrowsWithCause(
+                () -> ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args).get(),
+                SqlException.class,
+                "Invalid SQL statement type in the batch"
+        );
+    }
+
+    @Test
+    public void batchIncomplete() {
+        int err = ROW_COUNT / 2;
+
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+        BatchedArguments args = BatchedArguments.of(0, 0);
+
+        for (int i = 1; i < ROW_COUNT; ++i) {
+            if (i == err) {
+                args.add(1, 1);
+            } else {
+                args.add(i, i);
+            }
+        }
+
+        Throwable ex = assertThrowsWithCause(
+                () -> ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join(),
+                SqlBatchException.class
+        );
+        assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
+        SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
+
+        assertEquals(err, batchEx.updateCounters().length);
+        IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
+    }
+
     private static void checkDdl(boolean expectedApplied, Session ses, String sql) throws ExecutionException, InterruptedException {
         CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
                 null,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 082dfc93c..2643ed179 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -18,14 +18,19 @@
 package org.apache.ignite.internal.sql.api;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.common.collect.Streams;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
@@ -34,9 +39,12 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
@@ -199,7 +207,7 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
         // Multiple statements error.
         assertThrowsWithCause(
                 () -> ses.execute(null, "SELECT 1; SELECT 2"),
-                IgniteSqlException.class,
+                SqlException.class,
                 "Multiple statements aren't allowed"
         );
 
@@ -218,6 +226,71 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
         );
     }
 
+    @Test
+    public void batch() {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+        BatchedArguments args = BatchedArguments.of(0, 0);
+
+        for (int i = 1; i < ROW_COUNT; ++i) {
+            args.add(i, i);
+        }
+
+        long[] batchRes = ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args);
+
+        Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+
+        // Check that data are inserted OK
+        List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
+        IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0)));
+
+        // Check invalid query type
+        assertThrowsWithCause(
+                () -> ses.executeBatch(null, "SELECT * FROM TEST", args),
+                SqlException.class,
+                "Invalid SQL statement type in the batch"
+        );
+
+        assertThrowsWithCause(
+                () -> ses.executeBatch(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args),
+                SqlException.class,
+                "Invalid SQL statement type in the batch"
+        );
+    }
+
+    @Test
+    public void batchIncomplete() {
+        int err = ROW_COUNT / 2;
+
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+        BatchedArguments args = BatchedArguments.of(0, 0);
+
+        for (int i = 1; i < ROW_COUNT; ++i) {
+            if (i == err) {
+                args.add(1, 1);
+            } else {
+                args.add(i, i);
+            }
+        }
+
+        Throwable ex = assertThrowsWithCause(
+                () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args),
+                SqlBatchException.class
+        );
+        assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
+        SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
+
+        assertEquals(err, batchEx.updateCounters().length);
+        IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
+    }
+
     private static void checkDdl(boolean expectedApplied, Session ses, String sql) {
         ResultSet res = ses.execute(
                 null,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 5a0703f96..5a6629d47 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.util.TransformingIterator;
 import org.apache.ignite.sql.NoRowSetExpectedException;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.table.Tuple;
@@ -45,7 +46,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class AsyncResultSetImpl implements AsyncResultSet {
     private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE =
-            CompletableFuture.failedFuture(new IgniteSqlException("No more pages."));
+            CompletableFuture.failedFuture(new SqlException("There are no more pages."));
 
     private final AsyncSqlCursor<List<Object>> cur;
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
deleted file mode 100644
index 72c510f21..000000000
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.api;
-
-import org.apache.ignite.lang.IgniteException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Ignite SQL exception.
- */
-public class IgniteSqlException extends IgniteException {
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Creates an empty exception.
-     */
-    public IgniteSqlException() {
-        // No-op.
-    }
-
-    /**
-     * Creates a new exception with the given error message.
-     *
-     * @param msg Error message.
-     */
-    public IgniteSqlException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * Creates a new grid exception with the given throwable as a cause and source of error message.
-     *
-     * @param cause Non-null throwable cause.
-     */
-    public IgniteSqlException(Throwable cause) {
-        this(cause.getMessage(), cause);
-    }
-
-    /**
-     * Creates a new exception with the given error message and optional nested exception.
-     *
-     * @param msg   Error message.
-     * @param cause Optional nested exception (can be {@code null}).
-     */
-    public IgniteSqlException(String msg, @Nullable Throwable cause) {
-        super(msg, cause);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString() {
-        return getClass() + ": " + getMessage();
-    }
-}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index c3d94e60a..4c75caaf1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -17,24 +17,35 @@
 
 package org.apache.ignite.internal.sql.api;
 
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.sql.engine.QueryValidator;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan.Type;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.sql.reactive.ReactiveResultSet;
@@ -56,7 +67,7 @@ public class SessionImpl implements Session {
 
     private final int pageSize;
 
-    private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Set<CompletableFuture<?>> futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     private final Set<AsyncSqlCursor<List<Object>>> cursToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -87,13 +98,7 @@ public class SessionImpl implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
+    public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
@@ -131,11 +136,11 @@ public class SessionImpl implements Session {
     @Override
     public SessionBuilder toBuilder() {
         if (!busyLock.enterBusy()) {
-            throw new IgniteSqlException("Session is closed");
+            throw new SqlException("Session is closed");
         }
 
         try {
-            return new SessionBuilderImpl(qryProc, props)
+            return new SessionBuilderImpl(qryProc, new HashMap<>(props))
                     .defaultPageSize(pageSize)
                     .defaultTimeout(timeout, TimeUnit.NANOSECONDS)
                     .defaultSchema(schema);
@@ -148,7 +153,7 @@ public class SessionImpl implements Session {
     @Override
     public CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction transaction, String query, @Nullable Object... arguments) {
         if (!busyLock.enterBusy()) {
-            return CompletableFuture.failedFuture(new IgniteSqlException("Session is closed."));
+            return CompletableFuture.failedFuture(new SqlException("Session is closed."));
         }
 
         try {
@@ -163,7 +168,7 @@ public class SessionImpl implements Session {
                     .thenCompose(cur -> {
                         if (!busyLock.enterBusy()) {
                             return cur.closeAsync()
-                                    .thenCompose((v) -> CompletableFuture.failedFuture(new IgniteSqlException("Session is closed")));
+                                    .thenCompose((v) -> CompletableFuture.failedFuture(new SqlException("Session is closed")));
                         }
 
                         try {
@@ -215,13 +220,70 @@ public class SessionImpl implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+        if (!busyLock.enterBusy()) { // Fail fast when the busy lock cannot be entered.
+            return CompletableFuture.failedFuture(new SqlException("Session is closed."));
+        }
+
+        try { // Runs the query once per argument row, one after another, collecting one update counter per row.
+            QueryContext ctx = QueryContext.of(
+                    transaction,
+                    new QueryTimeout(timeout, TimeUnit.NANOSECONDS),
+                    (QueryValidator) plan -> { // Rejects any plan that is not DML.
+                        if (plan.type() != Type.DML) {
+                            throw new SqlException("Invalid SQL statement type in the batch [plan=" + plan + ']');
+                        }
+                    }
+            );
+
+            final var counters = new LongArrayList(batch.size()); // Update counters of the entries executed so far.
+            CompletableFuture<Void> tail = CompletableFuture.completedFuture(null); // End of the sequential chain.
+            ArrayList<CompletableFuture<Void>> batchFuts = new ArrayList<>(batch.size()); // Kept for cancellation.
+
+            for (int i = 0; i < batch.size(); ++i) {
+                Object[] args = batch.get(i).toArray();
+
+                final var qryFut = tail // Entry i is submitted only after the previous stage has completed.
+                        .thenCompose(v -> qryProc.querySingleAsync(ctx, schema, query, args));
+
+                tail = qryFut.thenCompose(cur -> cur.requestNextAsync(1))
+                        .thenAccept(page -> {
+                            validateDmlResult(page);
+
+                            counters.add((long) page.items().get(0).get(0)); // Single cell, shape validated above.
+                        })
+                        .whenComplete((v, ex) -> {
+                            if (ex instanceof CancellationException) { // Propagate cancellation to the query future.
+                                qryFut.cancel(false);
+                            }
+                        });
+
+                batchFuts.add(tail);
+            }
+
+            CompletableFuture<long[]> resFut = tail
+                    .exceptionally((ex) -> { // Report the counters gathered before the failure along with its cause.
+                        throw new SqlBatchException(counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY), ex);
+                    })
+                    .thenApply(v -> counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY));
+
+            resFut.whenComplete((cur, ex) -> {
+                if (ex instanceof CancellationException) { // Cancel every per-entry stage of the batch.
+                    batchFuts.forEach(f -> f.cancel(false));
+                }
+            });
+
+            return resFut;
+        } catch (Exception e) { // Synchronous failures are reported through the returned future.
+            return CompletableFuture.failedFuture(e);
+        } finally {
+            busyLock.leaveBusy(); // NOTE(review): released once the chain is built, not when the batch completes - confirm.
+        }
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
@@ -245,13 +307,13 @@ public class SessionImpl implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
     @Override
-    public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+    public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
@@ -295,8 +357,20 @@ public class SessionImpl implements Session {
     public static <T> T await(CompletionStage<T> stage) {
         try {
             return stage.toCompletableFuture().get();
+        } catch (ExecutionException e) { // Unwrap: expose the underlying failure as the direct cause.
+            throw new IgniteException(e.getCause());
         } catch (Throwable e) {
             throw new IgniteException(e);
         }
     }
+
+    private static void validateDmlResult(AsyncCursor.BatchedResult<List<Object>> page) { // One single-column row expected.
+        if (page == null
+                || page.items() == null
+                || page.items().size() != 1 // Exactly one row.
+                || page.items().get(0).size() != 1 // Exactly one column in that row.
+                || page.hasMore()) { // No further pages.
+            throw new SqlException("Invalid DML results: " + page);
+        }
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java
index ee6c133d6..ab6e408aa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java
@@ -35,7 +35,7 @@ public class QueryContext implements Context {
      * Constructor.
      *
      * @param params Context params.
-     * */
+     */
     private QueryContext(Object[] params) {
         this.params = params;
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 11394e6ef..2492082b8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -37,7 +37,6 @@ import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.manager.EventListener;
-import org.apache.ignite.internal.sql.api.IgniteSqlException;
 import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
@@ -63,6 +62,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -274,7 +274,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                 )
                 .thenApply(nodes -> {
                     if (nodes.size() > 1) {
-                        throw new IgniteSqlException("Multiple statements aren't allowed.");
+                        throw new SqlException("Multiple statements aren't allowed.");
                     }
 
                     return nodes.get(0);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index 85f7d2d27..cdfbb80af 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -142,7 +142,7 @@ public class IgniteSqlApiTest {
         assertFalse(rs.hasRowSet());
 
         // Execute batched DML query.
-        int[] res = sess.executeBatch(null, "INSERT INTO tbl VALUES (?, ?)",
+        long[] res = sess.executeBatch(null, "INSERT INTO tbl VALUES (?, ?)",
                 BatchedArguments.of(2, "str2").add(3, "str3").add(4, "str4"));
 
         assertEquals(3, res.length);
@@ -183,7 +183,7 @@ public class IgniteSqlApiTest {
             assertEquals(1, rs.affectedRows());
 
             // Execute batched DML query.
-            int[] res = sess.executeBatch(tx, "INSERT INTO tbl VALUES (?, ?)",
+            long[] res = sess.executeBatch(tx, "INSERT INTO tbl VALUES (?, ?)",
                     BatchedArguments.of(2, "str2").add(3, "str3").add(4, "str4"));
 
             assertTrue(Arrays.stream(res).allMatch(i -> i == 1));
@@ -470,8 +470,8 @@ public class IgniteSqlApiTest {
 
                     args.forEach(a -> state(ans.getArgument(0)).put((Integer) a.get(0), (String) a.get(1)));
 
-                    int[] res = new int[args.size()];
-                    Arrays.fill(res, 1);
+                    long[] res = new long[args.size()];
+                    Arrays.fill(res, 1L);
                     return res;
                 });