You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/17 16:51:54 UTC
[07/19] ignite git commit: IGNITE-5738: JDBC: add batch support. This
closes #2393.
IGNITE-5738: JDBC: add batch support. This closes #2393.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7781823d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7781823d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7781823d
Branch: refs/heads/ignite-5901
Commit: 7781823d4552bb30efa48758b7473d07c9e8aee3
Parents: 9cfb050
Author: Sergey Kalashnikov <sk...@gridgain.com>
Authored: Thu Aug 17 14:58:38 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Aug 17 14:58:38 2017 +0300
----------------------------------------------------------------------
.../jdbc2/JdbcDeleteStatementSelfTest.java | 22 ++
.../jdbc2/JdbcInsertStatementSelfTest.java | 159 ++++++++++++++
.../jdbc2/JdbcMergeStatementSelfTest.java | 41 ++++
.../jdbc2/JdbcStatementBatchingSelfTest.java | 133 ++++++++++++
.../jdbc2/JdbcUpdateStatementSelfTest.java | 24 +++
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 3 +
.../internal/jdbc2/JdbcBatchUpdateTask.java | 215 +++++++++++++++++++
.../internal/jdbc2/JdbcDatabaseMetadata.java | 2 +-
.../internal/jdbc2/JdbcPreparedStatement.java | 25 ++-
.../ignite/internal/jdbc2/JdbcStatement.java | 59 ++++-
10 files changed, 675 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
index d55c979..3eec5a0 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.jdbc2;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
@@ -46,4 +47,25 @@ public class JdbcDeleteStatementSelfTest extends JdbcAbstractUpdateStatementSelf
assertFalse(jcache(0).containsKey("p2"));
assertTrue(jcache(0).containsKeys(new HashSet<Object>(Arrays.asList("p1", "p3"))));
}
+
+    /**
+     * Runs a two-entry parameter batch on a prepared DELETE and checks both the resulting cache
+     * contents and the update count reported for each batch entry.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatch() throws SQLException {
+        // try-with-resources: the statement is closed even when an assertion below fails.
+        try (PreparedStatement ps = conn.prepareStatement("delete from Person where firstName = ?")) {
+            ps.setString(1, "John");
+
+            ps.addBatch();
+
+            // Second entry: the expected update count for it is 0 (see the last assertion).
+            ps.setString(1, "Harry");
+
+            ps.addBatch();
+
+            int[] res = ps.executeBatch();
+
+            assertFalse(jcache(0).containsKey("p1"));
+            assertTrue(jcache(0).containsKeys(new HashSet<Object>(Arrays.asList("p2", "p3"))));
+            assertTrue(Arrays.equals(new int[] {1, 0}, res));
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
index 0e7539f..407d6e2 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.jdbc2;
+import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -26,6 +27,7 @@ import java.util.HashSet;
import java.util.concurrent.Callable;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
/**
@@ -174,4 +176,161 @@ public class JdbcInsertStatementSelfTest extends JdbcAbstractDmlStatementSelfTes
assertEquals(3, jcache(0).withKeepBinary().getAll(new HashSet<>(Arrays.asList("p1", "p2", "p3"))).size());
}
+
+ /**
+ * @throws SQLException if failed.
+ */
+ public void testBatch() throws SQLException {
+ formBatch(1, 2);
+ formBatch(3, 4);
+
+ int[] res = prepStmt.executeBatch();
+
+ assertTrue(Arrays.equals(new int[] {2, 2}, res));
+ }
+
+ /**
+ * @throws SQLException if failed.
+ */
+ public void testSingleItemBatch() throws SQLException {
+ formBatch(1, 2);
+
+ int[] res = prepStmt.executeBatch();
+
+ assertTrue(Arrays.equals(new int[] {2}, res));
+ }
+
+ /**
+ * @throws SQLException if failed.
+ */
+ public void testSingleItemBatchError() throws SQLException {
+ formBatch(1, 2);
+
+ prepStmt.executeBatch();
+
+ formBatch(1, 2); // Duplicate key
+
+ BatchUpdateException reason = (BatchUpdateException)
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return prepStmt.executeBatch();
+ }
+ },
+ BatchUpdateException.class,
+ "Failed to INSERT some keys because they are already in cache");
+
+ // Check update counts in the exception.
+ assertTrue(F.isEmpty(reason.getUpdateCounts()));
+ }
+
+    /**
+     * Executes a batch whose second entry repeats a key from the first one and checks that the
+     * resulting {@link BatchUpdateException} reports the update count of the first entry only.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testErrorAmidstBatch() throws SQLException {
+        formBatch(1, 2);
+
+        // Id 1 was already used by the first entry: duplicate key.
+        formBatch(3, 1);
+
+        Callable<Object> execBatch = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return prepStmt.executeBatch();
+            }
+        };
+
+        BatchUpdateException reason = (BatchUpdateException)GridTestUtils.assertThrows(
+            log,
+            execBatch,
+            BatchUpdateException.class,
+            "Failed to INSERT some keys because they are already in cache");
+
+        // Only the entry that preceded the failure must be reflected in the counts.
+        int[] counts = reason.getUpdateCounts();
+
+        assertNotNull(counts);
+
+        assertEquals(1, counts.length);
+        assertEquals(2, counts[0]);
+    }
+
+    /**
+     * Checks that executing an empty batch fails, both before anything was added and after
+     * the batch was explicitly cleared.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClearBatch() throws Exception {
+        Callable<Object> execBatch = new Callable<Object>() {
+            @Override public Object call() throws SQLException {
+                return prepStmt.executeBatch();
+            }
+        };
+
+        // Nothing has been added yet.
+        GridTestUtils.assertThrows(log, execBatch, SQLException.class, "Batch is empty");
+
+        formBatch(1, 2);
+
+        prepStmt.clearBatch();
+
+        // The entry added above must be gone after clearBatch().
+        GridTestUtils.assertThrows(log, execBatch, SQLException.class, "Batch is empty");
+    }
+
+    /**
+     * Binds two persons to the prepared statement (six parameters each: key, id, first name,
+     * last name, age, data) and adds the bound set to the batch.
+     *
+     * @param id1 Id for first row, must be in range 1..4.
+     * @param id2 Id for second row, must be in range 1..4.
+     * @throws SQLException If failed.
+     * @throws IllegalArgumentException If an id outside of 1..4 is passed.
+     */
+    private void formBatch(int id1, int id2) throws SQLException {
+        int arg = 0;
+
+        for (int id : new int[] {id1, id2}) {
+            String firstName;
+            String lastName;
+            int age;
+
+            switch (id) {
+                case 1:
+                    firstName = "John";
+                    lastName = "White";
+                    age = 25;
+
+                    break;
+
+                case 2:
+                    firstName = "Joe";
+                    lastName = "Black";
+                    age = 35;
+
+                    break;
+
+                case 3:
+                    firstName = "Mike";
+                    lastName = "Green";
+                    age = 40;
+
+                    break;
+
+                case 4:
+                    firstName = "Leah";
+                    lastName = "Grey";
+                    age = 22;
+
+                    break;
+
+                default:
+                    // Fail loudly even when JVM assertions are disabled; a bare "assert false"
+                    // would silently leave the parameters unbound in that case.
+                    throw new IllegalArgumentException("Unsupported person id: " + id);
+            }
+
+            prepStmt.setString(arg + 1, "p" + id);
+            prepStmt.setInt(arg + 2, id);
+            prepStmt.setString(arg + 3, firstName);
+            prepStmt.setString(arg + 4, lastName);
+            prepStmt.setInt(arg + 5, age);
+            prepStmt.setBytes(arg + 6, getBytes(lastName));
+
+            // Each person occupies six consecutive parameter slots.
+            arg += 6;
+        }
+
+        prepStmt.addBatch();
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
index 1432a78..489bacd 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
@@ -21,6 +21,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
import org.apache.ignite.cache.CachePeekMode;
/**
@@ -143,4 +144,44 @@ public class JdbcMergeStatementSelfTest extends JdbcAbstractDmlStatementSelfTest
assertEquals(false, res);
}
+
+ /**
+ * @throws SQLException if failed.
+ */
+ public void testBatch() throws SQLException {
+ prepStmt.setString(1, "p1");
+ prepStmt.setInt(2, 1);
+ prepStmt.setString(3, "John");
+ prepStmt.setString(4, "White");
+ prepStmt.setInt(5, 25);
+ prepStmt.setBytes(6, getBytes("White"));
+
+ prepStmt.setString(7, "p2");
+ prepStmt.setInt(8, 2);
+ prepStmt.setString(9, "Joe");
+ prepStmt.setString(10, "Black");
+ prepStmt.setInt(11, 35);
+ prepStmt.setBytes(12, getBytes("Black"));
+ prepStmt.addBatch();
+
+ prepStmt.setString(1, "p3");
+ prepStmt.setInt(2, 3);
+ prepStmt.setString(3, "Mike");
+ prepStmt.setString(4, "Green");
+ prepStmt.setInt(5, 40);
+ prepStmt.setBytes(6, getBytes("Green"));
+
+ prepStmt.setString(7, "p4");
+ prepStmt.setInt(8, 4);
+ prepStmt.setString(9, "Leah");
+ prepStmt.setString(10, "Grey");
+ prepStmt.setInt(11, 22);
+ prepStmt.setBytes(12, getBytes("Grey"));
+
+ prepStmt.addBatch();
+
+ int[] res = prepStmt.executeBatch();
+
+ assertTrue(Arrays.equals(new int[] {2, 2}, res));
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java
new file mode 100644
index 0000000..c9169b9
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.BatchUpdateException;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Tests batched execution of plain (non-prepared) JDBC statements.
+ */
+public class JdbcStatementBatchingSelfTest extends JdbcAbstractDmlStatementSelfTest {
+    /** INSERT command shared by the tests below. */
+    private static final String INSERT_P1_SQL = "INSERT INTO Person(_key, id, firstName, lastName, age, data) " +
+        "VALUES ('p1', 0, 'J', 'W', 250, RAWTOHEX('W'))";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Every test starts from an empty cache.
+        jcache(0).clear();
+    }
+
+    /**
+     * Checks that the database metadata advertises batch update support.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testDatabaseMetadataBatchSupportFlag() throws SQLException {
+        DatabaseMetaData meta = conn.getMetaData();
+
+        assertNotNull(meta);
+
+        assertTrue(meta.supportsBatchUpdates());
+    }
+
+    /**
+     * Executes a batch mixing INSERT, MERGE, UPDATE and DELETE commands and checks the update
+     * count reported for each of them.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatch() throws SQLException {
+        // Expected counts, in the order the commands are added to the batch.
+        int[] expCnts = {1, 3, 1, 1};
+
+        try (Statement stmt = conn.createStatement()) {
+            stmt.addBatch(INSERT_P1_SQL);
+
+            stmt.addBatch("MERGE INTO Person(_key, id, firstName, lastName, age, data) VALUES " +
+                "('p1', 1, 'John', 'White', 25, RAWTOHEX('White')), " +
+                "('p2', 2, 'Joe', 'Black', 35, RAWTOHEX('Black')), " +
+                "('p3', 0, 'M', 'G', 4, RAWTOHEX('G'))");
+
+            stmt.addBatch("UPDATE Person SET id = 3, firstName = 'Mike', lastName = 'Green', " +
+                "age = 40, data = RAWTOHEX('Green') WHERE _key = 'p3'");
+
+            stmt.addBatch("DELETE FROM Person WHERE _key = 'p1'");
+
+            int[] res = stmt.executeBatch();
+
+            assertEquals(expCnts.length, res.length);
+
+            for (int i = 0; i < expCnts.length; i++)
+                assertEquals(expCnts[i], res[i]);
+        }
+    }
+
+    /**
+     * Executes a batch whose last command is a SELECT and checks that the resulting
+     * {@link BatchUpdateException} carries the update counts of the two commands preceding it.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testErrorAmidstBatch() throws SQLException {
+        Callable<Object> failingBatch = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Statement stmt = conn.createStatement()) {
+                    stmt.addBatch(INSERT_P1_SQL);
+
+                    stmt.addBatch("UPDATE Person SET id = 3, firstName = 'Mike', lastName = 'Green', " +
+                        "age = 40, data = RAWTOHEX('Green') WHERE _key = 'p3'");
+
+                    // Not an update command: expected to break the batch.
+                    stmt.addBatch("SELECT id FROM Person WHERE _key = 'p1'");
+
+                    return stmt.executeBatch();
+                }
+            }
+        };
+
+        BatchUpdateException reason = (BatchUpdateException)GridTestUtils.assertThrows(
+            log,
+            failingBatch,
+            BatchUpdateException.class,
+            "Given statement type does not match that declared by JDBC driver");
+
+        // Check update counts in the exception.
+        int[] counts = reason.getUpdateCounts();
+
+        assertEquals(2, counts.length);
+        assertEquals(1, counts[0]);
+        assertEquals(0, counts[1]);
+    }
+
+    /**
+     * Checks that executing an empty batch fails, both before anything was added and after
+     * the batch was explicitly cleared.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClearBatch() throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            Callable<Object> execBatch = new Callable<Object>() {
+                @Override public Object call() throws SQLException {
+                    return stmt.executeBatch();
+                }
+            };
+
+            // Nothing has been added yet.
+            GridTestUtils.assertThrows(log, execBatch, SQLException.class, "Batch is empty");
+
+            stmt.addBatch(INSERT_P1_SQL);
+
+            stmt.clearBatch();
+
+            // The command added above must be gone after clearBatch().
+            GridTestUtils.assertThrows(log, execBatch, SQLException.class, "Batch is empty");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
index 8ae0e90..07b5587 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.jdbc2;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -47,4 +48,27 @@ public class JdbcUpdateStatementSelfTest extends JdbcAbstractUpdateStatementSelf
assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")),
jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll());
}
+
+    /**
+     * Runs a two-entry parameter batch on a prepared UPDATE and checks both the resulting last
+     * names and the update count reported for each batch entry.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatch() throws SQLException {
+        // try-with-resources: the statement is closed even when an assertion below fails.
+        try (PreparedStatement ps = conn.prepareStatement("update Person set lastName = concat(firstName, 'son') " +
+            "where firstName = ?")) {
+            ps.setString(1, "John");
+
+            ps.addBatch();
+
+            // Second entry: the expected update count for it is 0 (see the last assertion).
+            ps.setString(1, "Harry");
+
+            ps.addBatch();
+
+            int[] res = ps.executeBatch();
+
+            assertEquals(Arrays.asList(F.asList("Johnson"), F.asList("Black"), F.asList("Green")),
+                jcache(0).query(new SqlFieldsQuery("select lastName from Person order by _key")).getAll());
+
+            assertTrue(Arrays.equals(new int[] {1, 0}, res));
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index cf7ee8f..a20002b 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -93,9 +93,12 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDefaultNoOpCacheTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcMergeStatementSelfTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerMergeStatementSelfTest.class));
+ suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcUpdateStatementSelfTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcInsertStatementSelfTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerInsertStatementSelfTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDeleteStatementSelfTest.class));
+ suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStatementBatchingSelfTest.class));
+
suite.addTest(new TestSuite(JdbcBlobTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
new file mode 100644
index 0000000..7b4846c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.BatchUpdateException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import static java.sql.Statement.SUCCESS_NO_INFO;
+
+/**
+ * Task for SQL batched update statements execution through {@link IgniteJdbcDriver}.
+ * <p>
+ * Works in one of two modes: either a list of independent SQL commands is executed one by one
+ * (statement batching), or a single SQL command is executed once per set of arguments
+ * (parameter batching).
+ */
+class JdbcBatchUpdateTask implements IgniteCallable<int[]> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Schema name. */
+    private final String schemaName;
+
+    /** SQL command for argument batching. */
+    private final String sql;
+
+    /** Batch of statements. */
+    private final List<String> sqlBatch;
+
+    /** Batch of arguments. */
+    private final List<List<Object>> batchArgs;
+
+    /** Fetch size. */
+    private final int fetchSize;
+
+    /** Local execution flag. */
+    private final boolean loc;
+
+    /** Local query flag. */
+    private final boolean locQry;
+
+    /** Collocated query flag. */
+    private final boolean collocatedQry;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /**
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     * @param schemaName Schema name.
+     * @param sql SQL query. {@code null} in case of statement batching.
+     * @param sqlBatch Batch of SQL statements. {@code null} in case of parameter batching.
+     * @param batchArgs Batch of SQL parameters. {@code null} in case of statement batching.
+     * @param loc Local execution flag.
+     * @param fetchSize Fetch size.
+     * @param locQry Local query flag.
+     * @param collocatedQry Collocated query flag.
+     * @param distributedJoins Distributed joins flag.
+     */
+    public JdbcBatchUpdateTask(Ignite ignite, String cacheName, String schemaName, String sql,
+        List<String> sqlBatch, List<List<Object>> batchArgs, boolean loc, int fetchSize,
+        boolean locQry, boolean collocatedQry, boolean distributedJoins) {
+        this.ignite = ignite;
+        this.cacheName = cacheName;
+        this.schemaName = schemaName;
+        this.sql = sql;
+        this.sqlBatch = sqlBatch;
+        this.batchArgs = batchArgs;
+        this.fetchSize = fetchSize;
+        this.loc = loc;
+        this.locQry = locQry;
+        this.collocatedQry = collocatedQry;
+        this.distributedJoins = distributedJoins;
+
+        // Exactly one of the two batching modes must be requested.
+        assert (!F.isEmpty(sql) && !F.isEmpty(batchArgs)) ^ !F.isEmpty(sqlBatch);
+    }
+
+    /**
+     * Executes the batch entry by entry.
+     *
+     * @return Update counters, one per batch entry.
+     * @throws Exception If the cache cannot be resolved ({@link SQLException}) or an entry fails
+     *      ({@link BatchUpdateException} holding the counters of the entries executed before it).
+     */
+    @Override public int[] call() throws Exception {
+        IgniteCache<?, ?> cache = ignite.cache(cacheName);
+
+        // Don't create caches on server nodes in order to avoid data rebalancing.
+        boolean start = ignite.configuration().isClientMode();
+
+        if (cache == null && cacheName == null)
+            cache = ((IgniteKernal)ignite).context().cache().getOrStartPublicCache(start, !loc && locQry);
+
+        if (cache == null) {
+            if (cacheName == null)
+                throw new SQLException("Failed to execute query. No suitable caches found.");
+            else
+                throw new SQLException("Cache not found [cacheName=" + cacheName + ']');
+        }
+
+        // No common SQL text means that a list of independent commands was submitted.
+        boolean stmtBatch = F.isEmpty(sql);
+
+        int batchSize = stmtBatch ? sqlBatch.size() : batchArgs.size();
+
+        int[] updCntrs = new int[batchSize];
+
+        // Declared outside of the loop: on failure it tells how many entries succeeded.
+        int idx = 0;
+
+        try {
+            for (; idx < batchSize; idx++) {
+                updCntrs[idx] = stmtBatch
+                    ? doSingleUpdate(cache, sqlBatch.get(idx), null)
+                    : doSingleUpdate(cache, sql, batchArgs.get(idx));
+            }
+        }
+        catch (Exception ex) {
+            // Report only the counters of the entries that were executed before the failure.
+            throw new BatchUpdateException(Arrays.copyOf(updCntrs, idx), ex);
+        }
+
+        return updCntrs;
+    }
+
+    /**
+     * Performs update.
+     *
+     * @param cache Cache.
+     * @param sqlText SQL text.
+     * @param args Parameters, may be {@code null}.
+     * @return Update counter: {@link java.sql.Statement#SUCCESS_NO_INFO} if the query returned no rows,
+     *      {@link Integer#MAX_VALUE} if the actual counter exceeds integer range.
+     * @throws SQLException If the command produced a result set or an unexpected update result.
+     */
+    private int doSingleUpdate(IgniteCache<?, ?> cache, String sqlText, List<Object> args) throws SQLException {
+        SqlFieldsQuery qry = new JdbcSqlFieldsQuery(sqlText, false);
+
+        qry.setPageSize(fetchSize);
+        qry.setLocal(locQry);
+        qry.setCollocated(collocatedQry);
+        qry.setDistributedJoins(distributedJoins);
+        qry.setSchema(schemaName);
+        qry.setArgs(args == null ? null : args.toArray());
+
+        QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.withKeepBinary().query(qry);
+
+        if (qryCursor.isQuery())
+            throw new SQLException(getError("Query produced result set", qry));
+
+        List<List<?>> rows = qryCursor.getAll();
+
+        if (F.isEmpty(rows))
+            return SUCCESS_NO_INFO;
+
+        // The update result is expected as a single row with a single Long column.
+        if (rows.size() != 1)
+            throw new SQLException(getError("Expected single row for update operation result", qry));
+
+        List<?> row = rows.get(0);
+
+        if (F.isEmpty(row) || row.size() != 1)
+            throw new SQLException(getError("Expected row size of 1 for update operation", qry));
+
+        Object objRes = row.get(0);
+
+        if (!(objRes instanceof Long))
+            throw new SQLException(getError("Unexpected update result type", qry));
+
+        long updCnt = (Long)objRes;
+
+        // JDBC batch counters are ints: clamp instead of overflowing.
+        if (updCnt > Integer.MAX_VALUE) {
+            IgniteLogger log = ignite.log();
+
+            if (log != null)
+                log.warning(getError("Query updated row counter (" + updCnt + ") exceeds integer range", qry));
+
+            return Integer.MAX_VALUE;
+        }
+
+        return (int)updCnt;
+    }
+
+    /**
+     * Formats error message with query details.
+     *
+     * @param msg Error message.
+     * @param qry Query.
+     * @return Result.
+     */
+    private String getError(String msg, SqlFieldsQuery qry) {
+        return msg + " [qry='" + qry.getSql() + "', params=" + Arrays.deepToString(qry.getArgs()) + ']';
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
index 98a2563..b369b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
@@ -1063,7 +1063,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
/** {@inheritDoc} */
@Override public boolean supportsBatchUpdates() throws SQLException {
- return false;
+ return true;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
index 16030f7..38dfe02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
@@ -39,6 +39,7 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.List;
/**
* JDBC prepared statement implementation.
@@ -50,6 +51,9 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
/** H2's parsed statement to retrieve metadata from. */
PreparedStatement nativeStatement;
+ /** Batch arguments. */
+ private List<List<Object>> batchArgs;
+
/**
* Creates new prepared statement.
*
@@ -66,7 +70,8 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
@Override public void addBatch(String sql) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement.");
+ throw new SQLFeatureNotSupportedException("Adding new SQL command to batch is not supported for prepared " +
+ "statement (use addBatch() to add new set of arguments)");
}
/** {@inheritDoc} */
@@ -185,7 +190,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
@Override public void clearBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+ batchArgs = null;
}
/** {@inheritDoc} */
@@ -207,14 +212,26 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
@Override public void addBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+ if (batchArgs == null)
+ batchArgs = new ArrayList<>();
+
+ batchArgs.add(args);
+
+ args = null;
}
/** {@inheritDoc} */
@Override public int[] executeBatch() throws SQLException {
- throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+ ensureNotClosed();
+
+ List<List<Object>> batchArgs = this.batchArgs;
+
+ this.batchArgs = null;
+
+ return doBatchUpdate(sql, null, batchArgs);
}
+
/** {@inheritDoc} */
@Override public void setCharacterStream(int paramIdx, Reader x, int len) throws SQLException {
ensureNotClosed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index 89a80ca..19c20a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -74,7 +74,7 @@ public class JdbcStatement implements Statement {
/** Current updated items count. */
long updateCnt = -1;
- /** Batch statements. */
+ /** Batch of statements. */
private List<String> batch;
/**
@@ -187,7 +187,7 @@ public class JdbcStatement implements Statement {
/**
* @param rows query result.
- * @return update counter, if found
+ * @return update counter, if found.
* @throws SQLException if getting an update counter from result proved to be impossible.
*/
private static long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
@@ -461,7 +461,60 @@ public class JdbcStatement implements Statement {
@Override public int[] executeBatch() throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+ List<String> batch = this.batch;
+
+ this.batch = null;
+
+ return doBatchUpdate(null, batch, null);
+ }
+
+    /**
+     * Runs batch of update commands: either a list of SQL commands, or a single SQL command
+     * together with a list of argument sets.
+     *
+     * @param command SQL command (used together with {@code batchArgs}).
+     * @param batch Batch of SQL commands.
+     * @param batchArgs Batch of SQL parameters.
+     * @return Update counts returned by the batch task.
+     * @throws SQLException If failed.
+     */
+    protected int[] doBatchUpdate(String command, List<String> batch, List<List<Object>> batchArgs)
+        throws SQLException {
+        // Drop the state left by a previous execution.
+        rs = null;
+        updateCnt = -1;
+
+        boolean hasArgBatch = !F.isEmpty(command) && !F.isEmpty(batchArgs);
+
+        if (!hasArgBatch && F.isEmpty(batch))
+            throw new SQLException("Batch is empty.");
+
+        Ignite ignite = conn.ignite();
+
+        UUID nodeId = conn.nodeId();
+
+        // No target node id: the task is called directly instead of being sent through compute.
+        boolean loc = nodeId == null;
+
+        if (!conn.isDmlSupported())
+            throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
+
+        JdbcBatchUpdateTask task = new JdbcBatchUpdateTask(
+            loc ? ignite : null,
+            conn.cacheName(),
+            conn.schemaName(),
+            command,
+            batch,
+            batchArgs,
+            loc,
+            getFetchSize(),
+            conn.isLocalQuery(),
+            conn.isCollocatedQuery(),
+            conn.isDistributedJoins());
+
+        try {
+            int[] res;
+
+            if (loc)
+                res = task.call();
+            else
+                res = ignite.compute(ignite.cluster().forNodeId(nodeId)).call(task);
+
+            // The statement-level update count mirrors the last entry of the batch.
+            if (F.isEmpty(res))
+                updateCnt = -1;
+            else
+                updateCnt = res[res.length - 1];
+
+            return res;
+        }
+        catch (IgniteSQLException e) {
+            throw e.toJdbcException();
+        }
+        catch (SQLException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new SQLException("Failed to query Ignite.", e);
+        }
}
/** {@inheritDoc} */