You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/05/07 15:21:46 UTC
[ignite] 38/41: GG-17385 [IGNITE-11499] SQL: DML internal batch
size is 1 by default to prevent deadlock
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 9666cbe16841fa78b1e5d59f8c0d1578cd0060b6
Author: tledkov <tl...@gridgain.com>
AuthorDate: Mon May 6 14:10:02 2019 +0300
GG-17385 [IGNITE-11499] SQL: DML internal batch size is 1 by default to prevent deadlock
---
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 33 +++-
.../apache/ignite/cache/query/SqlFieldsQuery.java | 35 ++++
.../internal/jdbc/thin/ConnectionProperties.java | 15 ++
.../jdbc/thin/ConnectionPropertiesImpl.java | 29 +++-
.../ignite/internal/jdbc/thin/JdbcThinTcpIo.java | 8 +-
.../odbc/jdbc/JdbcConnectionContext.java | 11 +-
.../processors/odbc/jdbc/JdbcRequestHandler.java | 9 +-
.../internal/processors/odbc/jdbc/JdbcUtils.java | 24 +++
.../processors/odbc/odbc/OdbcRequestHandler.java | 1 +
.../processors/query/SqlClientContext.java | 17 +-
.../processors/query/h2/H2TableDescriptor.java | 4 +-
.../processors/query/h2/IgniteH2Indexing.java | 6 +-
.../processors/query/h2/QueryParameters.java | 27 +++-
.../processors/query/DmlBatchSizeDeadlockTest.java | 178 +++++++++++++++++++++
.../query/IgniteSqlNotNullConstraintTest.java | 40 ++---
.../IgniteBinaryCacheQueryTestSuite2.java | 2 +
16 files changed, 389 insertions(+), 50 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 1dc35f6..56e0aa5 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -247,15 +247,42 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
}
/**
+ * Test update batch size property.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdateBatchSize() throws Exception {
+ assertInvalid(urlWithAffinityAwarenessFlagSemicolon + ";updateBatchSize=-1",
+ "Property cannot be lower than 1 [name=updateBatchSize, value=-1]");
+
+ try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlagSemicolon)) {
+ for (JdbcThinTcpIo io: ios(conn))
+ assertNull(io.connectionProperties().getUpdateBatchSize());
+ }
+
+ try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlagSemicolon
+ + ";updateBatchSize=1024")) {
+ for (JdbcThinTcpIo io: ios(conn))
+ assertEquals(1024, (int)io.connectionProperties().getUpdateBatchSize());
+ }
+
+ try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag +
+ "&updateBatchSize=1024")) {
+ for (JdbcThinTcpIo io: ios(conn))
+ assertEquals(1024, (int)io.connectionProperties().getUpdateBatchSize());
+ }
+ }
+
+ /**
* Test SQL hints.
*
* @throws Exception If failed.
*/
@Test
public void testSqlHints() throws Exception {
- try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag)) {
- assertHints(conn, false, false, false, false, false,
- false, affinityAwareness);
+ try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
+ assertHints(conn, false, false, false, false, false, false, affinityAwareness);
}
try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag + "&distributedJoins=true")) {
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 7ee7618..8be968e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -49,6 +49,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private static final long serialVersionUID = 0L;
+ /** Default value of the update internal batch size. */
+ private static final int DFLT_UPDATE_BATCH_SIZE = 1;
+
/** Do not remove. For tests only. */
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
private static boolean DFLT_LAZY;
@@ -88,6 +91,12 @@ public class SqlFieldsQuery extends Query<List<?>> {
private Boolean dataPageScanEnabled;
/**
+ * Update internal batch size. Default is 1 to prevent deadlock on update where keys sequence are different in
+ * several concurrent updates.
+ */
+ private int updateBatchSize = DFLT_UPDATE_BATCH_SIZE;
+
+ /**
* Copy constructs SQL fields query.
*
* @param qry SQL query.
@@ -104,6 +113,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
parts = qry.parts;
schema = qry.schema;
dataPageScanEnabled = qry.dataPageScanEnabled;
+ updateBatchSize = qry.updateBatchSize;
}
/**
@@ -408,6 +418,31 @@ public class SqlFieldsQuery extends Query<List<?>> {
}
/**
+ * Gets update internal batch size.
+ * Default is 1 to prevent deadlock on update where keys sequence are different in
+ * several concurrent updates.
+ *
+ * @return Update internal batch size
+ */
+ public int getUpdateBatchSize() {
+ return updateBatchSize;
+ }
+
+ /**
+ * Sets update internal batch size.
+ * Default is 1 to prevent deadlock on update where keys sequence are different in
+ * several concurrent updates.
+ *
+ * @param updateBatchSize Update internal batch size.
+ * @return {@code this} for chaining.
+ */
+ public SqlFieldsQuery setUpdateBatchSize(int updateBatchSize) {
+ this.updateBatchSize = updateBatchSize;
+
+ return this;
+ }
+
+ /**
* @return Copy of this query.
*/
public SqlFieldsQuery copy() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
index 53df56e..2cab155 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
@@ -416,4 +416,19 @@ public interface ConnectionProperties {
* for this connection, if {@code false} then it's disabled.
*/
public void setAffinityAwareness(boolean affinityAwareness);
+
+ /**
+ * Note: Batch size of 1 prevents deadlock on update where keys sequence are different in several concurrent updates.
+ *
+ * @return update internal batch size.
+ */
+ @Nullable public Integer getUpdateBatchSize();
+
+ /**
+ * Note: Set to 1 to prevent deadlock on update where keys sequence are different in several concurrent updates.
+ *
+ * @param updateBatchSize update internal batch size.
+ * @throws SQLException On error.
+ */
+ public void setUpdateBatchSize(@Nullable Integer updateBatchSize) throws SQLException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index e9d5b0c..02ca1e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -194,6 +194,12 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
"Whether jdbc thin affinity awareness is enabled.",
false, false);
+ /** Update batch size (the size of the internal batches used for INSERT/UPDATE/DELETE operations). */
+ private IntegerProperty updateBatchSize = new IntegerProperty("updateBatchSize",
+ "Update bach size (the size of internal batches are used for INSERT/UPDATE/DELETE operation). " +
+ "Set to 1 to prevent deadlock on update where keys sequence are different " +
+ "in several concurrent updates.", null, false, 1, Integer.MAX_VALUE);
+
/** Properties array. */
private final ConnectionProperty [] propsArray = {
distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseServerCursor,
@@ -204,7 +210,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
sslTrustAll, sslFactory,
user, passwd,
dataPageScanEnabled,
- affinityAwareness
+ affinityAwareness,
+ updateBatchSize
};
/** {@inheritDoc} */
@@ -520,6 +527,16 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
this.affinityAwareness.setValue(affinityAwareness);
}
+ /** {@inheritDoc} */
+ @Override public @Nullable Integer getUpdateBatchSize() {
+ return updateBatchSize.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setUpdateBatchSize(@Nullable Integer updateBatchSize) throws SQLException {
+ this.updateBatchSize.setValue(updateBatchSize);
+ }
+
/**
* @param url URL connection.
* @param props Environment properties.
@@ -1020,8 +1037,6 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
NumberProperty(String name, String desc, Number dfltVal, boolean required, Number min, Number max) {
super(name, desc, dfltVal, null, required);
- assert dfltVal != null;
-
val = dfltVal;
range = new Number[] {min, max};
@@ -1030,7 +1045,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
/** {@inheritDoc} */
@Override void init(String str) throws SQLException {
if (str == null)
- val = (int)dfltVal;
+ val = dfltVal != null ? (int)dfltVal : null;
else {
try {
setValue(parse(str));
@@ -1051,7 +1066,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
/** {@inheritDoc} */
@Override String valueObject() {
- return String.valueOf(val);
+ return val != null ? String.valueOf(val) : null;
}
/**
@@ -1102,8 +1117,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
/**
* @return Property value.
*/
- int value() {
- return val.intValue();
+ Integer value() {
+ return val != null ? val.intValue() : null;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index d741320..7c5759e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -23,8 +23,8 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.sql.SQLException;
import java.util.List;
-import java.util.UUID;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.QueryCancelledException;
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcUtils;
import org.apache.ignite.internal.util.ipc.loopback.IpcClientTcpEndpoint;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -246,9 +247,12 @@ public class JdbcThinTcpIo {
if (ver.compareTo(VER_2_7_0) >= 0)
writer.writeString(connProps.nestedTxMode());
- if (ver.compareTo(VER_2_8_0) >= 0)
+ if (ver.compareTo(VER_2_8_0) >= 0) {
writer.writeByte(nullableBooleanToByte(connProps.isDataPageScanEnabled()));
+ JdbcUtils.writeNullableInteger(writer, connProps.getUpdateBatchSize());
+ }
+
if (!F.isEmpty(connProps.getUsername())) {
assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index f713b53..34cdabe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -106,7 +106,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
* @param ctx Kernal Context.
* @param ses Session.
* @param busyLock Shutdown busy lock.
- * @param connId
+ * @param connId Connection ID.
* @param maxCursors Maximum allowed cursors.
*/
public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, long connId,
@@ -166,12 +166,15 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
}
}
-
Boolean dataPageScanEnabled = null;
+ Integer updateBatchSize = null;
- if (ver.compareTo(VER_2_8_0) >= 0)
+ if (ver.compareTo(VER_2_8_0) >= 0) {
dataPageScanEnabled = nullableBooleanFromByte(reader.readByte());
+ updateBatchSize = JdbcUtils.readNullableInteger(reader);
+ }
+
if (ver.compareTo(VER_2_5_0) >= 0) {
String user = null;
String passwd = null;
@@ -206,7 +209,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
handler = new JdbcRequestHandler(busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, nestedTxMode,
- dataPageScanEnabled, actx, ver, this);
+ dataPageScanEnabled, updateBatchSize, actx, ver, this);
handler.start();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 362b0c8..42edcc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -171,6 +171,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
* @param autoCloseCursors Flag to automatically close server cursors.
* @param lazy Lazy query execution flag.
* @param skipReducerOnUpdate Skip reducer on update flag.
+ * @param dataPageScanEnabled Enable scan data page mode.
+ * @param updateBatchSize Size of internal batch for DML queries.
* @param actx Authentication context.
* @param protocolVer Protocol version.
* @param connCtx Jdbc connection context.
@@ -188,6 +190,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
boolean skipReducerOnUpdate,
NestedTxMode nestedTxMode,
@Nullable Boolean dataPageScanEnabled,
+ @Nullable Integer updateBatchSize,
AuthorizationContext actx,
ClientListenerProtocolVersion protocolVer,
JdbcConnectionContext connCtx
@@ -212,7 +215,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
replicatedOnly,
lazy,
skipReducerOnUpdate,
- dataPageScanEnabled
+ dataPageScanEnabled,
+ updateBatchSize
);
this.busyLock = busyLock;
@@ -968,6 +972,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
if (cliCtx.dataPageScanEnabled() != null)
qry.setDataPageScanEnabled(cliCtx.dataPageScanEnabled());
+
+ if (cliCtx.updateBatchSize() != null)
+ qry.setUpdateBatchSize(cliCtx.updateBatchSize());
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
index f07a295..1befe4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
+import org.jetbrains.annotations.Nullable;
/**
* Various JDBC utility methods.
@@ -104,4 +105,27 @@ public class JdbcUtils {
else
return Collections.emptyList();
}
+
+ /**
+ * Read nullable Integer.
+ *
+ * @param reader Binary reader.
+ * @return read value.
+ */
+ @Nullable public static Integer readNullableInteger(BinaryReaderExImpl reader) {
+ return reader.readBoolean() ? reader.readInt() : null;
+ }
+
+ /**
+ * Write nullable integer.
+ *
+ * @param writer Binary writer.
+ * @param val Integer value.
+ */
+ public static void writeNullableInteger(BinaryWriterExImpl writer, @Nullable Integer val) {
+ writer.writeBoolean(val != null);
+
+ if (val != null)
+ writer.writeInt(val);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 7d3f9bb..2dd7338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -166,6 +166,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler {
replicatedOnly,
lazy,
skipReducerOnUpdate,
+ null,
null
);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
index 46e918e..a365e13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
@@ -60,6 +60,9 @@ public class SqlClientContext implements AutoCloseable {
/** Data page scan support for query execution. */
private final @Nullable Boolean dataPageScanEnabled;
+ /** Update internal batch size. */
+ private final @Nullable Integer updateBatchSize;
+
/** Monitor for stream operations. */
private final Object muxStreamer = new Object();
@@ -103,11 +106,15 @@ public class SqlClientContext implements AutoCloseable {
* @param replicatedOnly Replicated caches only flag.
* @param lazy Lazy query execution flag.
* @param skipReducerOnUpdate Skip reducer on update flag.
+ * @param dataPageScanEnabled Enable scan data page mode.
+ * @param updateBatchSize Size of internal batch for DML queries.
*/
public SqlClientContext(GridKernalContext ctx, Factory<GridWorker> orderedBatchWorkerFactory,
boolean distributedJoins, boolean enforceJoinOrder,
boolean collocated, boolean replicatedOnly, boolean lazy, boolean skipReducerOnUpdate,
- @Nullable Boolean dataPageScanEnabled) {
+ @Nullable Boolean dataPageScanEnabled,
+ @Nullable Integer updateBatchSize
+ ) {
this.ctx = ctx;
this.orderedBatchWorkerFactory = orderedBatchWorkerFactory;
this.distributedJoins = distributedJoins;
@@ -117,6 +124,7 @@ public class SqlClientContext implements AutoCloseable {
this.lazy = lazy;
this.skipReducerOnUpdate = skipReducerOnUpdate;
this.dataPageScanEnabled = dataPageScanEnabled;
+ this.updateBatchSize = updateBatchSize;
log = ctx.log(SqlClientContext.class.getName());
}
@@ -227,6 +235,13 @@ public class SqlClientContext implements AutoCloseable {
}
/**
+ * @return Update internal batch size.
+ */
+ public @Nullable Integer updateBatchSize() {
+ return updateBatchSize;
+ }
+
+ /**
* @return Streaming state flag (on or off).
*/
public boolean isStream() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
index 6d43f48..003776b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
@@ -50,10 +50,10 @@ public class H2TableDescriptor {
/** PK index name. */
public static final String PK_IDX_NAME = "_key_PK";
- /** PK hashindex name */
+ /** PK hash index name. */
public static final String PK_HASH_IDX_NAME = "_key_PK_hash";
- /** Affinity key index name */
+ /** Affinity key index name. */
public static final String AFFINITY_KEY_IDX_NAME = "AFFINITY_KEY";
/** Indexing. */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 598072f..154a624 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1268,6 +1268,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param cancel Cancel.
* @param timeout Timeout.
* @return Fields query.
+ * @throws IgniteCheckedException On error.
*/
private QueryCursorImpl<List<?>> executeSelectForDml(
String schema,
@@ -1314,6 +1315,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param inTx Flag whether query is executed within transaction.
* @param timeout Timeout.
* @return Query result.
+ * @throws IgniteCheckedException On error.
*/
private Iterable<List<?>> executeSelect0(
QueryDescriptor qryDesc,
@@ -2349,7 +2351,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
List<List<List<?>>> cur = plan.createRows(argss);
//TODO: IGNITE-11176 - Need to support cancellation
- ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.pageSize());
+ ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.updateBatchSize());
}
finally {
DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
@@ -2623,7 +2625,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}, cancel);
}
- int pageSize = loc ? 0 : qryParams.pageSize();
+ int pageSize = qryParams.updateBatchSize();
//TODO: IGNITE-11176 - Need to support cancellation
return DmlUtils.processSelectResult(plan, cur, pageSize);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
index 15e6abe..2b33f4a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
@@ -55,6 +55,12 @@ public class QueryParameters {
private final List<Object[]> batchedArgs;
/**
+ * Update internal batch size.
+ * Default is 1 to prevent deadlock on update where keys sequence are different in several concurrent updates.
+ */
+ private final int updateBatchSize;
+
+ /**
* Create parameters from query.
*
* @param qry Query.
@@ -85,7 +91,8 @@ public class QueryParameters {
qry.isDataPageScanEnabled(),
nestedTxMode,
autoCommit,
- batchedArgs
+ batchedArgs,
+ qry.getUpdateBatchSize()
);
}
@@ -101,6 +108,7 @@ public class QueryParameters {
* @param nestedTxMode Nested TX mode.
* @param autoCommit Auto-commit flag.
* @param batchedArgs Batched arguments.
+ * @param updateBatchSize Update internal batch size.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
private QueryParameters(
@@ -112,7 +120,8 @@ public class QueryParameters {
Boolean dataPageScanEnabled,
NestedTxMode nestedTxMode,
boolean autoCommit,
- List<Object[]> batchedArgs
+ List<Object[]> batchedArgs,
+ int updateBatchSize
) {
this.args = args;
this.parts = parts;
@@ -123,6 +132,7 @@ public class QueryParameters {
this.nestedTxMode = nestedTxMode;
this.autoCommit = autoCommit;
this.batchedArgs = batchedArgs;
+ this.updateBatchSize = updateBatchSize;
}
/**
@@ -192,6 +202,16 @@ public class QueryParameters {
}
/**
+ * Gets update internal batch size.
+ * Default is 1 to prevent deadlock on update where keys sequence are different in several concurrent updates.
+ *
+ * @return Update internal batch size
+ */
+ public int updateBatchSize() {
+ return updateBatchSize;
+ }
+
+ /**
* Convert current batched arguments to a form with single arguments.
*
* @param args Arguments.
@@ -207,7 +227,8 @@ public class QueryParameters {
this.dataPageScanEnabled,
this.nestedTxMode,
this.autoCommit,
- null
+ null,
+ this.updateBatchSize
);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java
new file mode 100644
index 0000000..790ab72
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/DmlBatchSizeDeadlockTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests DML deadlock with different update batch size.
+ */
+public class DmlBatchSizeDeadlockTest extends AbstractIndexingCommonTest {
+ /** Keys count. */
+ private static final int KEY_CNT = 1000;
+
+ /** Test time to run. */
+ private static final int TEST_TIME = 20_000;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws IgniteCheckedException On error.
+ */
+ @Test
+ public void testDeadlockOnDmlAtomic() throws IgniteCheckedException {
+ checkDeadlockOnDml(CacheAtomicityMode.ATOMIC);
+ }
+
+ /**
+ * @throws IgniteCheckedException On error.
+ */
+ @Test
+ public void testDeadlockOnDmlTransactional() throws IgniteCheckedException {
+ checkDeadlockOnDml(CacheAtomicityMode.TRANSACTIONAL);
+ }
+
+    /**
+     * Runs opposite-ordered UPDATEs with a concurrent putAll; fails when they do not finish in time.
+     *
+     * @param mode Atomicity mode.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void checkDeadlockOnDml(CacheAtomicityMode mode) throws IgniteCheckedException {
+        IgniteCache<Long, Long> cache = createCache(mode);
+
+        final long tEnd = U.currentTimeMillis() + TEST_TIME;
+
+        final IgniteInternalFuture<?> futAsc = GridTestUtils.runAsync(() -> {
+            while (U.currentTimeMillis() < tEnd) {
+                try {
+                    sql("UPDATE test SET val = 2 ORDER BY id ASC");
+                }
+                catch (Exception e) {
+                    IgniteSQLException esql = X.cause(e, IgniteSQLException.class);
+                    // Only the concurrent-modification failure is tolerated; anything else is rethrown.
+                    if (esql == null || !esql.getMessage().contains("Failed to update some keys because they " +
+                        "had been modified concurrently"))
+                        throw e;
+                }
+            }
+        });
+
+        final IgniteInternalFuture<?> futDesc = GridTestUtils.runAsync(() -> {
+            while (U.currentTimeMillis() < tEnd) {
+                try {
+                    sql("UPDATE test SET val = 3 ORDER BY id DESC");
+                }
+                catch (Exception e) {
+                    IgniteSQLException esql = X.cause(e, IgniteSQLException.class);
+                    // Only the concurrent-modification failure is tolerated; anything else is rethrown.
+                    if (esql == null || !esql.getMessage().contains("Failed to update some keys because they " +
+                        "had been modified concurrently"))
+                        throw e;
+                }
+            }
+        });
+
+        final IgniteInternalFuture<?> futCache = GridTestUtils.runAsync(() -> {
+            while (U.currentTimeMillis() < tEnd) {
+                Map<Long, Long> map = new LinkedHashMap<>();
+
+                for (long i = KEY_CNT - 1; i >= 0; --i)
+                    map.put(i, i);
+
+                cache.putAll(map);
+            }
+        });
+
+        boolean deadlock = !GridTestUtils.waitForCondition(
+            () -> futAsc.isDone() && futDesc.isDone() && futCache.isDone(),
+            TEST_TIME + 5000);
+
+        if (deadlock) {
+            futAsc.cancel();
+            futDesc.cancel();
+            futCache.cancel();
+
+            fail("Deadlock on DML");
+        }
+    }
+
+ /**
+ * @param mode Cache atomicity mode.
+ * @return Created test cache.
+ */
+ private IgniteCache<Long, Long> createCache(CacheAtomicityMode mode) {
+ IgniteCache<Long, Long> c = grid().createCache(new CacheConfiguration<Long, Long>()
+ .setName("test")
+ .setSqlSchema("TEST")
+ .setAtomicityMode(mode)
+ .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class)
+ .setTableName("test")
+ .addQueryField("id", Long.class.getName(), null)
+ .addQueryField("val", Long.class.getName(), null)
+ .setKeyFieldName("id")
+ .setValueFieldName("val")
+ ))
+ .setAffinity(new RendezvousAffinityFunction(false, 10)));
+
+ for (long i = 0; i < KEY_CNT; ++i)
+ c.put(i, i);
+
+ return c;
+ }
+
+ /**
+ * @param sql SQL query.
+ * @param args Query parameters.
+ * @return Results cursor.
+ */
+ private FieldsQueryCursor<List<?>> sql(String sql, Object... args) {
+ return grid().context().query().querySqlFields(new SqlFieldsQuery(sql)
+ .setSchema("TEST")
+ .setUpdateBatchSize(1)
+ .setArgs(args), false);
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
index 8bd00f7..c19bd8b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlNotNullConstraintTest.java
@@ -779,27 +779,10 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
/** */
private void checkNotNullCheckDmlInsertValues(CacheAtomicityMode atomicityMode) throws Exception {
- executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL) WITH \"atomicity="
+ executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL, age INT) WITH \"atomicity="
+ atomicityMode.name() + "\"");
- GridTestUtils.assertThrows(log(), new Callable<Object>() {
- @Override public Object call() throws Exception {
- executeSql("INSERT INTO test(id, name) " +
- "VALUES (1, 'ok'), (2, NULLIF('a', 'a')), (3, 'ok')");
-
- return null;
- }
- }, IgniteSQLException.class, ERR_MSG);
-
- List<List<?>> result = executeSql("SELECT id, name FROM test ORDER BY id");
-
- assertEquals(0, result.size());
-
- executeSql("INSERT INTO test(id, name) VALUES (1, 'ok'), (2, 'ok2'), (3, 'ok3')");
-
- result = executeSql("SELECT id, name FROM test ORDER BY id");
-
- assertEquals(3, result.size());
+ checkNotNullInsertValues();
}
/** */
@@ -821,10 +804,16 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
executeSql("ALTER TABLE test ADD COLUMN name VARCHAR NOT NULL");
+ checkNotNullInsertValues();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkNotNullInsertValues() throws Exception {
GridTestUtils.assertThrows(log(), new Callable<Object>() {
@Override public Object call() throws Exception {
- executeSql("INSERT INTO test(id, name, age) " +
- "VALUES (1, 'ok', 1), (2, NULLIF('a', 'a'), 2), (3, 'ok', 3)");
+ executeSql("INSERT INTO test(id, name, age) VALUES (2, NULLIF('a', 'a'), 2)");
return null;
}
@@ -846,7 +835,7 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
public void testNotNullCheckDmlInsertFromSelect() throws Exception {
executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR, age INT)");
- executeSql("INSERT INTO test(id, name, age) VALUES (1, 'Macy', 25), (2, null, 25), (3, 'John', 30)");
+ executeSql("INSERT INTO test(id, name, age) VALUES (2, null, 25)");
GridTestUtils.assertThrows(log(), new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -860,6 +849,7 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
assertEquals(0, result.size());
+ executeSql("INSERT INTO test(id, name, age) VALUES (1, 'Macy', 25), (3, 'John', 30)");
executeSql("DELETE FROM test WHERE id = 2");
result = executeSql("INSERT INTO " + TABLE_PERSON + "(_key, name, age) " + "SELECT id, name, age FROM test");
@@ -905,9 +895,9 @@ public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
GridTestUtils.assertThrows(log(), new Callable<Object>() {
@Override public Object call() throws Exception {
- return executeSql("UPDATE dest" +
- " p SET (name) = " +
- "(SELECT name FROM src t WHERE p.id = t.id)");
+ return executeSql("UPDATE dest p " +
+ "SET (name) = (SELECT name FROM src t WHERE p.id = t.id) " +
+ "WHERE p.id = 2");
}
}, IgniteSQLException.class, ERR_MSG);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index 416cc9e..dcb6dfb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.index.DynamicIndexReplicatedT
import org.apache.ignite.internal.processors.cache.query.ScanQueryOffheapExpiryPolicySelfTest;
import org.apache.ignite.internal.processors.database.baseline.IgniteChangingBaselineCacheQueryNodeRestartSelfTest;
import org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineCacheQueryNodeRestartsSelfTest;
+import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest;
import org.apache.ignite.internal.processors.query.IgniteCacheGroupsCompareQueryTest;
import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest;
@@ -141,6 +142,7 @@ import org.junit.runners.Suite;
LocalQueryLazyTest.class,
LongRunningQueryTest.class,
+ DmlBatchSizeDeadlockTest.class
})
public class IgniteBinaryCacheQueryTestSuite2 {
}