You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/17 16:51:54 UTC

[07/19] ignite git commit: IGNITE-5738: JDBC: add batch support. This closes #2393.

IGNITE-5738: JDBC: add batch support. This closes #2393.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7781823d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7781823d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7781823d

Branch: refs/heads/ignite-5901
Commit: 7781823d4552bb30efa48758b7473d07c9e8aee3
Parents: 9cfb050
Author: Sergey Kalashnikov <sk...@gridgain.com>
Authored: Thu Aug 17 14:58:38 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Aug 17 14:58:38 2017 +0300

----------------------------------------------------------------------
 .../jdbc2/JdbcDeleteStatementSelfTest.java      |  22 ++
 .../jdbc2/JdbcInsertStatementSelfTest.java      | 159 ++++++++++++++
 .../jdbc2/JdbcMergeStatementSelfTest.java       |  41 ++++
 .../jdbc2/JdbcStatementBatchingSelfTest.java    | 133 ++++++++++++
 .../jdbc2/JdbcUpdateStatementSelfTest.java      |  24 +++
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   3 +
 .../internal/jdbc2/JdbcBatchUpdateTask.java     | 215 +++++++++++++++++++
 .../internal/jdbc2/JdbcDatabaseMetadata.java    |   2 +-
 .../internal/jdbc2/JdbcPreparedStatement.java   |  25 ++-
 .../ignite/internal/jdbc2/JdbcStatement.java    |  59 ++++-
 10 files changed, 675 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
