You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/04/10 14:55:24 UTC
[ignite] branch master updated: IGNITE-12764: Thin JDBC streaming
fails BatchUpdateException if function is used (#7615)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 00cb1ad IGNITE-12764: Thin JDBC streaming fails BatchUpdateException if function is used (#7615)
00cb1ad is described below
commit 00cb1ad7a3420024b2a4ecdab58d1c1056ea35b6
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Fri Apr 10 17:55:05 2020 +0300
IGNITE-12764: Thin JDBC streaming fails BatchUpdateException if function is used (#7615)
---
.../thin/JdbcThinStreamingAbstractSelfTest.java | 49 ++++++++++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 53 ++++++++++++++++++----
2 files changed, 93 insertions(+), 9 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 97e863c..ac7113d 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
@@ -132,6 +133,54 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel
}
/**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testStreamedBatchedInsertFunctionSuppliedValues() throws Exception {
+ doStreamedInsertFunctionSuppliedValues(true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testStreamedInsertFunctionSuppliedValues() throws Exception {
+ doStreamedInsertFunctionSuppliedValues(false);
+ }
+
+ /**
+ * Inserts data using built-in function for column value.
+ *
+ * @param batch Batch mode flag.
+ * @throws Exception if failed.
+ */
+ private void doStreamedInsertFunctionSuppliedValues(boolean batch) throws Exception {
+ try (Connection conn = createStreamedConnection(false)) {
+ assertStreamingState(true);
+
+ try (PreparedStatement stmt = conn.prepareStatement(
+ "insert into Person(\"id\", \"name\") values (?, RANDOM_UUID())")) {
+ for (int i = 1; i <= 10; i++) {
+ stmt.setInt(1, i);
+
+ if (batch)
+ stmt.addBatch();
+ else
+ stmt.execute();
+ }
+
+ if (batch)
+ stmt.executeBatch();
+ }
+ }
+
+ U.sleep(500);
+
+ for (int i = 1; i <= 10; i++)
+ UUID.fromString(nameForIdInCache(i));
+ }
+
+ /**
* @throws SQLException if failed.
*/
@Test
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 48c7149..6a05b2a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -678,17 +678,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
UpdatePlan plan = dml.plan();
- List<List<?>> planRows = plan.createRows(args != null ? args : X.EMPTY_OBJECT_ARRAY);
+ Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(schemaName, plan, args),
+ objectContext(), true);
- Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(
- planRows.iterator(),
- objectContext(),
- true
- );
+ if (!iter.hasNext())
+ return 0;
- if (planRows.size() == 1) {
- IgniteBiTuple t = plan.processRow(iter.next());
+ IgniteBiTuple<?, ?> t = plan.processRow(iter.next());
+ if (!iter.hasNext()) {
streamer.addData(t.getKey(), t.getValue());
return 1;
@@ -696,10 +694,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
else {
Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+ rows.put(t.getKey(), t.getValue());
+
while (iter.hasNext()) {
List<?> row = iter.next();
- IgniteBiTuple t = plan.processRow(row);
+ t = plan.processRow(row);
rows.put(t.getKey(), t.getValue());
}
@@ -720,6 +720,41 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Calculates rows for update query.
+ *
+ * @param schemaName Schema name.
+ * @param plan Update plan.
+ * @param args Statement arguments.
+ * @return Rows for update.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Iterator<List<?>> updateQueryRows(String schemaName, UpdatePlan plan, Object[] args) throws IgniteCheckedException {
+ Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
+
+ if (!F.isEmpty(plan.selectQuery())) {
+ SqlFieldsQuery selectQry = new SqlFieldsQuery(plan.selectQuery())
+ .setArgs(params)
+ .setLocal(true);
+
+ QueryParserResult selectParseRes = parser.parse(schemaName, selectQry, false);
+
+ GridQueryFieldsResult res = executeSelectLocal(
+ selectParseRes.queryDescriptor(),
+ selectParseRes.queryParameters(),
+ selectParseRes.select(),
+ null,
+ null,
+ null,
+ false,
+ 0
+ );
+
+ return res.iterator();
+ } else
+ return plan.createRows(params).iterator();
+ }
+
+ /**
* Parse statement for streamer.
*
* @param schemaName Schema name.