You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/05/07 15:21:46 UTC

[ignite] 38/41: GG-17385 [IGNITE-11499] SQL: DML internal batch size is 1 by default to prevent deadlock

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 9666cbe16841fa78b1e5d59f8c0d1578cd0060b6
Author: tledkov <tl...@gridgain.com>
AuthorDate: Mon May 6 14:10:02 2019 +0300

    GG-17385 [IGNITE-11499] SQL: DML internal batch size is 1 by default to prevent deadlock
---
 .../jdbc/thin/JdbcThinConnectionSelfTest.java      |  33 +++-
 .../apache/ignite/cache/query/SqlFieldsQuery.java  |  35 ++++
 .../internal/jdbc/thin/ConnectionProperties.java   |  15 ++
 .../jdbc/thin/ConnectionPropertiesImpl.java        |  29 +++-
 .../ignite/internal/jdbc/thin/JdbcThinTcpIo.java   |   8 +-
 .../odbc/jdbc/JdbcConnectionContext.java           |  11 +-
 .../processors/odbc/jdbc/JdbcRequestHandler.java   |   9 +-
 .../internal/processors/odbc/jdbc/JdbcUtils.java   |  24 +++
 .../processors/odbc/odbc/OdbcRequestHandler.java   |   1 +
 .../processors/query/SqlClientContext.java         |  17 +-
 .../processors/query/h2/H2TableDescriptor.java     |   4 +-
 .../processors/query/h2/IgniteH2Indexing.java      |   6 +-
 .../processors/query/h2/QueryParameters.java       |  27 +++-
 .../processors/query/DmlBatchSizeDeadlockTest.java | 178 +++++++++++++++++++++
 .../query/IgniteSqlNotNullConstraintTest.java      |  40 ++---
 .../IgniteBinaryCacheQueryTestSuite2.java          |   2 +
 16 files changed, 389 insertions(+), 50 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 1dc35f6..56e0aa5 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -247,15 +247,42 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
     }
 
     /**
+     * Test update batch size property.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateBatchSize() throws Exception {
+        assertInvalid(urlWithAffinityAwarenessFlagSemicolon + ";updateBatchSize=-1",
+            "Property cannot be lower than 1 [name=updateBatchSize, value=-1]");
+
+        try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlagSemicolon)) {
+            for (JdbcThinTcpIo io: ios(conn))
+                assertNull(io.connectionProperties().getUpdateBatchSize());
+        }
+
+        try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlagSemicolon
+            + ";updateBatchSize=1024")) {
+            for (JdbcThinTcpIo io: ios(conn))
+                assertEquals(1024, (int)io.connectionProperties().getUpdateBatchSize());
+        }
+
+        try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag +
+            "&updateBatchSize=1024")) {
+            for (JdbcThinTcpIo io: ios(conn))
+                assertEquals(1024, (int)io.connectionProperties().getUpdateBatchSize());
+        }
+    }
+
+    /**
      * Test SQL hints.
      *
      * @throws Exception If failed.
      */
     @Test
     public void testSqlHints() throws Exception {
-        try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag)) {
-            assertHints(conn, false, false, false, false, false,
-                false, affinityAwareness);
+        try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
+            assertHints(conn, false, false, false, false, false, false, affinityAwareness);
         }
 
         try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag + "&distributedJoins=true")) {
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 7ee7618..8be968e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -49,6 +49,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Default value of the update internal batch size. */
+    private static final int DFLT_UPDATE_BATCH_SIZE = 1;
+
     /** Do not remove. For tests only. */
     @SuppressWarnings("NonConstantFieldWithUpperCaseName")
     private static boolean DFLT_LAZY;
@@ -88,6 +91,12 @@ public class SqlFieldsQuery extends Query<List<?>> {
     private Boolean dataPageScanEnabled;
 
     /**
+     * Update internal batch size. Default is 1 to prevent deadlocks on update when the key order differs between
+     * several concurrent updates.
+     */
+    private int updateBatchSize = DFLT_UPDATE_BATCH_SIZE;
+
+    /**
      * Copy constructs SQL fields query.
      *
      * @param qry SQL query.
@@ -104,6 +113,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
         parts = qry.parts;
         schema = qry.schema;
         dataPageScanEnabled = qry.dataPageScanEnabled;
+        updateBatchSize = qry.updateBatchSize;
     }
 
     /**
@@ -408,6 +418,31 @@ public class SqlFieldsQuery extends Query<List<?>> {
     }
 
     /**
+     * Gets update internal batch size.
+     * Default is 1 to prevent deadlocks on update when the key order differs between
+     * several concurrent updates.
+     *
+     * @return Update internal batch size.
+     */
+    public int getUpdateBatchSize() {
+        return updateBatchSize;
+    }
+
+    /**
+     * Sets update internal batch size.
+     * Default is 1 to prevent deadlocks on update when the key order differs between
+     * several concurrent updates.
+     *
+     * @param updateBatchSize Update internal batch size.
+     * @return {@code this} for chaining.
+     */
+    public SqlFieldsQuery setUpdateBatchSize(int updateBatchSize) {
+        this.updateBatchSize = updateBatchSize;
+
+        return this;
+    }
+
+    /**
      * @return Copy of this query.
      */
     public SqlFieldsQuery copy() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
index 53df56e..2cab155 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
@@ -416,4 +416,19 @@ public interface ConnectionProperties {
      * for this connection, if {@code false} then it's disabled.
      */
     public void setAffinityAwareness(boolean affinityAwareness);
+
+    /**
+     * Note: Batch size of 1 prevents deadlocks on update when the key order differs between several concurrent updates.
+     *
+     * @return Update internal batch size.
+     */
+    @Nullable public Integer getUpdateBatchSize();
+
+    /**
+     * Note: Set to 1 to prevent deadlocks on update when the key order differs between several concurrent updates.
+     *
+     * @param updateBatchSize Update internal batch size.
+     * @throws SQLException On error.
+     */
+    public void setUpdateBatchSize(@Nullable Integer updateBatchSize) throws SQLException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index e9d5b0c..02ca1e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -194,6 +194,12 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         "Whether jdbc thin affinity awareness is enabled.",
         false, false);
 
+    /** Update batch size (the size of the internal batches used for INSERT/UPDATE/DELETE operations). */
+    private IntegerProperty updateBatchSize = new IntegerProperty("updateBatchSize",
+        "Update bach size (the size of internal batches are used for INSERT/UPDATE/DELETE operation). " +
+            "Set to 1 to prevent deadlock on update where keys sequence are different " +
+            "in several concurrent updates.", null, false, 1, Integer.MAX_VALUE);
+
     /** Properties array. */
     private final ConnectionProperty [] propsArray = {
         distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseServerCursor,
@@ -204,7 +210,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         sslTrustAll, sslFactory,
         user, passwd,
         dataPageScanEnabled,
-        affinityAwareness
+        affinityAwareness,
+        updateBatchSize
     };
 
     /** {@inheritDoc} */
@@ -520,6 +527,16 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         this.affinityAwareness.setValue(affinityAwareness);
     }
 
+    /** {@inheritDoc} */
+    @Override public @Nullable Integer getUpdateBatchSize() {
+        return updateBatchSize.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setUpdateBatchSize(@Nullable Integer updateBatchSize) throws SQLException {
+        this.updateBatchSize.setValue(updateBatchSize);
+    }
+
     /**
      * @param url URL connection.
      * @param props Environment properties.
@@ -1020,8 +1037,6 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         NumberProperty(String name, String desc, Number dfltVal, boolean required, Number min, Number max) {
             super(name, desc, dfltVal, null, required);
 
-            assert dfltVal != null;
-
             val = dfltVal;
 
             range = new Number[] {min, max};
@@ -1030,7 +1045,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         /** {@inheritDoc} */
         @Override void init(String str) throws SQLException {
             if (str == null)
-                val = (int)dfltVal;
+                val = dfltVal != null ? (int)dfltVal : null;
             else {
                 try {
                     setValue(parse(str));
@@ -1051,7 +1066,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
 
         /** {@inheritDoc} */
         @Override String valueObject() {
-            return String.valueOf(val);
+            return val != null ? String.valueOf(val) : null;
         }
 
         /**
@@ -1102,8 +1117,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         /**
          * @return Property value.
          */
-        int value() {
-            return val.intValue();
+        Integer value() {
+            return val != null ? val.intValue() : null;
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index d741320..7c5759e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -23,8 +23,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.sql.SQLException;
 import java.util.List;
-import java.util.UUID;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.query.QueryCancelledException;
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcUtils;
 import org.apache.ignite.internal.util.ipc.loopback.IpcClientTcpEndpoint;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -246,9 +247,12 @@ public class JdbcThinTcpIo {
         if (ver.compareTo(VER_2_7_0) >= 0)
             writer.writeString(connProps.nestedTxMode());
 
-        if (ver.compareTo(VER_2_8_0) >= 0)
+        if (ver.compareTo(VER_2_8_0) >= 0) {
             writer.writeByte(nullableBooleanToByte(connProps.isDataPageScanEnabled()));
 
+            JdbcUtils.writeNullableInteger(writer, connProps.getUpdateBatchSize());
+        }
+
         if (!F.isEmpty(connProps.getUsername())) {
             assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5";
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index f713b53..34cdabe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -106,7 +106,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
      *  @param ctx Kernal Context.
      * @param ses Session.
      * @param busyLock Shutdown busy lock.
-     * @param connId
+     * @param connId Connection ID.
      * @param maxCursors Maximum allowed cursors.
      */
     public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId,
@@ -166,12 +166,15 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
             }
         }
 
-
         Boolean dataPageScanEnabled = null;
+        Integer updateBatchSize = null;
 
-        if (ver.compareTo(VER_2_8_0) >= 0)
+        if (ver.compareTo(VER_2_8_0) >= 0) {
             dataPageScanEnabled = nullableBooleanFromByte(reader.readByte());
 
+            updateBatchSize = JdbcUtils.readNullableInteger(reader);
+        }
+
         if (ver.compareTo(VER_2_5_0) >= 0) {
             String user = null;
             String passwd = null;
@@ -206,7 +209,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
 
         handler = new JdbcRequestHandler(busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
             collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, nestedTxMode,
-            dataPageScanEnabled, actx, ver, this);
+            dataPageScanEnabled, updateBatchSize, actx, ver, this);
 
         handler.start();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 362b0c8..42edcc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -171,6 +171,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
      * @param autoCloseCursors Flag to automatically close server cursors.
      * @param lazy Lazy query execution flag.
      * @param skipReducerOnUpdate Skip reducer on update flag.
+     * @param dataPageScanEnabled Enable scan data page mode.
+     * @param updateBatchSize Size of internal batch for DML queries.
      * @param actx Authentication context.
      * @param protocolVer Protocol version.
      * @param connCtx Jdbc connection context.
@@ -188,6 +190,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         boolean skipReducerOnUpdate,
         NestedTxMode nestedTxMode,
         @Nullable Boolean dataPageScanEnabled,
+        @Nullable Integer updateBatchSize,
         AuthorizationContext actx,
         ClientListenerProtocolVersion protocolVer,
         JdbcConnectionContext connCtx
@@ -212,7 +215,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
             replicatedOnly,
             lazy,
             skipReducerOnUpdate,
-            dataPageScanEnabled
+            dataPageScanEnabled,
+            updateBatchSize
         );
 
         this.busyLock = busyLock;
@@ -968,6 +972,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
         if (cliCtx.dataPageScanEnabled() != null)
             qry.setDataPageScanEnabled(cliCtx.dataPageScanEnabled());
+
+        if (cliCtx.updateBatchSize() != null)
+            qry.setUpdateBatchSize(cliCtx.updateBatchSize());
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
index f07a295..1befe4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Various JDBC utility methods.
@@ -104,4 +105,27 @@ public class JdbcUtils {
         else
             return Collections.emptyList();
     }
+
+    /**
+     * Read nullable Integer.
+     *
+     * @param reader Binary reader.
+     * @return read value.
+     */
+    @Nullable public static Integer readNullableInteger(BinaryReaderExImpl reader) {
+        return reader.readBoolean() ? reader.readInt() : null;
+    }
+
+    /**
+     * Write nullable integer.
+     *
+     * @param writer Binary writer.
+     * @param val Integer value.
+     */
+    public static void writeNullableInteger(BinaryWriterExImpl writer, @Nullable Integer val) {
+        writer.writeBoolean(val != null);
+
+        if (val != null)
+            writer.writeInt(val);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 7d3f9bb..2dd7338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -166,6 +166,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
             replicatedOnly,
             lazy,
             skipReducerOnUpdate,
+            null,
             null
         );
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
index 46e918e..a365e13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
@@ -60,6 +60,9 @@ public class SqlClientContext implements AutoCloseable {
     /** Data page scan support for query execution. */
     private final @Nullable Boolean dataPageScanEnabled;
 
+    /** Update internal batch size. */
+    private final @Nullable Integer updateBatchSize;
+
     /** Monitor for stream operations. */
     private final Object muxStreamer = new Object();
 
@@ -103,11 +106,15 @@ public class SqlClientContext implements AutoCloseable {
      * @param replicatedOnly Replicated caches only flag.
      * @param lazy Lazy query execution flag.
      * @param skipReducerOnUpdate Skip reducer on update flag.
+     * @param dataPageScanEnabled Enable scan data page mode.
+     * @param updateBatchSize Size of internal batch for DML queries.
      */
     public SqlClientContext(GridKernalContext ctx, Factory<GridWorker> orderedBatchWorkerFactory,
         boolean distributedJoins, boolean enforceJoinOrder,
         boolean collocated, boolean replicatedOnly, boolean lazy, boolean skipReducerOnUpdate,
-        @Nullable Boolean dataPageScanEnabled) {
+        @Nullable Boolean dataPageScanEnabled,
+        @Nullable Integer updateBatchSize
+        ) {
         this.ctx = ctx;
         this.orderedBatchWorkerFactory = orderedBatchWorkerFactory;
         this.distributedJoins = distributedJoins;
@@ -117,6 +124,7 @@ public class SqlClientContext implements AutoCloseable {
         this.lazy = lazy;
         this.skipReducerOnUpdate = skipReducerOnUpdate;
         this.dataPageScanEnabled = dataPageScanEnabled;
+        this.updateBatchSize = updateBatchSize;
 
         log = ctx.log(SqlClientContext.class.getName());
     }
@@ -227,6 +235,13 @@ public class SqlClientContext implements AutoCloseable {
     }
 
     /**
+     * @return Update internal batch size.
+     */
+    public @Nullable Integer updateBatchSize() {
+        return updateBatchSize;
+    }
+
+    /**
      * @return Streaming state flag (on or off).
      */
     public boolean isStream() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
index 6d43f48..003776b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
@@ -50,10 +50,10 @@ public class H2TableDescriptor {
     /** PK index name. */
     public static final String PK_IDX_NAME = "_key_PK";
 
-    /** PK hashindex name */
+    /** PK hash index name. */
     public static final String PK_HASH_IDX_NAME = "_key_PK_hash";
 
-    /** Affinity key index name */
+    /** Affinity key index name. */
     public static final String AFFINITY_KEY_IDX_NAME = "AFFINITY_KEY";
 
     /** Indexing. */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 598072f..154a624 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1268,6 +1268,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param cancel Cancel.
      * @param timeout Timeout.
      * @return Fields query.
+     * @throws IgniteCheckedException On error.
      */
     private QueryCursorImpl<List<?>> executeSelectForDml(
         String schema,
@@ -1314,6 +1315,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param inTx Flag whether query is executed within transaction.
      * @param timeout Timeout.
      * @return Query result.
+     * @throws IgniteCheckedException On error.
      */
     private Iterable<List<?>> executeSelect0(
         QueryDescriptor qryDesc,
@@ -2349,7 +2351,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     List<List<List<?>>> cur = plan.createRows(argss);
 
                     //TODO: IGNITE-11176 - Need to support cancellation
-                    ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.pageSize());
+                    ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.updateBatchSize());
                 }
                 finally {
                     DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
@@ -2623,7 +2625,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             }, cancel);
         }
 
-        int pageSize = loc ? 0 : qryParams.pageSize();
+        int pageSize = qryParams.updateBatchSize();
 
         //TODO: IGNITE-11176 - Need to support cancellation
         return DmlUtils.processSelectResult(plan, cur, pageSize);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
index 15e6abe..2b33f4a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
@@ -55,6 +55,12 @@ public class QueryParameters {
     private final List<Object[]> batchedArgs;
 
     /**
+     * Update internal batch size.
+     * Default is 1 to prevent deadlocks on update when the key order differs between several concurrent updates.
+     */
+    private final int updateBatchSize;
+
+    /**
      * Create parameters from query.
      *
      * @param qry Query.
@@ -85,7 +91,8 @@ public class QueryParameters {
             qry.isDataPageScanEnabled(),
             nestedTxMode,
             autoCommit,
-            batchedArgs
+            batchedArgs,
+            qry.getUpdateBatchSize()
         );
     }
 
@@ -101,6 +108,7 @@ public class QueryParameters {
      * @param nestedTxMode Nested TX mode.
      * @param autoCommit Auto-commit flag.
      * @param batchedArgs Batched arguments.
+     * @param updateBatchSize Update internal batch size.
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     private QueryParameters(
@@ -112,7 +120,8 @@ public class QueryParameters {
         Boolean dataPageScanEnabled,
         NestedTxMode nestedTxMode,
         boolean autoCommit,
-        List<Object[]> batchedArgs
+        List<Object[]> batchedArgs,
+        int updateBatchSize
     ) {
         this.args = args;
         this.parts = parts;
@@ -123,6 +132,7 @@ public class QueryParameters {
         this.nestedTxMode = nestedTxMode;
         this.autoCommit = autoCommit;
         this.batchedArgs = batchedArgs;
+        this.updateBatchSize = updateBatchSize;
     }
 
     /**
@@ -192,6 +202,16 @@ public class QueryParameters {
     }
 
     /**
+     * Gets update internal batch size.
+     * Default is 1 to prevent deadlocks on update when the key order differs between several concurrent updates.
+     *
+     * @return Update internal batch size.
+     */
+    public int updateBatchSize() {
+        return updateBatchSize;
+    }
+
+    /**
      * Convert current batched arguments to a form with single arguments.
      *
      * @param args Arguments.
@@ -207,7 +227,8 @@ public class QueryParameters {
             this.dataPageScanEnabled,
             this.nestedTxMode,
             this.autoCommit,
-            null
+            null,
+            this.updateBatchSize
         );
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java
new file mode 100644
index 0000000..790ab72
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests DML deadlock with different update batch size.
+ */
+public class DmlBatchSizeDeadlockTest extends AbstractIndexingCommonTest {
+    /** Keys count. */
+    private static final int KEY_CNT = 1000;
+
+    /** Test time to run. */
+    private static final int TEST_TIME = 20_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws IgniteCheckedException On error.
+     */
+    @Test
+    public void testDeadlockOnDmlAtomic() throws IgniteCheckedException {
+        checkDeadlockOnDml(CacheAtomicityMode.ATOMIC);
+    }
+
+    /**
+     * @throws IgniteCheckedException On error.
+     */
+    @Test
+    public void testDeadlockOnDmlTransactional() throws IgniteCheckedException {
+        checkDeadlockOnDml(CacheAtomicityMode.TRANSACTIONAL);
+    }
+
+    /**
+     * @param mode Atomicity mode.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void checkDeadlockOnDml(CacheAtomicityMode mode) throws IgniteCheckedException {
+        IgniteCache<Long, Long> cache = createCache(mode);
+
+        final long tEnd = U.currentTimeMillis() + TEST_TIME;
+
+        final IgniteInternalFuture futAsc = GridTestUtils.runAsync(() -> {
+            while (U.currentTimeMillis() < tEnd) {
+                try {
+                    sql("UPDATE test SET val = 2 ORDER BY id ASC");
+                }
+                catch (Exception e) {
+                    IgniteSQLException esql = X.cause(e, IgniteSQLException.class);
+
+                    if (esql == null || !esql.getMessage().contains("Failed to update some keys because they " +
+                        "had been modified concurrently"))
+                        throw e;
+                }
+            }
+        });
+
+        final IgniteInternalFuture futDesc = GridTestUtils.runAsync(() -> {
+            while (U.currentTimeMillis() < tEnd) {
+                while (U.currentTimeMillis() < tEnd) {
+                    try {
+                        sql("UPDATE test SET val = 3 ORDER BY id DESC");
+                    }
+                    catch (Exception e) {
+                        IgniteSQLException esql = X.cause(e, IgniteSQLException.class);
+
+                        if (esql == null || !esql.getMessage().contains("Failed to update some keys because they " +
+                            "had been modified concurrently"))
+                            throw e;
+                    }
+                }
+            }
+        });
+
+        final IgniteInternalFuture futCache = GridTestUtils.runAsync(() -> {
+            while (U.currentTimeMillis() < tEnd) {
+                Map<Long, Long> map = new LinkedHashMap();
+
+                for (long i = KEY_CNT - 1; i >= 0; --i)
+                    map.put(i, i);
+
+                cache.putAll(map);
+            }
+        });
+
+        boolean deadlock = !GridTestUtils.waitForCondition(
+            () -> futAsc.isDone() && futDesc.isDone() && futCache.isDone(),
+            TEST_TIME + 5000);
+
+        if (deadlock) {
+            futAsc.cancel();
+            futDesc.cancel();
+            futCache.cancel();
+
+            fail("Deadlock on DML");
+        }
+    }
+
+    /**
+     * @param mode Cache atomicity mode.
+     * @return Created test cache.
+     */
+    private IgniteCache<Long, Long> createCache(CacheAtomicityMode mode) {
+        IgniteCache<Long, Long> c = grid().createCache(new CacheConfiguration<Long, Long>()
+            .setName("test")
+            .setSqlSchema("TEST")
+            .setAtomicityMode(mode)
+            .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class)
+                .setTableName("test")
+                .addQueryField("id", Long.class.getName(), null)
+                .addQueryField("val", Long.class.getName(), null)
+                .setKeyFieldName("id")
+                .setValueFieldName("val")
+            ))
+            .setAffinity(new RendezvousAffinityFunction(false, 10)));
+
+        for (long i = 0; i < KEY_CNT; ++i)
+            c.put(i, i);
+
+        return c;
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     * @return Results cursor.
+     */
+    private FieldsQueryCursor<List<?>> sql(String sql, Object... args) {
+        return grid().context().query().querySqlFields(new SqlFieldsQuery(sql)
+            .setSchema("TEST")
+            .setUpdateBatchSize(1)
+            .setArgs(args), false);
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
index 8bd00f7..c19bd8b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
@@ -779,27 +779,10 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
 
     /** */
     private void checkNotNullCheckDmlInsertValues(CacheAtomicityMode atomicityMode) throws Exception {
-        executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL) WITH \"atomicity="
+        executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL, age INT) WITH \"atomicity="
             + atomicityMode.name() + "\"");
 
-        GridTestUtils.assertThrows(log(), new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                executeSql("INSERT INTO test(id, name) " +
-                    "VALUES (1, 'ok'), (2, NULLIF('a', 'a')), (3, 'ok')");
-
-                return null;
-            }
-        }, IgniteSQLException.class, ERR_MSG);
-
-        List<List<?>> result = executeSql("SELECT id, name FROM test ORDER BY id");
-
-        assertEquals(0, result.size());
-
-        executeSql("INSERT INTO test(id, name) VALUES (1, 'ok'), (2, 'ok2'), (3, 'ok3')");
-
-        result = executeSql("SELECT id, name FROM test ORDER BY id");
-
-        assertEquals(3, result.size());
+        checkNotNullInsertValues();
     }
 
     /** */
@@ -821,10 +804,16 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
 
         executeSql("ALTER TABLE test ADD COLUMN name VARCHAR NOT NULL");
 
+        checkNotNullInsertValues();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkNotNullInsertValues() throws Exception {
         GridTestUtils.assertThrows(log(), new Callable<Object>() {
             @Override public Object call() throws Exception {
-                executeSql("INSERT INTO test(id, name, age) " +
-                    "VALUES (1, 'ok', 1), (2, NULLIF('a', 'a'), 2), (3, 'ok', 3)");
+                executeSql("INSERT INTO test(id, name, age) VALUES (2, NULLIF('a', 'a'), 2)");
 
                 return null;
             }
@@ -846,7 +835,7 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
     public void testNotNullCheckDmlInsertFromSelect() throws Exception {
         executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR, age INT)");
 
-        executeSql("INSERT INTO test(id, name, age) VALUES (1, 'Macy', 25), (2, null, 25), (3, 'John', 30)");
+        executeSql("INSERT INTO test(id, name, age) VALUES (2, null, 25)");
 
         GridTestUtils.assertThrows(log(), new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -860,6 +849,7 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
 
         assertEquals(0, result.size());
 
+        executeSql("INSERT INTO test(id, name, age) VALUES (1, 'Macy', 25), (3, 'John', 30)");
         executeSql("DELETE FROM test WHERE id = 2");
 
         result = executeSql("INSERT INTO " + TABLE_PERSON + "(_key, name, age) " + "SELECT id, name, age FROM test");
@@ -905,9 +895,9 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
 
         GridTestUtils.assertThrows(log(), new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return executeSql("UPDATE dest" +
-                    " p SET (name) = " +
-                    "(SELECT name FROM src t WHERE p.id = t.id)");
+                return executeSql("UPDATE dest p " +
+                    "SET (name) = (SELECT name FROM src t WHERE p.id = t.id) " +
+                    "WHERE p.id = 2");
             }
         }, IgniteSQLException.class, ERR_MSG);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index 416cc9e..dcb6dfb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.index.DynamicIndexReplicatedT
 import org.apache.ignite.internal.processors.cache.query.ScanQueryOffheapExpiryPolicySelfTest;
 import org.apache.ignite.internal.processors.database.baseline.IgniteChangingBaselineCacheQueryNodeRestartSelfTest;
 import org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineCacheQueryNodeRestartsSelfTest;
+import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest;
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsCompareQueryTest;
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistributedJoinSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest;
@@ -141,6 +142,7 @@ import org.junit.runners.Suite;
     LocalQueryLazyTest.class,
 
     LongRunningQueryTest.class,
+    DmlBatchSizeDeadlockTest.class
 })
 public class IgniteBinaryCacheQueryTestSuite2 {
 }