index d55c979..3eec5a0 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.jdbc2;
 
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -46,4 +47,25 @@ public class JdbcDeleteStatementSelfTest extends JdbcAbstractUpdateStatementSelf
         assertFalse(jcache(0).containsKey("p2"));
         assertTrue(jcache(0).containsKeys(new HashSet<Object>(Arrays.asList("p1", "p3"))));
     }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatch() throws SQLException {
+        PreparedStatement ps = conn.prepareStatement("delete from Person where firstName = ?");
+
+        ps.setString(1, "John");
+
+        ps.addBatch(); // First batch entry: firstName = 'John'.
+
+        ps.setString(1, "Harry");
+
+        ps.addBatch(); // Second batch entry: firstName = 'Harry'.
+
+        int[] res = ps.executeBatch();
+
+        assertFalse(jcache(0).containsKey("p1"));
+        assertTrue(jcache(0).containsKeys(new HashSet<Object>(Arrays.asList("p2", "p3"))));
+        assertTrue(Arrays.equals(new int[] {1, 0}, res)); // Per-entry update counts.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
index 0e7539f..407d6e2 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.jdbc2;
 
+import java.sql.BatchUpdateException;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -26,6 +27,7 @@ import java.util.HashSet;
 import java.util.concurrent.Callable;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -174,4 +176,161 @@ public class JdbcInsertStatementSelfTest extends JdbcAbstractDmlStatementSelfTes
 
         assertEquals(3, jcache(0).withKeepBinary().getAll(new HashSet<>(Arrays.asList("p1", "p2", "p3"))).size());
     }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testBatch() throws SQLException {
+        formBatch(1, 2); // First batch entry: rows p1 and p2.
+        formBatch(3, 4); // Second batch entry: rows p3 and p4.
+
+        int[] res = prepStmt.executeBatch();
+
+        assertTrue(Arrays.equals(new int[] {2, 2}, res)); // Update count of 2 for each entry.
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testSingleItemBatch() throws SQLException {
+        formBatch(1, 2); // The only batch entry: rows p1 and p2.
+
+        int[] res = prepStmt.executeBatch();
+
+        assertTrue(Arrays.equals(new int[] {2}, res)); // One entry, update count of 2.
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testSingleItemBatchError() throws SQLException {
+        formBatch(1, 2);
+
+        prepStmt.executeBatch();
+
+        formBatch(1, 2); // Same keys (p1, p2) as the batch executed above.
+
+        BatchUpdateException reason = (BatchUpdateException)
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return prepStmt.executeBatch();
+                }
+            },
+            BatchUpdateException.class,
+            "Failed to INSERT some keys because they are already in cache");
+
+        // The only batch entry failed, so the exception must carry no update counts.
+        assertTrue(F.isEmpty(reason.getUpdateCounts()));
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testErrorAmidstBatch() throws SQLException {
+        formBatch(1, 2);
+        formBatch(3, 1); // Key p1 repeats a key from the first batch entry.
+
+        BatchUpdateException reason = (BatchUpdateException)
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return prepStmt.executeBatch();
+                }
+            },
+            BatchUpdateException.class,
+            "Failed to INSERT some keys because they are already in cache");
+
+        // Only the first entry is expected to complete: a single update count equal to 2.
+        int[] counts = reason.getUpdateCounts();
+
+        assertNotNull(counts);
+
+        assertEquals(1, counts.length);
+        assertEquals(2, counts[0]);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClearBatch() throws Exception {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws SQLException {
+                return prepStmt.executeBatch(); // Nothing has been added to the batch yet.
+            }
+        }, SQLException.class, "Batch is empty");
+
+        formBatch(1, 2);
+
+        prepStmt.clearBatch(); // Discards the entry added by formBatch() above.
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws SQLException {
+                return prepStmt.executeBatch(); // The batch must be empty again after clearBatch().
+            }
+        }, SQLException.class, "Batch is empty");
+    }
+
+    /**
+     * Sets arguments for two rows (six parameters each) on {@code prepStmt} and adds them as one batch entry.
+     *
+     * @param id1 Id (1 to 4) of the predefined person used for the first row.
+     * @param id2 Id (1 to 4) of the predefined person used for the second row.
+     * @throws SQLException if failed.
+     */
+    private void formBatch(int id1, int id2) throws SQLException {
+        int[] ids = new int[] { id1, id2 };
+
+        int arg = 0; // Parameter offset of the current row.
+        for (int id: ids) {
+            String key = "p" + id;
+
+            switch (id) {
+                case 1:
+                    prepStmt.setString(arg + 1, key);
+                    prepStmt.setInt(arg + 2, 1);
+                    prepStmt.setString(arg + 3, "John");
+                    prepStmt.setString(arg + 4, "White");
+                    prepStmt.setInt(arg + 5, 25);
+                    prepStmt.setBytes(arg + 6, getBytes("White"));
+
+                    break;
+
+                case 2:
+                    prepStmt.setString(arg + 1, key);
+                    prepStmt.setInt(arg + 2, 2);
+                    prepStmt.setString(arg + 3, "Joe");
+                    prepStmt.setString(arg + 4, "Black");
+                    prepStmt.setInt(arg + 5, 35);
+                    prepStmt.setBytes(arg + 6, getBytes("Black"));
+
+                    break;
+
+                case 3:
+                    prepStmt.setString(arg + 1, key);
+                    prepStmt.setInt(arg + 2, 3);
+                    prepStmt.setString(arg + 3, "Mike");
+                    prepStmt.setString(arg + 4, "Green");
+                    prepStmt.setInt(arg + 5, 40);
+                    prepStmt.setBytes(arg + 6, getBytes("Green"));
+
+                    break;
+
+                case 4:
+                    prepStmt.setString(arg + 1, key);
+                    prepStmt.setInt(arg + 2, 4);
+                    prepStmt.setString(arg + 3, "Leah");
+                    prepStmt.setString(arg + 4, "Grey");
+                    prepStmt.setInt(arg + 5, 22);
+                    prepStmt.setBytes(arg + 6, getBytes("Grey"));
+
+                    break;
+
+                default:
+                    assert false; // Only ids 1 to 4 have predefined data.
+            }
+
+            arg += 6; // Each row occupies six consecutive parameters.
+        }
+
+        prepStmt.addBatch();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
index 1432a78..489bacd 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
@@ -21,6 +21,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
 import org.apache.ignite.cache.CachePeekMode;
 
 /**
@@ -143,4 +144,44 @@ public class JdbcMergeStatementSelfTest extends JdbcAbstractDmlStatementSelfTest
 
         assertEquals(false, res);
     }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testBatch() throws SQLException {
+        prepStmt.setString(1, "p1");
+        prepStmt.setInt(2, 1);
+        prepStmt.setString(3, "John");
+        prepStmt.setString(4, "White");
+        prepStmt.setInt(5, 25);
+        prepStmt.setBytes(6, getBytes("White"));
+
+        prepStmt.setString(7, "p2");
+        prepStmt.setInt(8, 2);
+        prepStmt.setString(9, "Joe");
+        prepStmt.setString(10, "Black");
+        prepStmt.setInt(11, 35);
+        prepStmt.setBytes(12, getBytes("Black"));
+        prepStmt.addBatch(); // First batch entry: rows p1 and p2.
+
+        prepStmt.setString(1, "p3");
+        prepStmt.setInt(2, 3);
+        prepStmt.setString(3, "Mike");
+        prepStmt.setString(4, "Green");
+        prepStmt.setInt(5, 40);
+        prepStmt.setBytes(6, getBytes("Green"));
+
+        prepStmt.setString(7, "p4");
+        prepStmt.setInt(8, 4);
+        prepStmt.setString(9, "Leah");
+        prepStmt.setString(10, "Grey");
+        prepStmt.setInt(11, 22);
+        prepStmt.setBytes(12, getBytes("Grey"));
+
+        prepStmt.addBatch(); // Second batch entry: rows p3 and p4.
+
+        int[] res = prepStmt.executeBatch();
+
+        assertTrue(Arrays.equals(new int[] {2, 2}, res)); // Update count of 2 for each entry.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java
new file mode 100644
index 0000000..c9169b9
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.BatchUpdateException;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Tests batched execution of plain SQL statements added via {@link Statement#addBatch(String)}.
+ */
+public class JdbcStatementBatchingSelfTest extends JdbcAbstractDmlStatementSelfTest {
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        jcache(0).clear(); // Clear the cache before every test.
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testDatabaseMetadataBatchSupportFlag() throws SQLException {
+        DatabaseMetaData meta = conn.getMetaData();
+
+        assertNotNull(meta);
+
+        assertTrue(meta.supportsBatchUpdates()); // The driver must advertise batch support.
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatch() throws SQLException {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.addBatch("INSERT INTO Person(_key, id, firstName, lastName, age, data) " +
+                "VALUES ('p1', 0, 'J', 'W', 250, RAWTOHEX('W'))");
+
+            stmt.addBatch("MERGE INTO Person(_key, id, firstName, lastName, age, data) VALUES " +
+                "('p1', 1, 'John', 'White', 25, RAWTOHEX('White')), " +
+                "('p2', 2, 'Joe', 'Black', 35, RAWTOHEX('Black')), " +
+                "('p3', 0, 'M', 'G', 4, RAWTOHEX('G'))");
+
+            stmt.addBatch("UPDATE Person SET id = 3, firstName = 'Mike', lastName = 'Green', " +
+                "age = 40, data = RAWTOHEX('Green') WHERE _key = 'p3'");
+
+            stmt.addBatch("DELETE FROM Person WHERE _key = 'p1'");
+
+            int[] res = stmt.executeBatch();
+
+            assertEquals(4, res.length);
+            assertEquals(1, res[0]); // INSERT of p1.
+            assertEquals(3, res[1]); // MERGE of p1, p2 and p3.
+            assertEquals(1, res[2]); // UPDATE of p3.
+            assertEquals(1, res[3]); // DELETE of p1.
+        }
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testErrorAmidstBatch() throws SQLException {
+        BatchUpdateException reason = (BatchUpdateException)
+            GridTestUtils.assertThrows(log,
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try (Statement stmt = conn.createStatement()) {
+                        stmt.addBatch("INSERT INTO Person(_key, id, firstName, lastName, age, data) " +
+                            "VALUES ('p1', 0, 'J', 'W', 250, RAWTOHEX('W'))");
+
+                        stmt.addBatch("UPDATE Person SET id = 3, firstName = 'Mike', lastName = 'Green', " +
+                            "age = 40, data = RAWTOHEX('Green') WHERE _key = 'p3'");
+
+                        stmt.addBatch("SELECT id FROM Person WHERE _key = 'p1'");
+
+                        return stmt.executeBatch();
+                    }
+                }
+            },
+            BatchUpdateException.class,
+            "Given statement type does not match that declared by JDBC driver");
+
+        // INSERT and UPDATE ran before the failing SELECT: expect exactly their counts, {1, 0}.
+        int[] counts = reason.getUpdateCounts();
+
+        assertEquals(2, counts.length);
+        assertEquals(1, counts[0]);
+        assertEquals(0, counts[1]);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClearBatch() throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws SQLException {
+                    return stmt.executeBatch(); // Nothing has been added to the batch yet.
+                }
+            }, SQLException.class, "Batch is empty");
+
+            stmt.addBatch("INSERT INTO Person(_key, id, firstName, lastName, age, data) " +
+                "VALUES ('p1', 0, 'J', 'W', 250, RAWTOHEX('W'))");
+
+            stmt.clearBatch(); // Discards the statement added above.
+
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws SQLException {
+                    return stmt.executeBatch(); // The batch must be empty again after clearBatch().
+                }
+            }, SQLException.class, "Batch is empty");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
index 8ae0e90..07b5587 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.jdbc2;
 
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -47,4 +48,27 @@ public class JdbcUpdateStatementSelfTest extends JdbcAbstractUpdateStatementSelf
         assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")),
                 jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll());
     }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatch() throws SQLException {
+        PreparedStatement ps = conn.prepareStatement("update Person set lastName = concat(firstName, 'son') " +
+            "where firstName = ?");
+
+        ps.setString(1, "John");
+
+        ps.addBatch(); // First batch entry: firstName = 'John'.
+
+        ps.setString(1, "Harry");
+
+        ps.addBatch(); // Second batch entry: firstName = 'Harry'.
+
+        int[] res = ps.executeBatch();
+
+        assertEquals(Arrays.asList(F.asList("Johnson"), F.asList("Black"), F.asList("Green")),
+            jcache(0).query(new SqlFieldsQuery("select lastName from Person order by _key")).getAll());
+
+        assertTrue(Arrays.equals(new int[] {1, 0}, res)); // Per-entry update counts.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index cf7ee8f..a20002b 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -93,9 +93,12 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDefaultNoOpCacheTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcMergeStatementSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerMergeStatementSelfTest.class));
+        suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcUpdateStatementSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcInsertStatementSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerInsertStatementSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDeleteStatementSelfTest.class));
+        suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStatementBatchingSelfTest.class));
+
         suite.addTest(new TestSuite(JdbcBlobTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
new file mode 100644
index 0000000..7b4846c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.BatchUpdateException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import static java.sql.Statement.SUCCESS_NO_INFO;
+
+/**
+ * Task for SQL batched update statements execution through {@link IgniteJdbcDriver}.
+ */
+class JdbcBatchUpdateTask implements IgniteCallable<int[]> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Schema name. */
+    private final String schemaName;
+
+    /** SQL command for argument batching; empty or {@code null} when {@link #sqlBatch} is used. */
+    private final String sql;
+
+    /** Batch of SQL statements, each executed without arguments. */
+    private final List<String> sqlBatch;
+
+    /** Batch of argument lists, one per execution of {@link #sql}. */
+    private final List<List<Object>> batchArgs;
+
+    /** Fetch size. */
+    private final int fetchSize;
+
+    /** Local execution flag. */
+    private final boolean loc;
+
+    /** Local query flag. */
+    private final boolean locQry;
+
+    /** Collocated query flag. */
+    private final boolean collocatedQry;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /**
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     * @param schemaName Schema name.
+     * @param sql SQL query. {@code null} in case of statement batching.
+     * @param sqlBatch Batch of SQL statements. {@code null} in case of parameter batching.
+     * @param batchArgs Batch of SQL parameters. {@code null} in case of statement batching.
+     * @param loc Local execution flag.
+     * @param fetchSize Fetch size.
+     * @param locQry Local query flag.
+     * @param collocatedQry Collocated query flag.
+     * @param distributedJoins Distributed joins flag.
+     */
+    public JdbcBatchUpdateTask(Ignite ignite, String cacheName, String schemaName, String sql,
+        List<String> sqlBatch, List<List<Object>> batchArgs, boolean loc, int fetchSize,
+        boolean locQry, boolean collocatedQry, boolean distributedJoins) {
+        this.ignite = ignite;
+        this.cacheName = cacheName;
+        this.schemaName = schemaName;
+        this.sql = sql;
+        this.sqlBatch = sqlBatch;
+        this.batchArgs = batchArgs;
+        this.fetchSize = fetchSize;
+        this.loc = loc;
+        this.locQry = locQry;
+        this.collocatedQry = collocatedQry;
+        this.distributedJoins = distributedJoins;
+
+        assert (!F.isEmpty(sql) && !F.isEmpty(batchArgs)) ^ !F.isEmpty(sqlBatch); // Exactly one batching mode.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] call() throws Exception {
+        IgniteCache<?, ?> cache = ignite.cache(cacheName);
+
+        // Don't create caches on server nodes in order to avoid data rebalancing.
+        boolean start = ignite.configuration().isClientMode();
+
+        if (cache == null && cacheName == null)
+            cache = ((IgniteKernal)ignite).context().cache().getOrStartPublicCache(start, !loc && locQry);
+
+        if (cache == null) {
+            if (cacheName == null)
+                throw new SQLException("Failed to execute query. No suitable caches found.");
+            else
+                throw new SQLException("Cache not found [cacheName=" + cacheName + ']');
+        }
+
+        int batchSize = F.isEmpty(sql) ? sqlBatch.size() : batchArgs.size();
+
+        int[] updCntrs = new int[batchSize];
+
+        int idx = 0; // Index of the entry being executed; on failure, the number of completed entries.
+
+        try {
+            if (F.isEmpty(sql)) {
+                for (; idx < batchSize; idx++)
+                    updCntrs[idx] = doSingleUpdate(cache, sqlBatch.get(idx), null);
+            }
+            else {
+                for (; idx < batchSize; idx++)
+                    updCntrs[idx] = doSingleUpdate(cache, sql, batchArgs.get(idx));
+            }
+        }
+        catch (Exception ex) {
+            throw new BatchUpdateException(Arrays.copyOf(updCntrs, idx), ex); // Counters of completed entries.
+        }
+
+        return updCntrs;
+    }
+
+    /**
+     * Executes a single update statement on the given cache and extracts its update counter.
+     *
+     * @param cache Cache to run the query on.
+     * @param sqlText SQL text.
+     * @param args Query parameters, or {@code null} if the statement has none.
+     * @return Update counter; {@code SUCCESS_NO_INFO} for an empty result, capped at {@code Integer.MAX_VALUE}.
+     * @throws SQLException If the statement produced a result set or an unexpected update result.
+     */
+    private Integer doSingleUpdate(IgniteCache<?, ?> cache, String sqlText, List<Object> args) throws SQLException {
+        SqlFieldsQuery qry = new JdbcSqlFieldsQuery(sqlText, false);
+
+        qry.setPageSize(fetchSize);
+        qry.setLocal(locQry);
+        qry.setCollocated(collocatedQry);
+        qry.setDistributedJoins(distributedJoins);
+        qry.setSchema(schemaName);
+        qry.setArgs(args == null ? null : args.toArray());
+
+        QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.withKeepBinary().query(qry);
+
+        if (qryCursor.isQuery()) // Only update statements are allowed in a batch.
+            throw new SQLException(getError("Query produced result set", qry));
+
+        List<List<?>> rows = qryCursor.getAll();
+
+        if (F.isEmpty(rows))
+            return SUCCESS_NO_INFO;
+
+        if (rows.size() != 1)
+            throw new SQLException(getError("Expected single row for update operation result", qry));
+
+        List<?> row = rows.get(0);
+
+        if (F.isEmpty(row) || row.size() != 1)
+            throw new SQLException(getError("Expected row size of 1 for update operation", qry));
+
+        Object objRes = row.get(0);
+
+        if (!(objRes instanceof Long))
+            throw new SQLException(getError("Unexpected update result type", qry));
+
+        Long longRes = (Long)objRes;
+
+        if (longRes > Integer.MAX_VALUE) { // Batch counters are ints: warn and clamp.
+            IgniteLogger log = ignite.log();
+
+            if (log != null)
+                log.warning(getError("Query updated row counter (" + longRes + ") exceeds integer range", qry));
+
+            return Integer.MAX_VALUE;
+        }
+
+        return longRes.intValue();
+    }
+
+    /**
+     * Appends the query text and its parameters to an error message.
+     *
+     * @param msg Error message.
+     * @param qry Query.
+     * @return Message followed by the query SQL and arguments.
+     */
+    private String getError(String msg, SqlFieldsQuery qry) {
+        return msg + " [qry='" + qry.getSql() + "', params=" + Arrays.deepToString(qry.getArgs()) + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
index 98a2563..b369b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java
@@ -1063,7 +1063,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData {
 
     /** {@inheritDoc} */
     @Override public boolean supportsBatchUpdates() throws SQLException {
-        return false;
+        return true;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
index 16030f7..38dfe02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
@@ -39,6 +39,7 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.List;
 
 /**
  * JDBC prepared statement implementation.
@@ -50,6 +51,9 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     /** H2's parsed statement to retrieve metadata from. */
     PreparedStatement nativeStatement;
 
+    /** Batch arguments. */
+    private List<List<Object>> batchArgs;
+
     /**
      * Creates new prepared statement.
      *
@@ -66,7 +70,8 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     @Override public void addBatch(String sql) throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement.");
+        throw new SQLFeatureNotSupportedException("Adding new SQL command to batch is not supported for prepared " +
+            "statement (use addBatch() to add new set of arguments)");
     }
 
     /** {@inheritDoc} */
@@ -185,7 +190,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     @Override public void clearBatch() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+        batchArgs = null; // Discards every argument set queued by addBatch(); the list is re-created lazily.
     }
 
     /** {@inheritDoc} */
@@ -207,14 +212,26 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     @Override public void addBatch() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+        if (batchArgs == null)
+            batchArgs = new ArrayList<>();
+
+        batchArgs.add(args); // No copy is made. NOTE(review): 'args' is null right after a previous addBatch().
+
+        args = null; // Detaches the queued list from this statement's current arguments.
     }
 
     /** {@inheritDoc} */
     @Override public int[] executeBatch() throws SQLException {
-        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+        ensureNotClosed();
+
+        List<List<Object>> batchArgs = this.batchArgs; // Local deliberately shadows the field.
+
+        this.batchArgs = null; // Batch is cleared before execution, so it is gone even if doBatchUpdate() throws.
+
+        return doBatchUpdate(sql, null, batchArgs); // Single SQL command plus queued argument sets; no SQL list.
     }
 
+
     /** {@inheritDoc} */
     @Override public void setCharacterStream(int paramIdx, Reader x, int len) throws SQLException {
         ensureNotClosed();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index 89a80ca..19c20a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -74,7 +74,7 @@ public class JdbcStatement implements Statement {
     /** Current updated items count. */
     long updateCnt = -1;
 
-    /** Batch statements. */
+    /** Batch of statements. */
     private List<String> batch;
 
     /**
@@ -187,7 +187,7 @@ public class JdbcStatement implements Statement {
 
     /**
      * @param rows query result.
-     * @return update counter, if found
+     * @return update counter, if found.
      * @throws SQLException if getting an update counter from result proved to be impossible.
      */
     private static long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
@@ -461,7 +461,60 @@ public class JdbcStatement implements Statement {
     @Override public int[] executeBatch() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+        List<String> batch = this.batch; // Local deliberately shadows the field.
+
+        this.batch = null; // Batch is cleared before execution, so it is gone even if doBatchUpdate() throws.
+
+        return doBatchUpdate(null, batch, null); // List of SQL commands only; no single command or arguments.
+    }
+
+    /**
+     * Runs a batch of update commands locally or, when the connection has a node ID, on that node.
+     *
+     * @param command SQL command of a prepared statement batch ({@code null} for a plain statement batch).
+     * @param batch Batch of SQL commands ({@code null} when {@code command} and {@code batchArgs} are used).
+     * @param batchArgs Batch of SQL parameters, one list per queued {@code addBatch()} call.
+     * @return Array produced by the batch update task (presumably one update count per batch entry).
+     * @throws SQLException If the batch is empty, DML is not supported by the connection or execution failed.
+     */
+    protected int[] doBatchUpdate(String command, List<String> batch, List<List<Object>> batchArgs)
+        throws SQLException {
+        rs = null;
+
+        updateCnt = -1;
+
+        if ((F.isEmpty(command) || F.isEmpty(batchArgs)) && F.isEmpty(batch)) // Neither batch form is usable.
+            throw new SQLException("Batch is empty.");
+
+        Ignite ignite = conn.ignite();
+
+        UUID nodeId = conn.nodeId();
+
+        boolean loc = nodeId == null; // No target node ID: the task is called directly in this JVM.
+
+        if (!conn.isDmlSupported())
+            throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
+
+        JdbcBatchUpdateTask task = new JdbcBatchUpdateTask(loc ? ignite : null, conn.cacheName(),
+            conn.schemaName(), command, batch, batchArgs, loc, getFetchSize(), conn.isLocalQuery(),
+            conn.isCollocatedQuery(), conn.isDistributedJoins());
+
+        try {
+            int[] res = loc ? task.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(task);
+
+            updateCnt = F.isEmpty(res)? -1 : res[res.length - 1]; // Last element becomes the current update count.
+
+            return res;
+        }
+        catch (IgniteSQLException e) {
+            throw e.toJdbcException();
+        }
+        catch (SQLException e) {
+            throw e; // Rethrown as is, so it is not wrapped by the generic handler below.
+        }
+        catch (Exception e) {
+            throw new SQLException("Failed to query Ignite.", e);
+        }
     }
 
     /** {@inheritDoc} */