Posted to commits@ignite.apache.org by ni...@apache.org on 2020/04/10 14:55:24 UTC

[ignite] branch master updated: IGNITE-12764: Thin JDBC streaming fails BatchUpdateException if function is used (#7615)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 00cb1ad  IGNITE-12764: Thin JDBC streaming fails BatchUpdateException if function is used (#7615)
00cb1ad is described below

commit 00cb1ad7a3420024b2a4ecdab58d1c1056ea35b6
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Fri Apr 10 17:55:05 2020 +0300

    IGNITE-12764: Thin JDBC streaming fails BatchUpdateException if function is used (#7615)
---
 .../thin/JdbcThinStreamingAbstractSelfTest.java    | 49 ++++++++++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java      | 53 ++++++++++++++++++----
 2 files changed, 93 insertions(+), 9 deletions(-)
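
For context, a minimal sketch of the scenario this change fixes: a streamed batch INSERT over the thin JDBC driver where one column value is supplied by a SQL function such as RANDOM_UUID(); before this patch such a batch failed with BatchUpdateException. The connection URL, the SET STREAMING statement, and the Person table layout below are illustrative assumptions mirroring the test added in this patch, not part of the commit itself.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    /**
     * Sketch of a streamed thin-JDBC batch insert whose "name" column is
     * produced by a SQL function (RANDOM_UUID()) rather than a bound parameter.
     */
    public class StreamingFunctionInsertSketch {
        public static void main(String[] args) throws Exception {
            // Assumes an Ignite node listening on 127.0.0.1 and an existing Person table.
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
                // Switch this connection into streaming mode.
                try (Statement s = conn.createStatement()) {
                    s.execute("SET STREAMING ON");
                }

                try (PreparedStatement stmt = conn.prepareStatement(
                    "insert into Person(\"id\", \"name\") values (?, RANDOM_UUID())")) {
                    for (int i = 1; i <= 10; i++) {
                        stmt.setInt(1, i);
                        stmt.addBatch();
                    }

                    // Before IGNITE-12764 this call failed with BatchUpdateException
                    // because the function-supplied value could not be materialized.
                    stmt.executeBatch();
                }
            }
        }
    }
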

diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 97e863c..ac7113d 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
@@ -132,6 +133,54 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel
     }
 
     /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testStreamedBatchedInsertFunctionSuppliedValues() throws Exception {
+        doStreamedInsertFunctionSuppliedValues(true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testStreamedInsertFunctionSuppliedValues() throws Exception {
+        doStreamedInsertFunctionSuppliedValues(false);
+    }
+
+    /**
+     * Inserts data using built-in function for column value.
+     *
+     * @param batch Batch mode flag.
+     * @throws Exception if failed.
+     */
+    private void doStreamedInsertFunctionSuppliedValues(boolean batch) throws Exception {
+        try (Connection conn = createStreamedConnection(false)) {
+            assertStreamingState(true);
+
+            try (PreparedStatement stmt = conn.prepareStatement(
+                "insert into Person(\"id\", \"name\") values (?, RANDOM_UUID())")) {
+                for (int i = 1; i <= 10; i++) {
+                    stmt.setInt(1, i);
+
+                    if (batch)
+                        stmt.addBatch();
+                    else
+                        stmt.execute();
+                }
+
+                if (batch)
+                    stmt.executeBatch();
+            }
+        }
+
+        U.sleep(500);
+
+        for (int i = 1; i <= 10; i++)
+            UUID.fromString(nameForIdInCache(i));
+    }
+
+    /**
      * @throws SQLException if failed.
      */
     @Test
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 48c7149..6a05b2a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -678,17 +678,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         try {
             UpdatePlan plan = dml.plan();
 
-            List<List<?>> planRows = plan.createRows(args != null ? args : X.EMPTY_OBJECT_ARRAY);
+            Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(schemaName, plan, args),
+                objectContext(), true);
 
-            Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(
-                planRows.iterator(),
-                objectContext(),
-                true
-            );
+            if (!iter.hasNext())
+                return 0;
 
-            if (planRows.size() == 1) {
-                IgniteBiTuple t = plan.processRow(iter.next());
+            IgniteBiTuple<?, ?> t = plan.processRow(iter.next());
 
+            if (!iter.hasNext()) {
                 streamer.addData(t.getKey(), t.getValue());
 
                 return 1;
@@ -696,10 +694,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             else {
                 Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
 
+                rows.put(t.getKey(), t.getValue());
+
                 while (iter.hasNext()) {
                     List<?> row = iter.next();
 
-                    IgniteBiTuple t = plan.processRow(row);
+                    t = plan.processRow(row);
 
                     rows.put(t.getKey(), t.getValue());
                 }
@@ -720,6 +720,41 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Calculates rows for update query.
+     *
+     * @param schemaName Schema name.
+     * @param plan Update plan.
+     * @param args Statement arguments.
+     * @return Rows for update.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Iterator<List<?>> updateQueryRows(String schemaName, UpdatePlan plan, Object[] args) throws IgniteCheckedException {
+        Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
+
+        if (!F.isEmpty(plan.selectQuery())) {
+            SqlFieldsQuery selectQry = new SqlFieldsQuery(plan.selectQuery())
+                .setArgs(params)
+                .setLocal(true);
+
+            QueryParserResult selectParseRes = parser.parse(schemaName, selectQry, false);
+
+            GridQueryFieldsResult res = executeSelectLocal(
+                selectParseRes.queryDescriptor(),
+                selectParseRes.queryParameters(),
+                selectParseRes.select(),
+                null,
+                null,
+                null,
+                false,
+                0
+            );
+
+            return res.iterator();
+        } else
+            return plan.createRows(params).iterator();
+    }
+
+    /**
      * Parse statement for streamer.
      *
      * @param schemaName Schema name.