You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/06/09 09:02:06 UTC
[ignite-3] branch main updated: IGNITE-16963 SQL API: Add batched DML queries support (#843)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 45619b1c3 IGNITE-16963 SQL API: Add batched DML queries support (#843)
45619b1c3 is described below
commit 45619b1c3bfe1344a04f4c76842a187aba194677
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Thu Jun 9 12:02:02 2022 +0300
IGNITE-16963 SQL API: Add batched DML queries support (#843)
---
.../apache/ignite/example/sql/SqlApiExample.java | 2 +-
.../apache/ignite/internal/sql/ResultSetImpl.java | 6 +-
.../org/apache/ignite/sql/BatchedArguments.java | 9 ++
.../main/java/org/apache/ignite/sql/Session.java | 29 ++++--
.../{SqlException.java => SqlBatchException.java} | 35 ++++---
.../java/org/apache/ignite/sql/SqlException.java | 7 ++
.../ignite/internal/client/sql/ClientSession.java | 17 +---
.../apache/ignite/client/fakes/FakeSession.java | 12 +--
.../internal/testframework/IgniteTestUtils.java | 54 +++++++++++
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 76 ++++++++++++++-
.../internal/sql/api/ItSqlSynchronousApiTest.java | 75 +++++++++++++-
.../internal/sql/api/AsyncResultSetImpl.java | 3 +-
.../internal/sql/api/IgniteSqlException.java | 70 -------------
.../ignite/internal/sql/api/SessionImpl.java | 108 +++++++++++++++++----
.../ignite/internal/sql/engine/QueryContext.java | 2 +-
.../internal/sql/engine/SqlQueryProcessor.java | 4 +-
.../internal/sql/engine/IgniteSqlApiTest.java | 8 +-
17 files changed, 369 insertions(+), 148 deletions(-)
diff --git a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
index feeb79189..502de66e6 100644
--- a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
+++ b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
@@ -113,7 +113,7 @@ public class SqlApiExample {
.add(2, 1, "Jane", "Roe", 2000.0d)
.add(3, 1, "Mary", "Major", 1500.0d)
.add(4, 1, "Richard", "Miles", 1450.0d)))
- .asLongStream().sum();
+ .sum();
System.out.println("\nAdded accounts: " + rowsAdded);
diff --git a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
index 28ae4cd4b..6c774ffe9 100644
--- a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
+++ b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.sql;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletionStage;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.jetbrains.annotations.Nullable;
@@ -83,7 +83,7 @@ public class ResultSetImpl implements ResultSet {
@Override
public boolean hasNext() {
if (it == null) {
- throw new IgniteException("There are no results");
+ throw new SqlException("There are no results");
}
return it.hasNext();
@@ -93,7 +93,7 @@ public class ResultSetImpl implements ResultSet {
@Override
public SqlRow next() {
if (it == null) {
- throw new IgniteException("There are no results");
+ throw new SqlException("There are no results");
}
return it.next();
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
index 20949084a..e991c49db 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
@@ -29,6 +29,15 @@ import java.util.List;
* TODO: add named arguments support.
*/
public class BatchedArguments extends ArrayList<List<Object>> implements List<List<Object>> {
+ /**
+ * Creates batched arguments.
+ *
+ * @return Batch query arguments.
+ */
+ public static BatchedArguments create() {
+ return new BatchedArguments();
+ }
+
/**
* Creates batched arguments.
*
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index fc3753839..fb4a612a6 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -131,8 +131,16 @@ public interface Session extends AutoCloseable {
* @param dmlQuery DML query template.
* @param batch Batch of query arguments.
* @return Number of rows affected by each query in the batch.
+ * @throws SqlBatchException If the batch fails.
*/
- int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch);
+ default long[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
+ // TODO: IGNITE-17135 fix exception handling.
+ try {
+ return executeBatchAsync(transaction, dmlQuery, batch).join();
+ } catch (CompletionException e) {
+ throw new SqlException(e);
+ }
+ }
/**
* Executes batched SQL query. Only DML queries are supported.
@@ -141,8 +149,9 @@ public interface Session extends AutoCloseable {
* @param dmlStatement DML statement to execute.
* @param batch Batch of query arguments.
* @return Number of rows affected by each query in the batch.
+ * @throws SqlBatchException If the batch fails.
*/
- int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch);
+ long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch);
/**
* Executes batched SQL query in an asynchronous way.
@@ -150,10 +159,10 @@ public interface Session extends AutoCloseable {
* @param transaction Transaction to execute the statement within or {@code null}.
* @param query SQL query template.
* @param batch List of batch rows, where each row is a list of statement arguments.
- * @return Operation future.
- * @throws SqlException If failed.
+ * @return Operation future completed with number of rows affected by each query in the batch on batch success,
+ * if the batch fails the future completed with the {@link SqlBatchException}.
*/
- CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch);
+ CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch);
/**
* Executes batched SQL query in an asynchronous way.
@@ -161,10 +170,10 @@ public interface Session extends AutoCloseable {
* @param transaction Transaction to execute the statement within or {@code null}.
* @param statement SQL statement to execute.
* @param batch List of batch rows, where each row is a list of statement arguments.
- * @return Operation future.
- * @throws SqlException If failed.
+ * @return Operation future completed with number of rows affected by each query in the batch on batch success,
+ * if the batch fails the future completed with the {@link SqlBatchException}.
*/
- CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
+ CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
/**
* Executes batched SQL query in a reactive way.
@@ -175,7 +184,7 @@ public interface Session extends AutoCloseable {
* @return Publisher for the number of rows affected by the query.
* @throws SqlException If failed.
*/
- Flow.Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch);
+ Flow.Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch);
/**
* Executes batched SQL query in a reactive way.
@@ -186,7 +195,7 @@ public interface Session extends AutoCloseable {
* @return Publisher for the number of rows affected by the query.
* @throws SqlException If failed.
*/
- Flow.Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
+ Flow.Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch);
/**
* Executes multi-statement SQL query.
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
similarity index 50%
copy from modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
copy to modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
index 85c801481..3ebf0ef08 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
@@ -17,38 +17,37 @@
package org.apache.ignite.sql;
-import org.apache.ignite.lang.IgniteException;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.util.ArrayUtils;
/**
- * SQL exception base class.
+ * The subclass of {@link SqlException} thrown when an error occurs during a batch update operation. In addition to the
+ * information provided by {@link SqlException}, a <code>SqlBatchException</code> provides the update
+ * counts for all commands that were executed successfully during the batch update, that is,
+ * all commands that were executed before the error occurred. The order of elements in an array of update counts
+ * corresponds to the order in which commands were added to the batch.
+ *
*/
-public class SqlException extends IgniteException {
- /**
- * Creates a new exception with the given error message.
- *
- * @param msg Error message.
- */
- public SqlException(String msg) {
- super(msg);
- }
+public class SqlBatchException extends SqlException {
+ private final long[] updCntrs;
/**
* Creates a new grid exception with the given throwable as a cause and source of error message.
*
+ * @param updCntrs Array that describes the outcome of a batch execution.
* @param cause Non-null throwable cause.
*/
- public SqlException(Throwable cause) {
+ public SqlBatchException(long[] updCntrs, Throwable cause) {
super(cause);
+
+ this.updCntrs = updCntrs != null ? updCntrs : ArrayUtils.LONG_EMPTY_ARRAY;
}
/**
- * Creates a new exception with the given error message and optional nested exception.
+ * Returns the array that describes the outcome of a batch execution.
*
- * @param msg Error message.
- * @param cause Optional nested exception (can be {@code null}).
+ * @return Array that describes the outcome of a batch execution.
*/
- public SqlException(String msg, @Nullable Throwable cause) {
- super(msg, cause);
+ public long[] updateCounters() {
+ return updCntrs;
}
}
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
index 85c801481..76e360b07 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
@@ -24,6 +24,13 @@ import org.jetbrains.annotations.Nullable;
* SQL exception base class.
*/
public class SqlException extends IgniteException {
+ /**
+ * Empty constructor for subclasses.
+ */
+ protected SqlException() {
+ // No-op.
+ }
+
/**
* Creates a new exception with the given error message.
*
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 455b9dcb1..c0de66705 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -142,42 +142,35 @@ public class ClientSession implements Session {
/** {@inheritDoc} */
@Override
- public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
+ public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
// TODO IGNITE-17059.
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override
- public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
// TODO IGNITE-17059.
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
// TODO IGNITE-17059.
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
- // TODO IGNITE-17059.
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
- /** {@inheritDoc} */
- @Override
- public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
// TODO IGNITE-17058.
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override
- public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
// TODO IGNITE-17058.
throw new UnsupportedOperationException("Not implemented yet.");
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
index bed8390a2..809f6a8d2 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
@@ -99,37 +99,37 @@ public class FakeSession implements Session {
/** {@inheritDoc} */
@Override
- public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
+ public long[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override
- public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
+ public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override
- public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override
- public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
throw new UnsupportedOperationException();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 53f261f5b..46f9ecc83 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -330,6 +330,60 @@ public final class IgniteTestUtils {
return false;
}
+ /**
+ * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy
+ * <b>including</b> that throwable itself.
+ *
+ * <p>Note that this method follows includes {@link Throwable#getSuppressed()} into check.
+ *
+ * @param t Throwable to check.
+ * @param cls Cause classes to check.
+ * @return reference to the cause error if found, otherwise returns {@code null}.
+ */
+ public static <T extends Throwable> T cause(
+ @NotNull Throwable t,
+ @NotNull Class<T> cls
+ ) {
+ return cause(t, cls, null);
+ }
+
+ /**
+ * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy
+ * <b>including</b> that throwable itself.
+ *
+ * <p>Note that this method follows includes {@link Throwable#getSuppressed()} into check.
+ *
+ * @param t Throwable to check.
+ * @param cls Cause classes to check.
+ * @param msg Message text that should be in cause (if {@code null}, message won't be checked).
+ * @return reference to the cause error if found, otherwise returns {@code null}.
+ */
+ public static <T extends Throwable> T cause(
+ @NotNull Throwable t,
+ @NotNull Class<T> cls,
+ @Nullable String msg
+ ) {
+ for (Throwable th = t; th != null; th = th.getCause()) {
+ if (cls.isAssignableFrom(th.getClass())) {
+ if (msg != null) {
+ if (th.getMessage() != null && th.getMessage().contains(msg)) {
+ return (T) th;
+ } else {
+ continue;
+ }
+ }
+
+ return (T) th;
+ }
+
+ if (th.getCause() == th) {
+ break;
+ }
+ }
+
+ return null;
+ }
+
/**
* Runs runnable task asyncronously.
*
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index af0edc6a4..6ddc88719 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.sql.api;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -28,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -36,6 +39,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
@@ -50,10 +54,13 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.sql.SqlColumnType;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.table.Table;
@@ -330,7 +337,7 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
// Multiple statements error.
{
CompletableFuture<AsyncResultSet> f = ses.executeAsync(null, "SELECT 1; SELECT 2");
- assertThrowsWithCause(f::get, IgniteSqlException.class, "Multiple statements aren't allowed");
+ assertThrowsWithCause(f::get, SqlException.class, "Multiple statements aren't allowed");
}
// Planning error.
@@ -372,13 +379,78 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
assertThrowsWithCause(
() -> ses.executeAsync(null, "SELECT ID FROM TEST").get(),
- IgniteSqlException.class,
+ SqlException.class,
"Session is closed"
);
checkSession(ses);
}
+ @Test
+ public void batch() {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+ BatchedArguments args = BatchedArguments.of(0, 0);
+
+ for (int i = 1; i < ROW_COUNT; ++i) {
+ args.add(i, i);
+ }
+
+ long[] batchRes = ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join();
+
+ Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+
+ // Check that data are inserted OK
+ List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
+ IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0)));
+
+ // Check invalid query type
+ assertThrowsWithCause(
+ () -> ses.executeBatchAsync(null, "SELECT * FROM TEST", args).get(),
+ SqlException.class,
+ "Invalid SQL statement type in the batch"
+ );
+
+ assertThrowsWithCause(
+ () -> ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args).get(),
+ SqlException.class,
+ "Invalid SQL statement type in the batch"
+ );
+ }
+
+ @Test
+ public void batchIncomplete() {
+ int err = ROW_COUNT / 2;
+
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+ BatchedArguments args = BatchedArguments.of(0, 0);
+
+ for (int i = 1; i < ROW_COUNT; ++i) {
+ if (i == err) {
+ args.add(1, 1);
+ } else {
+ args.add(i, i);
+ }
+ }
+
+ Throwable ex = assertThrowsWithCause(
+ () -> ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join(),
+ SqlBatchException.class
+ );
+ assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
+ SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
+
+ assertEquals(err, batchEx.updateCounters().length);
+ IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
+ }
+
private static void checkDdl(boolean expectedApplied, Session ses, String sql) throws ExecutionException, InterruptedException {
CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
null,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 082dfc93c..2643ed179 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -18,14 +18,19 @@
package org.apache.ignite.internal.sql.api;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Streams;
+import java.util.Arrays;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
import org.apache.ignite.lang.ColumnAlreadyExistsException;
import org.apache.ignite.lang.ColumnNotFoundException;
@@ -34,9 +39,12 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
@@ -199,7 +207,7 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
// Multiple statements error.
assertThrowsWithCause(
() -> ses.execute(null, "SELECT 1; SELECT 2"),
- IgniteSqlException.class,
+ SqlException.class,
"Multiple statements aren't allowed"
);
@@ -218,6 +226,71 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
);
}
+ @Test
+ public void batch() {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+ BatchedArguments args = BatchedArguments.of(0, 0);
+
+ for (int i = 1; i < ROW_COUNT; ++i) {
+ args.add(i, i);
+ }
+
+ long[] batchRes = ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args);
+
+ Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+
+ // Check that data are inserted OK
+ List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
+ IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0)));
+
+ // Check invalid query type
+ assertThrowsWithCause(
+ () -> ses.executeBatch(null, "SELECT * FROM TEST", args),
+ SqlException.class,
+ "Invalid SQL statement type in the batch"
+ );
+
+ assertThrowsWithCause(
+ () -> ses.executeBatch(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args),
+ SqlException.class,
+ "Invalid SQL statement type in the batch"
+ );
+ }
+
+ @Test
+ public void batchIncomplete() {
+ int err = ROW_COUNT / 2;
+
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+
+ BatchedArguments args = BatchedArguments.of(0, 0);
+
+ for (int i = 1; i < ROW_COUNT; ++i) {
+ if (i == err) {
+ args.add(1, 1);
+ } else {
+ args.add(i, i);
+ }
+ }
+
+ Throwable ex = assertThrowsWithCause(
+ () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args),
+ SqlBatchException.class
+ );
+ assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
+ SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
+
+ assertEquals(err, batchEx.updateCounters().length);
+ IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
+ }
+
private static void checkDdl(boolean expectedApplied, Session ses, String sql) {
ResultSet res = ses.execute(
null,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 5a0703f96..5a6629d47 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.util.TransformingIterator;
import org.apache.ignite.sql.NoRowSetExpectedException;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.table.Tuple;
@@ -45,7 +46,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class AsyncResultSetImpl implements AsyncResultSet {
private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE =
- CompletableFuture.failedFuture(new IgniteSqlException("No more pages."));
+ CompletableFuture.failedFuture(new SqlException("There are no more pages."));
private final AsyncSqlCursor<List<Object>> cur;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
deleted file mode 100644
index 72c510f21..000000000
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.api;
-
-import org.apache.ignite.lang.IgniteException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Ignite SQL exception.
- */
-public class IgniteSqlException extends IgniteException {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /**
- * Creates an empty exception.
- */
- public IgniteSqlException() {
- // No-op.
- }
-
- /**
- * Creates a new exception with the given error message.
- *
- * @param msg Error message.
- */
- public IgniteSqlException(String msg) {
- super(msg);
- }
-
- /**
- * Creates a new grid exception with the given throwable as a cause and source of error message.
- *
- * @param cause Non-null throwable cause.
- */
- public IgniteSqlException(Throwable cause) {
- this(cause.getMessage(), cause);
- }
-
- /**
- * Creates a new exception with the given error message and optional nested exception.
- *
- * @param msg Error message.
- * @param cause Optional nested exception (can be {@code null}).
- */
- public IgniteSqlException(String msg, @Nullable Throwable cause) {
- super(msg, cause);
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return getClass() + ": " + getMessage();
- }
-}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index c3d94e60a..4c75caaf1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -17,24 +17,35 @@
package org.apache.ignite.internal.sql.api;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.sql.engine.QueryValidator;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan.Type;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.sql.reactive.ReactiveResultSet;
@@ -56,7 +67,7 @@ public class SessionImpl implements Session {
private final int pageSize;
- private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<CompletableFuture<?>> futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<AsyncSqlCursor<List<Object>>> cursToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -87,13 +98,7 @@ public class SessionImpl implements Session {
/** {@inheritDoc} */
@Override
- public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) {
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
- /** {@inheritDoc} */
- @Override
- public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
+ public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) {
throw new UnsupportedOperationException("Not implemented yet.");
}
@@ -131,11 +136,11 @@ public class SessionImpl implements Session {
@Override
public SessionBuilder toBuilder() {
if (!busyLock.enterBusy()) {
- throw new IgniteSqlException("Session is closed");
+ throw new SqlException("Session is closed");
}
try {
- return new SessionBuilderImpl(qryProc, props)
+ return new SessionBuilderImpl(qryProc, new HashMap<>(props))
.defaultPageSize(pageSize)
.defaultTimeout(timeout, TimeUnit.NANOSECONDS)
.defaultSchema(schema);
@@ -148,7 +153,7 @@ public class SessionImpl implements Session {
@Override
public CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction transaction, String query, @Nullable Object... arguments) {
if (!busyLock.enterBusy()) {
- return CompletableFuture.failedFuture(new IgniteSqlException("Session is closed."));
+ return CompletableFuture.failedFuture(new SqlException("Session is closed."));
}
try {
@@ -163,7 +168,7 @@ public class SessionImpl implements Session {
.thenCompose(cur -> {
if (!busyLock.enterBusy()) {
return cur.closeAsync()
- .thenCompose((v) -> CompletableFuture.failedFuture(new IgniteSqlException("Session is closed")));
+ .thenCompose((v) -> CompletableFuture.failedFuture(new SqlException("Session is closed")));
}
try {
@@ -215,13 +220,70 @@ public class SessionImpl implements Session {
/** {@inheritDoc} */
@Override
- public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new SqlException("Session is closed."));
+ }
+
+ try {
+ QueryContext ctx = QueryContext.of(
+ transaction,
+ new QueryTimeout(timeout, TimeUnit.NANOSECONDS),
+ (QueryValidator) plan -> {
+ if (plan.type() != Type.DML) {
+ throw new SqlException("Invalid SQL statement type in the batch [plan=" + plan + ']');
+ }
+ }
+ );
+
+ final var counters = new LongArrayList(batch.size());
+ CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);
+ ArrayList<CompletableFuture<Void>> batchFuts = new ArrayList<>(batch.size());
+
+ for (int i = 0; i < batch.size(); ++i) {
+ Object[] args = batch.get(i).toArray();
+
+ final var qryFut = tail
+ .thenCompose(v -> qryProc.querySingleAsync(ctx, schema, query, args));
+
+ tail = qryFut.thenCompose(cur -> cur.requestNextAsync(1))
+ .thenAccept(page -> {
+ validateDmlResult(page);
+
+ counters.add((long) page.items().get(0).get(0));
+ })
+ .whenComplete((v, ex) -> {
+ if (ex instanceof CancellationException) {
+ qryFut.cancel(false);
+ }
+ });
+
+ batchFuts.add(tail);
+ }
+
+ CompletableFuture<long[]> resFut = tail
+ .exceptionally((ex) -> {
+ throw new SqlBatchException(counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY), ex);
+ })
+ .thenApply(v -> counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY));
+
+ resFut.whenComplete((cur, ex) -> {
+ if (ex instanceof CancellationException) {
+ batchFuts.forEach(f -> f.cancel(false));
+ }
+ });
+
+ return resFut;
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
throw new UnsupportedOperationException("Not implemented yet.");
}
@@ -245,13 +307,13 @@ public class SessionImpl implements Session {
/** {@inheritDoc} */
@Override
- public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override
- public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) {
throw new UnsupportedOperationException("Not implemented yet.");
}
@@ -295,8 +357,20 @@ public class SessionImpl implements Session {
public static <T> T await(CompletionStage<T> stage) {
try {
return stage.toCompletableFuture().get();
+ } catch (ExecutionException e) {
+ throw new IgniteException(e.getCause());
} catch (Throwable e) {
throw new IgniteException(e);
}
}
+
+ private static void validateDmlResult(AsyncCursor.BatchedResult<List<Object>> page) {
+ if (page == null
+ || page.items() == null
+ || page.items().size() != 1
+ || page.items().get(0).size() != 1
+ || page.hasMore()) {
+ throw new SqlException("Invalid DML results: " + page);
+ }
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java
index ee6c133d6..ab6e408aa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java
@@ -35,7 +35,7 @@ public class QueryContext implements Context {
* Constructor.
*
* @param params Context params.
- * */
+ */
private QueryContext(Object[] params) {
this.params = params;
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 11394e6ef..2492082b8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -37,7 +37,6 @@ import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.manager.EventListener;
-import org.apache.ignite.internal.sql.api.IgniteSqlException;
import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
@@ -63,6 +62,7 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -274,7 +274,7 @@ public class SqlQueryProcessor implements QueryProcessor {
)
.thenApply(nodes -> {
if (nodes.size() > 1) {
- throw new IgniteSqlException("Multiple statements aren't allowed.");
+ throw new SqlException("Multiple statements aren't allowed.");
}
return nodes.get(0);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index 85f7d2d27..cdfbb80af 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -142,7 +142,7 @@ public class IgniteSqlApiTest {
assertFalse(rs.hasRowSet());
// Execute batched DML query.
- int[] res = sess.executeBatch(null, "INSERT INTO tbl VALUES (?, ?)",
+ long[] res = sess.executeBatch(null, "INSERT INTO tbl VALUES (?, ?)",
BatchedArguments.of(2, "str2").add(3, "str3").add(4, "str4"));
assertEquals(3, res.length);
@@ -183,7 +183,7 @@ public class IgniteSqlApiTest {
assertEquals(1, rs.affectedRows());
// Execute batched DML query.
- int[] res = sess.executeBatch(tx, "INSERT INTO tbl VALUES (?, ?)",
+ long[] res = sess.executeBatch(tx, "INSERT INTO tbl VALUES (?, ?)",
BatchedArguments.of(2, "str2").add(3, "str3").add(4, "str4"));
assertTrue(Arrays.stream(res).allMatch(i -> i == 1));
@@ -470,8 +470,8 @@ public class IgniteSqlApiTest {
args.forEach(a -> state(ans.getArgument(0)).put((Integer) a.get(0), (String) a.get(1)));
- int[] res = new int[args.size()];
- Arrays.fill(res, 1);
+ long[] res = new long[args.size()];
+ Arrays.fill(res, 1L);
return res;
});