You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/17 15:50:19 UTC
[ignite] branch master updated: IGNITE-9171: SQL: redesigned lazy
mode. This closes #5473.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e6a5690 IGNITE-9171: SQL: redesigned lazy mode. This closes #5473.
e6a5690 is described below
commit e6a56902c38ca001cba954af37787fa31cc750d1
Author: tledkov-gridgain <tl...@gridgain.com>
AuthorDate: Sun Feb 17 18:50:10 2019 +0300
IGNITE-9171: SQL: redesigned lazy mode. This closes #5473.
---
.../jdbc/thin/JdbcThinStatementCancelSelfTest.java | 4 +-
.../org/apache/ignite/IgniteSystemProperties.java | 7 +-
.../ignite/cache/query/QueryRetryException.java} | 22 +-
.../apache/ignite/cache/query/SqlFieldsQuery.java | 8 +-
.../processors/cache/GridCacheEventManager.java | 2 +-
.../processors/odbc/ClientListenerProcessor.java | 4 +-
.../h2/twostep/messages/GridQueryFailResponse.java | 12 +-
.../apache/ignite/internal/util/IgniteUtils.java | 5 +-
.../processors/query/h2/ConnectionManager.java | 5 +-
.../processors/query/h2/H2ConnectionWrapper.java | 5 +-
.../processors/query/h2/H2FieldsIterator.java | 5 +-
.../internal/processors/query/h2/H2Utils.java | 16 +
.../processors/query/h2/IgniteH2Indexing.java | 60 +-
.../processors/query/h2/opt/GridH2Table.java | 198 ++++-
.../processors/query/h2/opt/QueryContext.java | 21 -
.../query/h2/opt/QueryContextRegistry.java | 7 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 538 ++++++-------
.../query/h2/twostep/GridReduceQueryExecutor.java | 61 +-
.../query/h2/twostep/MapNodeResults.java | 12 +-
.../query/h2/twostep/MapQueryLazyWorker.java | 200 -----
.../query/h2/twostep/MapQueryLazyWorkerKey.java | 97 ---
.../query/h2/twostep/MapQueryResult.java | 200 +++--
.../query/h2/twostep/MapQueryResults.java | 137 ++--
.../query/h2/twostep/PartitionReservation.java | 17 +-
.../h2/twostep/PartitionReservationManager.java | 18 +-
.../query/h2/twostep/ReduceResultPage.java | 22 +-
.../apache/ignite/client/FunctionalQueryTest.java | 71 +-
.../GridCacheLazyQueryPartitionsReleaseTest.java | 2 -
...tributedQueryStopOnCancelOrTimeoutSelfTest.java | 46 +-
...eCacheQueryAbstractDistributedJoinSelfTest.java | 5 +
...cheQueryNodeRestartDistributedJoinSelfTest.java | 16 +-
...opOnCancelOrTimeoutDistributedJoinSelfTest.java | 26 +-
.../DynamicColumnsAbstractConcurrentSelfTest.java | 28 +-
.../cache/index/DynamicIndexAbstractSelfTest.java | 37 +-
.../cache/index/H2ConnectionLeaksSelfTest.java | 2 +-
...niteCacheLocalQueryCancelOrTimeoutSelfTest.java | 3 +-
...actQueryTableLockAndConnectionPoolSelfTest.java | 854 +++++++++++++++++++++
...TableLockAndConnectionPoolLazyModeOffTest.java} | 16 +-
...yTableLockAndConnectionPoolLazyModeOnTest.java} | 16 +-
.../processors/query/LazyQuerySelfTest.java | 392 ----------
.../processors/query/h2/QueryDataPageScanTest.java | 2 +-
.../DisappearedCacheCauseRetryMessageSelfTest.java | 15 +-
.../h2/twostep/RetryCauseMessageSelfTest.java | 26 -
.../processors/query/oom/AbstractQueryOOMTest.java | 429 +++++++++++
.../query/oom/IgniteQueryOOMTestSuite.java} | 13 +-
.../oom/QueryOOMWithQueryParallelismTest.java} | 15 +-
.../oom/QueryOOMWithoutQueryParallelismTest.java} | 15 +-
.../IgniteBinaryCacheQueryTestSuite.java | 6 +-
.../IgniteBinaryCacheQueryLazyTestSuite.java} | 21 +-
.../IgniteBinaryCacheQueryLazyTestSuite2.java} | 21 +-
.../Cache/Query/CacheQueriesTest.cs | 6 +-
.../Client/Cache/SqlQueryTest.cs | 4 +-
.../nodejs/spec/query/SqlFieldsQuery.spec.js | 23 +-
modules/platforms/php/tests/SqlFieldsQueryTest.php | 32 +-
.../benchmark-native-sql-select-join.properties | 6 +-
.../config/benchmark-native-sql-select.properties | 6 +-
.../apache/ignite/yardstick/jdbc/JdbcUtils.java | 62 +-
.../jdbc/NativeSqlJoinQueryRangeBenchmark.java | 80 +-
58 files changed, 2487 insertions(+), 1492 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
index 6bc1bb0..4d54456 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
@@ -439,9 +439,9 @@ public class JdbcThinStatementCancelSelfTest extends JdbcThinAbstractSelfTest {
IgniteInternalFuture cancelRes = cancel(stmt);
GridTestUtils.assertThrows(log, () -> {
- stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
+ stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30) OR _key < " + MAX_ROWS);
stmt.addBatch("update Long set _val = _val + 1 where awaitLatchCancelled() = 0");
- stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
+ stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30) OR _key < " + MAX_ROWS);
stmt.addBatch("update Long set _val = _val + 1 where shouldNotBeCalledInCaseOfCancellation()");
stmt.executeBatch();
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 8438968..64982d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -492,7 +492,12 @@ public final class IgniteSystemProperties {
/** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */
public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK";
- /** Force all SQL queries to be processed lazily regardless of what clients request. */
+ /**
+ * Force all SQL queries to be processed lazily regardless of what clients request.
+ *
+ * @deprecated Since version 2.8.
+ */
+ @Deprecated
public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET";
/** Disable SQL system views. */
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryRetryException.java
similarity index 62%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/core/src/main/java/org/apache/ignite/cache/query/QueryRetryException.java
index 0a6181e..10afa00 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryRetryException.java
@@ -15,15 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
+package org.apache.ignite.cache.query;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+import org.apache.ignite.IgniteException;
/**
- * Special test suite with ignored tests for Binary mode.
+ * The exception is thrown if a table was modified concurrently while a query was executing; the query should be retried.
*/
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
-}
+public class QueryRetryException extends IgniteException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param tableName Table name.
+ */
+ public QueryRetryException(String tableName) {
+ super("Table was modified concurrently (please retry the query): " + tableName);
+ }
+}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index fe283e9..aa66716 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -51,6 +51,10 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private static final long serialVersionUID = 0L;
+ /** Do not remove. For tests only. */
+ @SuppressWarnings("NonConstantFieldWithUpperCaseName")
+ private static boolean DFLT_LAZY;
+
/** SQL Query. */
private String sql;
@@ -73,8 +77,8 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean replicatedOnly;
- /** */
- private boolean lazy;
+ /** Lazy mode is default since Ignite v.2.8. */
+ private boolean lazy = DFLT_LAZY;
/** Partitions for query */
private int[] parts;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 726a6c8..501da08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -392,7 +392,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
return false;
return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type)
- && !cctx0.config().isEventsDisabled();
+ && cctx0.config() != null && !cctx0.config().isEventsDisabled();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
index 5870f40..c988f33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
@@ -284,9 +284,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter {
}
if (connCtx.handler().isCancellationCommand(cmdType)) {
- proceedMessageReceived(ses, msg);
-
CANCEL_COUNTER.incrementAndGet();
+
+ proceedMessageReceived(ses, msg);
}
else {
connCtx.handler().registerRequest(reqId, cmdType);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
index ef26d2a..7d66e00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages;
import java.nio.ByteBuffer;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryRetryException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -34,6 +35,9 @@ public class GridQueryFailResponse implements Message {
/** Cancelled by originator failure type. */
public static final byte CANCELLED_BY_ORIGINATOR = 1;
+ /** Execution error. Query should be retried. */
+ public static final byte RETRY_QUERY = 2;
+
/** */
private static final long serialVersionUID = 0L;
@@ -60,7 +64,13 @@ public class GridQueryFailResponse implements Message {
public GridQueryFailResponse(long qryReqId, Throwable err) {
this.qryReqId = qryReqId;
this.errMsg = err.getMessage();
- this.failCode = err instanceof QueryCancelledException ? CANCELLED_BY_ORIGINATOR : GENERAL_ERROR;
+
+ if (err instanceof QueryCancelledException)
+ this.failCode = CANCELLED_BY_ORIGINATOR;
+ else if (err instanceof QueryRetryException)
+ this.failCode = RETRY_QUERY;
+ else
+ this.failCode = GENERAL_ERROR;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 8fb0a70..c683827 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -4096,13 +4096,14 @@ public abstract class IgniteUtils {
* @param log Logger to log possible checked exception with (optional).
*/
public static void close(@Nullable AutoCloseable rsrc, @Nullable IgniteLogger log) {
- if (rsrc != null)
+ if (rsrc != null) {
try {
rsrc.close();
}
catch (Exception e) {
- warn(log, "Failed to close resource: " + e.getMessage());
+ warn(log, "Failed to close resource: " + e.getMessage(), e);
}
+ }
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
index 18a842b..db67edf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
@@ -407,6 +407,9 @@ public class ConnectionManager {
if (connCleanupTask != null)
connCleanupTask.close();
+ // Needs to be released before SHUTDOWN.
+ closeConnections();
+
try (Connection c = connectionNoCache(QueryUtils.SCHEMA_INFORMATION); Statement s = c.createStatement()) {
s.execute("SHUTDOWN");
}
@@ -419,8 +422,6 @@ public class ConnectionManager {
sysConn = null;
}
-
- closeConnections();
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
index 9803b5d..074d62a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
@@ -126,9 +126,8 @@ public class H2ConnectionWrapper implements AutoCloseable {
return S.toString(H2ConnectionWrapper.class, this);
}
- /** Closes wrapped connection */
- @Override
- public void close() {
+ /** Closes wrapped connection. */
+ @Override public void close() {
U.closeQuiet(conn);
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
index 64647cb..cdafb65 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
@@ -49,6 +49,8 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
throws IgniteCheckedException {
super(data, forUpdate);
+ assert detachedConn != null;
+
this.mvccTracker = mvccTracker;
this.detachedConn = detachedConn;
}
@@ -68,8 +70,7 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
super.onClose();
}
finally {
- if (detachedConn != null)
- detachedConn.recycle();
+ detachedConn.recycle();
if (mvccTracker != null)
mvccTracker.onDone();
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 4a73a1b..a9abc2b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -357,10 +357,26 @@ public class H2Utils {
* @param enforceJoinOrder Enforce join order of tables.
*/
public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) {
+ setupConnection(conn,distributedJoins, enforceJoinOrder, false);
+ }
+
+ /**
+ * @param conn Connection to use.
+ * @param distributedJoins If distributed joins are enabled.
+ * @param enforceJoinOrder Enforce join order of tables.
+ * @param lazy Lazy query execution mode.
+ */
+ public static void setupConnection(
+ Connection conn,
+ boolean distributedJoins,
+ boolean enforceJoinOrder,
+ boolean lazy
+ ) {
Session s = session(conn);
s.setForceJoinOrder(enforceJoinOrder);
s.setJoinBatchEnabled(distributedJoins);
+ s.setLazyQueryExecution(lazy);
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 0a60018..ea7c3a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -119,7 +119,6 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
@@ -869,31 +868,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
- final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
-
- if (cancel != null) {
- cancel.set(new Runnable() {
- @Override public void run() {
- if (lazyWorker != null) {
- lazyWorker.submit(new Runnable() {
- @Override public void run() {
- cancelStatement(stmt);
- }
- });
- }
- else
- cancelStatement(stmt);
- }
- });
- }
+ if (cancel != null)
+ cancel.set(() -> cancelStatement(stmt));
Session ses = session(conn);
if (timeoutMillis > 0)
ses.setQueryTimeout(timeoutMillis);
-
- if (lazyWorker != null)
- ses.setLazyQueryExecution(true);
+ else
+ ses.setQueryTimeout(0);
try {
return stmt.executeQuery();
@@ -905,13 +888,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteCheckedException("Failed to execute SQL query. " + e.getMessage(), e);
}
- finally {
- if (timeoutMillis > 0)
- ses.setQueryTimeout(0);
-
- if (lazyWorker != null)
- ses.setLazyQueryExecution(false);
- }
}
/**
@@ -991,20 +967,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
long time = U.currentTimeMillis() - start;
- long longQryExecTimeout = ctx.config().getLongQueryWarningTimeout();
-
- if (time > longQryExecTimeout) {
- ResultSet plan = executeSqlQuery(conn, preparedStatementWithParams(conn, "EXPLAIN " + sql,
- params, false), 0, null);
+ if (time > ctx.config().getLongQueryWarningTimeout()) {
+ // In lazy mode we have to use a separate connection to gather the plan for the warning.
+ // Otherwise all the tables are unlocked by this query.
+ try (Connection planConn = connMgr.connectionNoCache(conn.getSchema())) {
+ ResultSet plan = executeSqlQuery(planConn, preparedStatementWithParams(planConn, "EXPLAIN " + sql,
+ params, false), 0, null);
- plan.next();
+ plan.next();
- // Add SQL explain result message into log.
- String msg = "Query execution is too long [time=" + time + " ms, sql='" + sql + '\'' +
- ", plan=" + U.nl() + plan.getString(1) + U.nl() + ", parameters=" +
- (params == null ? "[]" : Arrays.deepToString(params.toArray())) + "]";
+ // Add SQL explain result message into log.
+ String msg = "Query execution is too long [time=" + time + " ms, sql='" + sql + '\'' +
+ ", plan=" + U.nl() + plan.getString(1) + U.nl() + ", parameters=" +
+ (params == null ? "[]" : Arrays.deepToString(params.toArray())) + "]";
- LT.warn(log, msg);
+ LT.warn(log, msg);
+ }
}
return rs;
@@ -2131,7 +2109,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Stopping cache query index...");
- mapQryExec.cancelLazyWorkers();
+ mapQryExec.stop();
qryCtxRegistry.clearSharedOnLocalNodeStop();
@@ -2285,8 +2263,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public void onKernalStop() {
- mapQryExec.cancelLazyWorkers();
-
connMgr.onKernalStop();
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 173e4a4..e04fab1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -24,11 +24,13 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.cache.query.QueryRetryException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -40,7 +42,6 @@ import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
import org.apache.ignite.internal.processors.query.h2.IndexRebuildPartialClosure;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndexBase;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.util.typedef.F;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.Insert;
@@ -56,6 +57,7 @@ import org.h2.result.SortOrder;
import org.h2.schema.SchemaObject;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
+import org.h2.table.Table;
import org.h2.table.TableBase;
import org.h2.table.TableType;
import org.h2.value.DataType;
@@ -70,6 +72,9 @@ public class GridH2Table extends TableBase {
/** Insert hack flag. */
private static final ThreadLocal<Boolean> INSERT_HACK = new ThreadLocal<>();
+ /** Exclusive lock constant. */
+ private static final long EXCLUSIVE_LOCK = -1;
+
/** Cache context info. */
private final GridCacheContextInfo cacheInfo;
@@ -92,10 +97,14 @@ public class GridH2Table extends TableBase {
private final ReentrantReadWriteLock lock;
/** */
- private boolean destroyed;
+ private volatile boolean destroyed;
- /** */
- private final ConcurrentMap<Session, Boolean> sessions = new ConcurrentHashMap<>();
+ /**
+ * Map of session locks.
+ * Session -> EXCLUSIVE_LOCK (-1L) - for exclusive locks.
+ * Session -> (table version) - for shared locks.
+ */
+ private final ConcurrentMap<Session, SessionLock> sessions = new ConcurrentHashMap<>();
/** */
private final IndexColumn affKeyCol;
@@ -121,6 +130,9 @@ public class GridH2Table extends TableBase {
/** Columns with thread-safe access. */
private volatile Column[] safeColumns;
+ /** Table version. The version is changed when exclusive lock is acquired (DDL operation is started). */
+ private final AtomicLong ver = new AtomicLong();
+
/**
* Creates table.
*
@@ -350,10 +362,17 @@ public class GridH2Table extends TableBase {
/** {@inheritDoc} */
@Override public boolean lock(Session ses, boolean exclusive, boolean force) {
// In accordance with base method semantics, we'll return true if we were already exclusively locked.
- Boolean res = sessions.get(ses);
+ SessionLock sesLock = sessions.get(ses);
+
+ if (sesLock != null) {
+ if (sesLock.isExclusive())
+ return true;
+
+ if (ver.get() != sesLock.version())
+ throw new QueryRetryException(getName());
- if (res != null)
- return res;
+ return false;
+ }
// Acquire the lock.
lock(exclusive);
@@ -365,25 +384,53 @@ public class GridH2Table extends TableBase {
}
// Mutate state.
- sessions.put(ses, exclusive);
+ sessions.put(ses, exclusive ? SessionLock.exclusiveLock() : SessionLock.sharedLock(ver.longValue()));
ses.addLock(this);
return false;
}
+ /** {@inheritDoc} */
+ @Override public void unlock(Session ses) {
+ SessionLock sesLock = sessions.remove(ses);
+
+ if (sesLock.locked)
+ unlock(sesLock.isExclusive());
+ }
+
/**
- * @return Table identifier.
+ * @param ses H2 session.
*/
- public QueryTable identifier() {
- return identifier;
+ private void readLockInternal(Session ses) {
+ SessionLock sesLock = sessions.get(ses);
+
+ assert sesLock != null && !sesLock.isExclusive()
+ : "Invalid table lock [name=" + getName() + ", lock=" + sesLock.ver + ']';
+
+ if (!sesLock.locked) {
+ lock(false);
+
+ sesLock.locked = true;
+ }
}
/**
- * @return Table identifier as string.
+ * Release table lock.
+ *
+ * @param ses H2 session.
*/
- public String identifierString() {
- return identifierStr;
+ private void unlockReadInternal(Session ses) {
+ SessionLock sesLock = sessions.get(ses);
+
+ assert sesLock != null && !sesLock.isExclusive()
+ : "Invalid table unlock [name=" + getName() + ", lock=" + sesLock.ver + ']';
+
+ if (sesLock.locked) {
+ sesLock.locked = false;
+
+ unlock(false);
+ }
}
/**
@@ -396,7 +443,7 @@ public class GridH2Table extends TableBase {
Lock l = exclusive ? lock.writeLock() : lock.readLock();
try {
- if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY)
+ if (!exclusive)
l.lockInterruptibly();
else {
for (;;) {
@@ -405,6 +452,8 @@ public class GridH2Table extends TableBase {
else
Thread.yield();
}
+
+ ver.incrementAndGet();
}
}
catch (InterruptedException e) {
@@ -426,6 +475,33 @@ public class GridH2Table extends TableBase {
}
/**
+ * @param ses H2 session.
+ */
+ private void checkVersion(Session ses) {
+ SessionLock sesLock = sessions.get(ses);
+
+ assert sesLock != null && !sesLock.isExclusive()
+ : "Invalid table check version [name=" + getName() + ", lock=" + sesLock.ver + ']';
+
+ if (ver.longValue() != sesLock.version())
+ throw new QueryRetryException(getName());
+ }
+
+ /**
+ * @return Table identifier.
+ */
+ public QueryTable identifier() {
+ return identifier;
+ }
+
+ /**
+ * @return Table identifier as string.
+ */
+ public String identifierString() {
+ return identifierStr;
+ }
+
+ /**
* Check if table is not destroyed.
*/
private void ensureNotDestroyed() {
@@ -485,8 +561,6 @@ public class GridH2Table extends TableBase {
try {
ensureNotDestroyed();
- assert sessions.isEmpty() : sessions;
-
destroyed = true;
for (int i = 1, len = idxs.size(); i < len; i++)
@@ -507,16 +581,6 @@ public class GridH2Table extends TableBase {
this.rmIndex = rmIndex;
}
- /** {@inheritDoc} */
- @Override public void unlock(Session ses) {
- Boolean exclusive = sessions.remove(ses);
-
- if (exclusive == null)
- return;
-
- unlock(exclusive);
- }
-
/**
* Gets index by index.
*
@@ -1213,4 +1277,84 @@ public class GridH2Table extends TableBase {
return true;
}
+
+ /**
+ * @param s H2 session.
+ */
+ public static void unlockTables(Session s) {
+ for (Table t : s.getLocks()) {
+ if (t instanceof GridH2Table)
+ ((GridH2Table)t).unlockReadInternal(s);
+ }
+ }
+
+ /**
+ * @param s H2 session.
+ */
+ public static void readLockTables(Session s) {
+ for (Table t : s.getLocks()) {
+ if (t instanceof GridH2Table)
+ ((GridH2Table)t).readLockInternal(s);
+ }
+ }
+
+ /**
+ * @param s H2 session.
+ */
+ public static void checkTablesVersions(Session s) {
+ for (Table t : s.getLocks()) {
+ if (t instanceof GridH2Table)
+ ((GridH2Table)t).checkVersion(s);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class SessionLock {
+ /** Version. */
+ final long ver;
+
+ /** Locked by current thread flag. */
+ boolean locked;
+
+ /**
+ * Constructor for shared lock.
+ *
+ * @param ver Table version.
+ */
+ private SessionLock(long ver) {
+ this.ver = ver;
+ locked = true;
+ }
+
+ /**
+ * @param ver Locked table version.
+ * @return Shared lock instance.
+ */
+ static SessionLock sharedLock(long ver) {
+ return new SessionLock(ver);
+ }
+
+ /**
+ * @return Exclusive lock instance.
+ */
+ static SessionLock exclusiveLock() {
+ return new SessionLock(EXCLUSIVE_LOCK);
+ }
+
+ /**
+ * @return {@code true} if exclusive lock.
+ */
+ boolean isExclusive() {
+ return ver == EXCLUSIVE_LOCK;
+ }
+
+ /**
+ * @return Table version of the first lock.
+ */
+ long version() {
+ return ver;
+ }
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
index ca0c940..a697bf3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.h2.opt;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -44,9 +43,6 @@ public class QueryContext {
/** */
private final PartitionReservation reservations;
- /** */
- private MapQueryLazyWorker lazyWorker;
-
/**
* Constructor.
*
@@ -110,23 +106,6 @@ public class QueryContext {
return filter;
}
- /**
- * @return Lazy worker, if any, or {@code null} if none.
- */
- public MapQueryLazyWorker lazyWorker() {
- return lazyWorker;
- }
-
- /**
- * @param lazyWorker Lazy worker, if any, or {@code null} if none.
- * @return {@code this}.
- */
- public QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) {
- this.lazyWorker = lazyWorker;
-
- return this;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(QueryContext.class, this);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
index 8275d6e..34acbbf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
@@ -58,8 +58,6 @@ public class QueryContextRegistry {
* Drops current thread local context.
*/
public void clearThreadLocal() {
- assert locCtx.get() != null;
-
locCtx.remove();
}
@@ -140,10 +138,7 @@ public class QueryContextRegistry {
if (ctx == null)
return false;
- if (ctx.lazyWorker() != null)
- ctx.lazyWorker().stop(nodeStop);
- else
- ctx.clearContext(nodeStop);
+ ctx.clearContext(nodeStop);
return true;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e6bb564..e132b1a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -32,14 +31,13 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryRetryException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
@@ -63,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
@@ -88,16 +87,15 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.thread.IgniteThread;
+import org.h2.api.ErrorCode;
import org.h2.command.Prepared;
import org.h2.jdbc.JdbcResultSet;
+import org.h2.jdbc.JdbcSQLException;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
-
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
@@ -107,9 +105,6 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V
@SuppressWarnings("ForLoopReplaceableByForEach")
public class GridMapQueryExecutor {
/** */
- public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET);
-
- /** */
private IgniteLogger log;
/** */
@@ -127,15 +122,6 @@ public class GridMapQueryExecutor {
/** */
private final GridSpinBusyLock busyLock;
- /** Lazy workers. */
- private final ConcurrentHashMap<MapQueryLazyWorkerKey, MapQueryLazyWorker> lazyWorkers = new ConcurrentHashMap<>();
-
- /** Busy lock for lazy workers. */
- private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock();
-
- /** Lazy worker stop guard. */
- private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean();
-
/**
* @param busyLock Busy lock.
*/
@@ -191,18 +177,11 @@ public class GridMapQueryExecutor {
}
/**
- * Cancel active lazy queries and prevent submit of new queries.
+ * Stop the map query executor: cancel all query results registered for every node.
*/
- public void cancelLazyWorkers() {
- if (!lazyWorkerStopGuard.compareAndSet(false, true))
- return;
-
- lazyWorkerBusyLock.block();
-
- for (MapQueryLazyWorker worker : lazyWorkers.values())
- worker.stop(false);
-
- lazyWorkers.clear();
+ public void stop() {
+ for (MapNodeResults res : qryRess.values())
+ res.cancelAll();
}
/**
@@ -240,13 +219,6 @@ public class GridMapQueryExecutor {
}
/**
- * @return Busy lock for lazy workers to guard their operations with.
- */
- GridSpinBusyLock busyLock() {
- return busyLock;
- }
-
- /**
* @param node Node.
* @param msg Message.
*/
@@ -288,6 +260,7 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param req Query request.
+ * @throws IgniteCheckedException On error.
*/
private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
int[] qryParts = req.queryPartitions();
@@ -296,11 +269,15 @@ public class GridMapQueryExecutor {
final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts;
+ final GridDhtTxLocalAdapter tx;
+
+ GridH2SelectForUpdateTxDetails txReq = req.txDetails();
+
boolean distributedJoins = req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS);
boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
- boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+ final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY) && txReq == null;
Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
@@ -311,10 +288,6 @@ public class GridMapQueryExecutor {
final Object[] params = req.parameters();
- final GridDhtTxLocalAdapter tx;
-
- GridH2SelectForUpdateTxDetails txReq = req.txDetails();
-
try {
if (txReq != null) {
// Prepare to run queries.
@@ -380,62 +353,36 @@ public class GridMapQueryExecutor {
final int segment = i;
- if (lazy) {
- onQueryRequest0(node,
- req.requestId(),
- segment,
- req.schemaName(),
- req.queries(),
- cacheIds,
- req.topologyVersion(),
- partsMap,
- parts,
- req.pageSize(),
- distributedJoins,
- enforceJoinOrder,
- false, // Replicated is always false here (see condition above).
- req.timeout(),
- params,
- true,
- req.mvccSnapshot(),
- tx,
- txReq,
- lockFut,
- runCntr,
- dataPageScanEnabled);
- }
- else {
- ctx.closure().callLocal(
- new Callable<Void>() {
- @Override public Void call() {
- onQueryRequest0(node,
- req.requestId(),
- segment,
- req.schemaName(),
- req.queries(),
- cacheIds,
- req.topologyVersion(),
- partsMap,
- parts,
- req.pageSize(),
- distributedJoins,
- enforceJoinOrder,
- false,
- req.timeout(),
- params,
- false,
- req.mvccSnapshot(),
- tx,
- txReq,
- lockFut,
- runCntr,
- dataPageScanEnabled);
-
- return null;
- }
+ ctx.closure().callLocal(
+ new Callable<Void>() {
+ @Override public Void call() {
+ onQueryRequest0(node,
+ req.requestId(),
+ segment,
+ req.schemaName(),
+ req.queries(),
+ cacheIds,
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.pageSize(),
+ distributedJoins,
+ enforceJoinOrder,
+ false,
+ req.timeout(),
+ params,
+ lazy,
+ req.mvccSnapshot(),
+ tx,
+ txReq,
+ lockFut,
+ runCntr,
+ dataPageScanEnabled);
+
+ return null;
}
- , QUERY_POOL);
- }
+ },
+ QUERY_POOL);
}
onQueryRequest0(node,
@@ -474,6 +421,10 @@ public class GridMapQueryExecutor {
* @param parts Explicit partitions for current node.
* @param pageSize Page size.
* @param distributedJoins Query distributed join mode.
+ * @param enforceJoinOrder Enforce join order H2 flag.
+ * @param replicated Replicated only flag.
+ * @param timeout Query timeout.
+ * @param params Query parameters.
* @param lazy Streaming flag.
* @param mvccSnapshot MVCC snapshot.
* @param tx Transaction.
@@ -504,78 +455,20 @@ public class GridMapQueryExecutor {
@Nullable final GridH2SelectForUpdateTxDetails txDetails,
@Nullable final CompoundLockFuture lockFut,
@Nullable final AtomicInteger runCntr,
- Boolean dataPageScanEnabled
- ) {
- MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
+ Boolean dataPageScanEnabled) {
// In presence of TX, we also must always have matching details.
assert tx == null || txDetails != null;
- boolean inTx = (tx != null);
-
- if (lazy && worker == null) {
- // Lazy queries must be re-submitted to dedicated workers.
- MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
- worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this, qryCtxRegistry);
+ assert !lazy || txDetails == null : "Lazy execution of SELECT FOR UPDATE queries is not supported.";
- worker.submit(new Runnable() {
- @Override public void run() {
- onQueryRequest0(
- node,
- reqId,
- segmentId,
- schemaName,
- qrys,
- cacheIds,
- topVer,
- partsMap,
- parts,
- pageSize,
- distributedJoins,
- enforceJoinOrder,
- replicated,
- timeout,
- params,
- true,
- mvccSnapshot,
- tx,
- txDetails,
- lockFut,
- runCntr,
- dataPageScanEnabled);
- }
- });
-
- if (lazyWorkerBusyLock.enterBusy()) {
- try {
- MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
-
- if (oldWorker != null)
- oldWorker.stop(false);
-
- IgniteThread thread = new IgniteThread(worker);
-
- thread.start();
- }
- finally {
- lazyWorkerBusyLock.leaveBusy();
- }
- }
- else
- log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']');
-
- return;
- }
-
- if (lazy && txDetails != null)
- throw new IgniteSQLException("Lazy execution of SELECT FOR UPDATE queries is not supported.");
+ boolean inTx = (tx != null);
// Prepare to run queries.
GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds);
MapNodeResults nodeRess = resultsForNode(node.id());
- MapQueryResults qr = null;
+ MapQueryResults qryResults = null;
PartitionReservation reserved = null;
@@ -593,21 +486,12 @@ public class GridMapQueryExecutor {
);
if (reserved.failed()) {
- // Unregister lazy worker because re-try may never reach this node again.
- if (lazy)
- stopAndUnregisterCurrentLazyWorker();
-
sendRetry(node, reqId, segmentId, reserved.error());
return;
}
}
- qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker(), inTx);
-
- if (nodeRess.put(reqId, segmentId, qr) != null)
- throw new IllegalStateException();
-
// Prepare query context.
DistributedJoinContext distributedJoinCtx = null;
@@ -630,47 +514,61 @@ public class GridMapQueryExecutor {
reserved
);
- qctx.lazyWorker(worker);
+ qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, inTx, lazy, qctx);
- Connection conn = h2.connections().connectionForThread().connection(schemaName);
-
- H2Utils.setupConnection(conn, distributedJoins, enforceJoinOrder);
+ // qctx is set, we have to release reservations inside of it.
+ reserved = null;
qryCtxRegistry.setThreadLocal(qctx);
if (distributedJoinCtx != null)
qryCtxRegistry.setShared(node.id(), reqId, qctx);
- // qctx is set, we have to release reservations inside of it.
- reserved = null;
+ if (nodeRess.put(reqId, segmentId, qryResults) != null)
+ throw new IllegalStateException();
- try {
- if (nodeRess.cancelled(reqId)) {
- qryCtxRegistry.clearShared(node.id(), reqId);
+ if (nodeRess.cancelled(reqId)) {
+ qryCtxRegistry.clearShared(node.id(), reqId);
- nodeRess.cancelRequest(reqId);
+ nodeRess.cancelRequest(reqId);
- throw new QueryCancelledException();
- }
+ throw new QueryCancelledException();
+ }
- // Run queries.
- int qryIdx = 0;
+ // Run queries.
+ int qryIdx = 0;
- boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+ boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
- for (GridCacheSqlQuery qry : qrys) {
- ResultSet rs = null;
+ for (GridCacheSqlQuery qry : qrys) {
+ H2ConnectionWrapper connWrp = h2.connections().connectionForThread();
+
+ H2Utils.setupConnection(
+ connWrp.connection(schemaName),
+ distributedJoins,
+ enforceJoinOrder,
+ lazy
+ );
+
+ MapQueryResult res = new MapQueryResult(h2, mainCctx, node.id(), qry, params, connWrp, log);
+
+ qryResults.addResult(qryIdx, res);
+
+ try {
+ res.lock();
boolean removeMapping = false;
+ ResultSet rs = null;
// If we are not the target node for this replicated query, just ignore it.
if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
- String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
+ String sql = qry.query();
+ Collection<Object> params0 = F.asList(qry.parameters(params));
PreparedStatement stmt;
try {
- stmt = h2.connections().prepareStatement(conn, sql);
+ stmt = h2.connections().prepareStatement(connWrp.connection(), sql);
}
catch (SQLException e) {
throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
@@ -680,14 +578,21 @@ public class GridMapQueryExecutor {
if (GridSqlQueryParser.isForUpdateQuery(p)) {
sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
- stmt = h2.connections().prepareStatement(conn, sql);
+ stmt = h2.connections().prepareStatement(connWrp.connection(), sql);
}
H2Utils.bindParameters(stmt, params0);
int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
- rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx), dataPageScanEnabled);
+ rs = h2.executeSqlQueryWithTimer(
+ stmt,
+ connWrp.connection(),
+ sql,
+ params0,
+ opTimeout,
+ qryResults.queryCancel(qryIdx),
+ dataPageScanEnabled);
if (inTx) {
ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
@@ -733,13 +638,10 @@ public class GridMapQueryExecutor {
assert rs instanceof JdbcResultSet : rs.getClass();
}
- qr.addResult(qryIdx, qry, node.id(), rs, params);
-
- if (qr.cancelled()) {
- qr.result(qryIdx).close();
+ res.openResult(rs);
+ if (qryResults.cancelled())
throw new QueryCancelledException();
- }
if (inTx) {
if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
@@ -748,12 +650,21 @@ public class GridMapQueryExecutor {
}
}
+ final GridQueryNextPageResponse msg = prepareNextPage(
+ nodeRess,
+ node,
+ qryResults,
+ qryIdx,
+ segmentId,
+ pageSize,
+ removeMapping,
+ dataPageScanEnabled
+ );
+
// Send the first page.
if (lockFut == null)
- sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping, dataPageScanEnabled);
+ sendNextPage(node, msg);
else {
- GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping, dataPageScanEnabled);
-
if (msg != null) {
lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> future) {
@@ -773,50 +684,68 @@ public class GridMapQueryExecutor {
qryIdx++;
}
+ finally {
+ try {
+ res.unlockTables();
+ }
+ finally {
+ res.unlock();
+ }
+ }
+ } // for map queries
- // All request results are in the memory in result set already, so it's ok to release partitions.
- if (!lazy)
- releaseReservations();
- }
- catch (Throwable e){
- releaseReservations();
-
- throw e;
- }
+ if (!lazy)
+ qryResults.releaseQueryContext();
}
catch (Throwable e) {
- if (qr != null) {
- nodeRess.remove(reqId, segmentId, qr);
+ if (qryResults != null) {
+ nodeRess.remove(reqId, segmentId, qryResults);
- qr.cancel(false);
+ qryResults.close();
}
+ else
+ releaseReservations();
- // Unregister worker after possible cancellation.
- if (lazy)
- stopAndUnregisterCurrentLazyWorker();
+ if (e instanceof QueryCancelledException)
+ sendError(node, reqId, e);
+ else {
+ JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
- GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
+ if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+ sendQueryCancel(node, reqId);
+ else {
+ GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
- if (retryErr != null) {
- final String retryCause = String.format(
- "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
- "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
- );
+ if (retryErr != null) {
+ final String retryCause = String.format(
+ "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
+ "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
+ );
- sendRetry(node, reqId, segmentId, retryCause);
- }
- else {
- U.error(log, "Failed to execute local query.", e);
+ sendRetry(node, reqId, segmentId, retryCause);
+ }
+ else {
+ QueryRetryException qryRetryErr = X.cause(e, QueryRetryException.class);
- sendError(node, reqId, e);
+ if (qryRetryErr != null)
+ sendError(node, reqId, qryRetryErr);
+ else {
+ U.error(log, "Failed to execute local query.", e);
+
+ sendError(node, reqId, e);
- if (e instanceof Error)
- throw (Error)e;
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+ }
}
}
finally {
if (reserved != null)
reserved.release();
+
+ qryCtxRegistry.clearThreadLocal();
}
}
@@ -947,6 +876,14 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param qryReqId Query request ID.
+ */
+ private void sendQueryCancel(ClusterNode node, long qryReqId) {
+ sendError(node, qryReqId, new QueryCancelledException());
+ }
+
+ /**
+ * @param node Node.
+ * @param qryReqId Query request ID.
* @param err Error.
*/
private void sendError(ClusterNode node, long qryReqId, Throwable err) {
@@ -1003,40 +940,91 @@ public class GridMapQueryExecutor {
* @param req Request.
*/
private void onNextPageRequest(final ClusterNode node, final GridQueryNextPageRequest req) {
+ long reqId = req.queryRequestId();
+
final MapNodeResults nodeRess = qryRess.get(node.id());
if (nodeRess == null) {
- sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
+ sendError(node, reqId, new CacheException("No node result found for request: " + req));
return;
}
- else if (nodeRess.cancelled(req.queryRequestId())) {
- sendError(node, req.queryRequestId(), new QueryCancelledException());
+ else if (nodeRess.cancelled(reqId)) {
+ sendQueryCancel(node, reqId);
return;
}
- final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+ final MapQueryResults qryResults = nodeRess.get(reqId, req.segmentId());
- if (qr == null)
- sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
- else if (qr.cancelled())
- sendError(node, req.queryRequestId(), new QueryCancelledException());
- else {
- Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags());
+ if (qryResults == null)
+ sendError(node, reqId, new CacheException("No query result found for request: " + req));
+ else if (qryResults.cancelled())
+ sendQueryCancel(node, reqId);
+ else
+ try {
+ QueryContext qctxReduce = qryCtxRegistry.getThreadLocal();
+
+ if (qctxReduce != null)
+ qryCtxRegistry.clearThreadLocal();
+
+ qryCtxRegistry.setThreadLocal(qryResults.queryContext());
+
+ MapQueryResult res = qryResults.result(req.query());
+
+ assert res != null;
+
+ try {
+ // Session isn't set for lazy=false queries.
+ // Also session == null when result already closed.
+ res.lock();
+ res.lockTables();
+ res.checkTablesVersions();
+
+ Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags());
- MapQueryLazyWorker lazyWorker = qr.lazyWorker();
+ GridQueryNextPageResponse msg = prepareNextPage(
+ nodeRess,
+ node,
+ qryResults,
+ req.query(),
+ req.segmentId(),
+ req.pageSize(),
+ false,
+ dataPageScanEnabled);
- if (lazyWorker != null) {
- lazyWorker.submit(new Runnable() {
- @Override public void run() {
- sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false, dataPageScanEnabled);
+ sendNextPage(node, msg);
+ }
+ finally {
+ qryCtxRegistry.clearThreadLocal();
+
+ if (qctxReduce != null)
+ qryCtxRegistry.setThreadLocal(qctxReduce);
+
+ try {
+ res.unlockTables();
+ }
+ finally {
+ res.unlock();
}
- });
+ }
+ }
+ catch (Exception e) {
+ QueryRetryException retryEx = X.cause(e, QueryRetryException.class);
+
+ if (retryEx != null)
+ sendError(node, reqId, retryEx);
+ else {
+ JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
+
+ if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+ sendQueryCancel(node, reqId);
+ else
+ sendError(node, reqId, e);
+ }
+
+ qryResults.cancel();
}
- else
- sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false, dataPageScanEnabled);
- }
}
/**
@@ -1051,8 +1039,15 @@ public class GridMapQueryExecutor {
* @return Next page.
* @throws IgniteCheckedException If failed.
*/
- private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
- int pageSize, boolean removeMapping, Boolean dataPageScanEnabled) throws IgniteCheckedException {
+ private GridQueryNextPageResponse prepareNextPage(
+ MapNodeResults nodeRess,
+ ClusterNode node,
+ MapQueryResults qr,
+ int qry,
+ int segmentId,
+ int pageSize,
+ boolean removeMapping,
+ Boolean dataPageScanEnabled) throws IgniteCheckedException {
MapQueryResult res = qr.result(qry);
assert res != null;
@@ -1067,14 +1062,14 @@ public class GridMapQueryExecutor {
boolean last = res.fetchNextPage(rows, pageSize, dataPageScanEnabled);
if (last) {
- res.close();
+ qr.closeResult(qry);
if (qr.isAllClosed()) {
nodeRess.remove(qr.queryRequestId(), segmentId, qr);
- // Release reservations if the last page fetched, all requests are closed and this is a lazy worker.
- if (MapQueryLazyWorker.currentWorker() != null)
- releaseReservations();
+ // Clear context, release reservations.
+ if (qr.isLazy())
+ qr.releaseQueryContext();
}
}
@@ -1097,21 +1092,11 @@ public class GridMapQueryExecutor {
}
/**
- * @param nodeRess Results.
* @param node Node.
- * @param qr Query results.
- * @param qry Query.
- * @param segmentId Index segment ID.
- * @param pageSize Page size.
- * @param removeMapping Remove mapping flag.
- * @param dataPageScanEnabled If data page scan is enabled.
+ * @param msg Message to send.
*/
- private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
- int pageSize, boolean removeMapping, Boolean dataPageScanEnabled) {
+ private void sendNextPage(ClusterNode node, GridQueryNextPageResponse msg) {
try {
- GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping,
- dataPageScanEnabled);
-
if (msg != null) {
if (node.isLocal())
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
@@ -1130,6 +1115,7 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param reqId Request ID.
* @param segmentId Index segment ID.
+ * @param retryCause Description of the retry cause.
*/
private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) {
try {
@@ -1153,34 +1139,4 @@ public class GridMapQueryExecutor {
U.warn(log, "Failed to send retry message: " + e.getMessage());
}
}
-
- /**
- * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
- */
- public void stopAndUnregisterCurrentLazyWorker() {
- MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
- if (worker != null) {
- worker.stop(false);
-
- // Just stop is not enough as worker may be registered, but not started due to exception.
- unregisterLazyWorker(worker);
- }
- }
-
- /**
- * Unregister lazy worker.
- *
- * @param worker Worker.
- */
- public void unregisterLazyWorker(MapQueryLazyWorker worker) {
- lazyWorkers.remove(worker.key(), worker);
- }
-
- /**
- * @return Number of registered lazy workers.
- */
- public int registeredLazyWorkers() {
- return lazyWorkers.size();
- }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f139974..b2c5170 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -43,6 +43,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryRetryException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -65,9 +66,11 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedUpdateRun;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
@@ -164,6 +167,9 @@ public class GridReduceQueryExecutor {
/** Partition mapper. */
private ReducePartitionMapper mapper;
+ /** Default query retry timeout. */
+ private long dfltQueryTimeout;
+
/**
* Constructor.
*
@@ -182,6 +188,8 @@ public class GridReduceQueryExecutor {
this.ctx = ctx;
this.h2 = h2;
+ dfltQueryTimeout = IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT);
+
log = ctx.log(GridReduceQueryExecutor.class);
mapper = new ReducePartitionMapper(ctx, log);
@@ -282,11 +290,20 @@ public class GridReduceQueryExecutor {
*/
private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
if (r != null) {
- CacheException e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
- ", errMsg=" + msg + ']');
+ CacheException e;
- if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR)
- e.addSuppressed(new QueryCancelledException());
+ if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR) {
+ e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+ ", errMsg=" + msg + ']', new QueryCancelledException());
+ }
+ else if (failCode == GridQueryFailResponse.RETRY_QUERY) {
+ e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+ ", errMsg=" + msg + ']', new QueryRetryException(msg));
+ }
+ else {
+ e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+ ", errMsg=" + msg + ']');
+ }
r.setStateOnException(nodeId, e);
}
@@ -521,6 +538,8 @@ public class GridReduceQueryExecutor {
dataPageScanEnabled
);
+ ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn = h2.connections().detachThreadConnection();
+
Collection<ClusterNode> nodes;
// Explicit partition mapping for unstable topology.
@@ -678,7 +697,7 @@ public class GridReduceQueryExecutor {
if (isReplicatedOnly)
flags |= GridH2QueryRequest.FLAG_REPLICATED;
- if (lazy && mapQrys.size() == 1)
+ if (lazy)
flags |= GridH2QueryRequest.FLAG_LAZY;
flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
@@ -807,7 +826,10 @@ public class GridReduceQueryExecutor {
cancel,
dataPageScanEnabled);
- resIter = new H2FieldsIterator(res, mvccTracker, false, null);
+ resIter = new H2FieldsIterator(res, mvccTracker, false, detachedConn);
+
+ // The iterator now holds the connection; don't recycle it in the finally block.
+ detachedConn = null;
mvccTracker = null; // To prevent callback inside finally block;
}
@@ -874,6 +896,9 @@ public class GridReduceQueryExecutor {
throw resEx;
}
finally {
+ if (detachedConn != null)
+ detachedConn.recycle();
+
if (release) {
releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
@@ -1039,7 +1064,7 @@ public class GridReduceQueryExecutor {
* @return {@code true} if exception is caused by cancel.
*/
private boolean wasCancelled(CacheException e) {
- return X.hasSuppressed(e, QueryCancelledException.class);
+ return X.cause(e, QueryCancelledException.class) != null;
}
/**
@@ -1049,19 +1074,22 @@ public class GridReduceQueryExecutor {
* @param r Query run.
* @param qryReqId Query id.
* @param distributedJoins Distributed join flag.
+ * @param mvccTracker MVCC tracker.
*/
- public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
+ void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
boolean distributedJoins, MvccQueryTracker mvccTracker) {
- // For distributedJoins need always send cancel request to cleanup resources.
if (distributedJoins)
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
- else {
- for (ReduceIndex idx : r.indexes()) {
- if (!idx.fetchedAll()) {
+
+ for (ReduceIndex idx : r.indexes()) {
+ if (!idx.fetchedAll()) {
+ if (!distributedJoins) // Cancel request has already been sent for distributed joins.
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
- break;
- }
+ r.setStateOnException(ctx.localNodeId(),
+ new CacheException("Query is canceled.", new QueryCancelledException()));
+
+ break;
}
}
@@ -1362,14 +1390,13 @@ public class GridReduceQueryExecutor {
* @param qryTimeout Query timeout.
* @return Query retry timeout.
*/
- private static long retryTimeout(long qryTimeout) {
+ private long retryTimeout(long qryTimeout) {
if (qryTimeout > 0)
return qryTimeout;
- return IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT);
+ return dfltQueryTimeout;
}
-
/**
* Prepare map query based on original sql.
*
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index 48116d3..2348a3d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -89,7 +88,7 @@ class MapNodeResults {
MapQueryResults removed = res.remove(key);
if (removed != null)
- removed.cancel(true);
+ removed.cancel();
}
}
@@ -144,11 +143,10 @@ class MapNodeResults {
*/
public void cancelAll() {
for (MapQueryResults ress : res.values())
- ress.cancel(true);
+ ress.cancel();
// Cancel update requests
for (GridQueryCancel upd: updCancels.values())
upd.cancel();
}
-
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
deleted file mode 100644
index bc1f896..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.LongAdder;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Worker for lazy query execution.
- */
-public class MapQueryLazyWorker extends GridWorker {
- /** Lazy thread flag. */
- private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>();
-
- /** Active lazy worker count (for testing purposes). */
- private static final LongAdder ACTIVE_CNT = new LongAdder();
-
- /** Task to be executed. */
- private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
-
- /** Key. */
- private final MapQueryLazyWorkerKey key;
-
- /** Map query executor. */
- private final GridMapQueryExecutor exec;
-
- /** Query context registry. */
- private final QueryContextRegistry qryCtxRegistry;
-
- /** Latch decremented when worker finishes. */
- private final CountDownLatch stopLatch = new CountDownLatch(1);
-
- /** Map query result. */
- private volatile MapQueryResult res;
-
- /**
- * Constructor.
- *
- * @param instanceName Instance name.
- * @param key Lazy worker key.
- * @param log Logger.
- * @param exec Map query executor.
- * @param qryCtxRegistry Query context registry.
- */
- public MapQueryLazyWorker(@Nullable String instanceName, MapQueryLazyWorkerKey key, IgniteLogger log,
- GridMapQueryExecutor exec, QueryContextRegistry qryCtxRegistry) {
- super(instanceName, workerName(instanceName, key), log);
-
- this.key = key;
- this.exec = exec;
- this.qryCtxRegistry = qryCtxRegistry;
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- LAZY_WORKER.set(this);
-
- ACTIVE_CNT.increment();
-
- try {
- while (!isCancelled()) {
- Runnable task = tasks.take();
-
- if (task != null) {
- if (!exec.busyLock().enterBusy())
- return;
-
- try {
- task.run();
- }
- finally {
- exec.busyLock().leaveBusy();
- }
- }
- }
- }
- finally {
- if (res != null)
- res.close();
-
- LAZY_WORKER.set(null);
-
- ACTIVE_CNT.decrement();
-
- exec.unregisterLazyWorker(this);
- }
- }
-
- /**
- * Submit task to worker.
- *
- * @param task Task to be executed.
- */
- public void submit(Runnable task) {
- tasks.add(task);
- }
-
- /**
- * @return Worker key.
- */
- public MapQueryLazyWorkerKey key() {
- return key;
- }
-
- /**
- * Stop the worker.
- * @param nodeStop Node is stopping.
- */
- public void stop(final boolean nodeStop) {
- if (MapQueryLazyWorker.currentWorker() == null)
- submit(new Runnable() {
- @Override public void run() {
- stop(nodeStop);
- }
- });
- else {
- QueryContext qctx = qryCtxRegistry.getThreadLocal();
-
- if (qctx != null) {
- qctx.clearContext(nodeStop);
-
- qryCtxRegistry.clearThreadLocal();
- }
-
- isCancelled = true;
-
- stopLatch.countDown();
- }
- }
-
- /**
- * Await worker stop.
- */
- public void awaitStop() {
- try {
- U.await(stopLatch);
- }
- catch (IgniteInterruptedCheckedException e) {
- throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
- }
- }
-
- /**
- * @param res Map query result.
- */
- public void result(MapQueryResult res) {
- this.res = res;
- }
-
- /**
- * @return Current worker or {@code null} if call is performed not from lazy worker thread.
- */
- @Nullable public static MapQueryLazyWorker currentWorker() {
- return LAZY_WORKER.get();
- }
-
- /**
- * @return Active workers count.
- */
- public static int activeCount() {
- return ACTIVE_CNT.intValue();
- }
-
- /**
- * Construct worker name.
- *
- * @param instanceName Instance name.
- * @param key Key.
- * @return Name.
- */
- private static String workerName(String instanceName, MapQueryLazyWorkerKey key) {
- return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" +
- key.segment();
- }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
deleted file mode 100644
index a0f5ebb..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep;
-
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.UUID;
-
-/**
- * Key to identify lazy worker.
- */
-public class MapQueryLazyWorkerKey {
- /** Client node ID. */
- private final UUID nodeId;
-
- /** Query request ID. */
- private final long qryReqId;
-
- /** Segment. */
- private final int segment;
-
- /**
- * Constructor.
- *
- * @param nodeId Node ID.
- * @param qryReqId Query request ID.
- * @param segment Segment.
- */
- public MapQueryLazyWorkerKey(UUID nodeId, long qryReqId, int segment) {
- this.nodeId = nodeId;
- this.qryReqId = qryReqId;
- this.segment = segment;
- }
-
- /**
- * @return Node id.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Query request ID.
- */
- public long queryRequestId() {
- return qryReqId;
- }
-
- /**
- * @return Segment.
- */
- public int segment() {
- return segment;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = nodeId.hashCode();
-
- res = 31 * res + (int)(qryReqId ^ (qryReqId >>> 32));
- res = 31 * res + segment;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- if (obj != null && obj instanceof MapQueryLazyWorkerKey) {
- MapQueryLazyWorkerKey other = (MapQueryLazyWorkerKey)obj;
-
- return F.eq(qryReqId, other.qryReqId) && F.eq(nodeId, other.nodeId) && F.eq(segment, other.segment);
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MapQueryLazyWorkerKey.class, this);
- }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index c823023..c14cf7f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -22,16 +22,23 @@ import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.engine.Session;
import org.h2.jdbc.JdbcResultSet;
import org.h2.result.LazyResult;
import org.h2.result.ResultInterface;
@@ -65,12 +72,6 @@ class MapQueryResult {
private final IgniteH2Indexing h2;
/** */
- private final ResultInterface res;
-
- /** */
- private final ResultSet rs;
-
- /** */
private final GridCacheContext<?, ?> cctx;
/** */
@@ -80,13 +81,16 @@ class MapQueryResult {
private final UUID qrySrcNodeId;
/** */
- private final int cols;
+ private volatile Result res;
/** */
- private int page;
+ private final IgniteLogger log;
+
+ /** */
+ private final Object[] params;
/** */
- private final int rowCnt;
+ private int page;
/** */
private boolean cpNeeded;
@@ -94,51 +98,40 @@ class MapQueryResult {
/** */
private volatile boolean closed;
- /** */
- private final Object[] params;
+ /** H2 session. */
+ private final Session ses;
- /** Lazy worker. */
- private final MapQueryLazyWorker lazyWorker;
+ /** Detached connection. Used for lazy execution to prevent connection sharing. */
+ private ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn;
+
+ /** */
+ private final ReentrantLock lock = new ReentrantLock();
/**
- * @param rs Result set.
+ * @param h2 H2 indexing.
* @param cctx Cache context.
* @param qrySrcNodeId Query source node.
* @param qry Query.
* @param params Query params.
- * @param lazyWorker Lazy worker.
+ * @param conn H2 connection wrapper.
+ * @param log Logger.
*/
- MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx,
- UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
+ MapQueryResult(IgniteH2Indexing h2, @Nullable GridCacheContext cctx,
+ UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, H2ConnectionWrapper conn, IgniteLogger log) {
this.h2 = h2;
this.cctx = cctx;
this.qry = qry;
this.params = params;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
- this.lazyWorker = lazyWorker;
-
- if (rs != null) {
- this.rs = rs;
+ this.log = log;
- try {
- res = (ResultInterface)RESULT_FIELD.get(rs);
- }
- catch (IllegalAccessException e) {
- throw new IllegalStateException(e); // Must not happen.
- }
-
- rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
- cols = res.getVisibleColumnCount();
- }
- else {
- this.rs = null;
- this.res = null;
- this.cols = -1;
- this.rowCnt = -1;
+ ses = H2Utils.session(conn.connection());
+ }
- closed = true;
- }
+ /** */
+ void openResult(ResultSet rs) {
+ res = new Result(rs);
}
/**
@@ -152,14 +145,18 @@ class MapQueryResult {
* @return Row count.
*/
int rowCount() {
- return rowCnt;
+ assert res != null;
+
+ return res.rowCnt;
}
/**
* @return Column ocunt.
*/
int columnCount() {
- return cols;
+ assert res != null;
+
+ return res.cols;
}
/**
@@ -175,12 +172,14 @@ class MapQueryResult {
* @param dataPageScanEnabled If data page scan is enabled.
* @return {@code true} If there are no more rows available.
*/
- synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize, Boolean dataPageScanEnabled) {
- assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+ boolean fetchNextPage(List<Value[]> rows, int pageSize, Boolean dataPageScanEnabled) {
+ assert lock.isHeldByCurrentThread();
if (closed)
return true;
+ assert res != null;
+
boolean readEvt = cctx != null && cctx.name() != null && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
page++;
@@ -189,10 +188,10 @@ class MapQueryResult {
try {
for (int i = 0; i < pageSize; i++) {
- if (!res.next())
+ if (!res.res.next())
return true;
- Value[] row = res.currentRow();
+ Value[] row = res.res.currentRow();
if (cpNeeded) {
boolean copied = false;
@@ -241,10 +240,13 @@ class MapQueryResult {
row(row)));
}
- rows.add(res.currentRow());
+ rows.add(res.res.currentRow());
}
- return !res.hasNext();
+ if (detachedConn == null && res.res.hasNext())
+ detachedConn = h2.connections().detachThreadConnection();
+
+ return !res.res.hasNext();
}
finally {
CacheDataTree.setDataPageScanEnabled(false);
@@ -267,31 +269,101 @@ class MapQueryResult {
/**
* Close the result.
*/
- public void close() {
- if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
- lazyWorker.submit(new Runnable() {
- @Override public void run() {
- close();
- }
- });
-
- lazyWorker.awaitStop();
+ void close() {
+ assert lock.isHeldByCurrentThread();
+ if (closed)
return;
- }
- synchronized (this) {
- assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+ closed = true;
+
+ if (res != null)
+ res.close();
+
+ if (detachedConn != null)
+ detachedConn.recycle();
+
+ detachedConn = null;
+ }
+
+ /** */
+ public void lock() {
+ if (!lock.isHeldByCurrentThread())
+ lock.lock();
+ }
- if (closed)
- return;
+ /** */
+ public void lockTables() {
+ if (ses.isLazyQueryExecution() && !closed)
+ GridH2Table.readLockTables(ses);
+ }
- closed = true;
+ /** */
+ public void unlock() {
+ if (lock.isHeldByCurrentThread())
+ lock.unlock();
+ }
- U.closeQuiet(rs);
+ /** */
+ public void unlockTables() {
+ if (ses.isLazyQueryExecution())
+ GridH2Table.unlockTables(ses);
+ }
+
+ /**
+ *
+ */
+ public void checkTablesVersions() {
+ if (ses.isLazyQueryExecution())
+ GridH2Table.checkTablesVersions(ses);
+ }
+
+ /** */
+ private class Result {
+ /** */
+ private final ResultInterface res;
+
+ /** */
+ private final ResultSet rs;
+
+ /** */
+ private final int cols;
+
+ /** */
+ private final int rowCnt;
+
+ /**
+ * Constructor.
+ *
+ * @param rs H2 result set.
+ */
+ Result(ResultSet rs) {
+ if (rs != null) {
+ this.rs = rs;
+
+ try {
+ res = (ResultInterface)RESULT_FIELD.get(rs);
+ }
+ catch (IllegalAccessException e) {
+ throw new IllegalStateException(e); // Must not happen.
+ }
+
+ rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
+ cols = res.getVisibleColumnCount();
+ }
+ else {
+ this.rs = null;
+ this.res = null;
+ this.cols = -1;
+ this.rowCnt = -1;
+
+ closed = true;
+ }
+ }
- if (lazyWorker != null)
- lazyWorker.stop(false);
+ /** */
+ void close() {
+ U.close(rs, log);
}
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index acb4f4e..d61e09a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -17,20 +17,19 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.sql.ResultSet;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.jetbrains.annotations.Nullable;
/**
* Mapper query results.
*/
class MapQueryResults {
- /** H@ indexing. */
+ /** H2 indexing. */
private final IgniteH2Indexing h2;
/** */
@@ -45,8 +44,8 @@ class MapQueryResults {
/** */
private final GridCacheContext<?, ?> cctx;
- /** Lazy worker. */
- private final MapQueryLazyWorker lazyWorker;
+ /** Lazy mode. */
+ private final boolean lazy;
/** */
private volatile boolean cancelled;
@@ -54,22 +53,28 @@ class MapQueryResults {
/** {@code SELECT FOR UPDATE} flag. */
private final boolean forUpdate;
+ /** Query context. */
+ private final QueryContext qctx;
+
/**
* Constructor.
+ *
* @param h2 Indexing instance.
* @param qryReqId Query request ID.
* @param qrys Number of queries.
* @param cctx Cache context.
- * @param lazyWorker Lazy worker (if any).
* @param forUpdate {@code SELECT FOR UPDATE} flag.
+ * @param lazy Lazy flag.
+ * @param qctx Query context.
*/
MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable GridCacheContext<?, ?> cctx,
- @Nullable MapQueryLazyWorker lazyWorker, boolean forUpdate) {
+ boolean forUpdate, boolean lazy, QueryContext qctx) {
this.forUpdate = forUpdate;
this.h2 = h2;
this.qryReqId = qryReqId;
this.cctx = cctx;
- this.lazyWorker = lazyWorker;
+ this.lazy = lazy;
+ this.qctx = qctx;
results = new AtomicReferenceArray<>(qrys);
cancels = new GridQueryCancel[qrys];
@@ -97,27 +102,12 @@ class MapQueryResults {
}
/**
- * @return Lazy worker.
- */
- MapQueryLazyWorker lazyWorker() {
- return lazyWorker;
- }
-
- /**
* Add result.
- * @param qry Query result index.
- * @param q Query object.
- * @param qrySrcNodeId Query source node.
- * @param rs Result set.
- * @param params Query arguments.
+ * @param qryIdx Query result index.
+ * @param res Result.
*/
- void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
- MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker);
-
- if (lazyWorker != null)
- lazyWorker.result(res);
-
- if (!results.compareAndSet(qry, null, res))
+ void addResult(int qryIdx, MapQueryResult res) {
+ if (!results.compareAndSet(qryIdx, null, res))
throw new IllegalStateException();
}
@@ -138,29 +128,66 @@ class MapQueryResults {
/**
* Cancels the query.
*/
- void cancel(boolean forceQryCancel) {
- if (cancelled)
- return;
-
- cancelled = true;
-
- for (int i = 0; i < results.length(); i++) {
- MapQueryResult res = results.get(i);
-
- if (res != null) {
- res.close();
+ void cancel() {
+ synchronized (this) {
+ if (cancelled)
+ return;
- continue;
- }
+ cancelled = true;
- // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
- if (forceQryCancel) {
+ for (int i = 0; i < results.length(); i++) {
GridQueryCancel cancel = cancels[i];
if (cancel != null)
cancel.cancel();
}
}
+
+ // Closing of the result sets is synchronized by the results themselves.
+ // Including it in the synchronized block may cause a deadlock on <this> and MapQueryResult#lock.
+ close();
+ }
+
+ /**
+ * Wraps MapQueryResult#close to synchronize close vs cancel.
+ * This is needed because the connection returns to the pool once the ResultSet is closed, while the whole
+ * MapQuery (which may contain several queries) may be canceled later.
+ *
+ * @param idx Map query (result) index.
+ */
+ void closeResult(int idx) {
+ MapQueryResult res = results.get(idx);
+
+ if (res != null && !res.closed()) {
+ try {
+ // Session isn't set for lazy=false queries.
+ // Also session == null when result already closed.
+ res.lock();
+ res.lockTables();
+
+ synchronized (this) {
+ res.close();
+
+ // The statement of the closed result must not be canceled
+ // because statement & connection may be reused.
+ cancels[idx] = null;
+ }
+ }
+ finally {
+ res.unlock();
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public void close() {
+ for (int i = 0; i < results.length(); i++)
+ closeResult(i);
+
+ if (lazy)
+ releaseQueryContext();
}
/**
@@ -183,4 +210,28 @@ class MapQueryResults {
public boolean isForUpdate() {
return forUpdate;
}
+
+ /**
+ * @return Query context.
+ */
+ public QueryContext queryContext() {
+ return qctx;
+ }
+
+ /**
+ * Release query context.
+ */
+ public void releaseQueryContext() {
+ h2.queryContextRegistry().clearThreadLocal();
+
+ if (qctx.distributedJoinContext() == null)
+ qctx.clearContext(false);
+ }
+
+ /**
+ * @return Lazy flag.
+ */
+ public boolean isLazy() {
+ return lazy;
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
index a3d3167..8af5b65 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
@@ -46,21 +46,12 @@ public class PartitionReservation {
}
/**
- * Constructor for failed reservation.
- *
- * @param err Error message.
- */
- public PartitionReservation(String err) {
- this(null, err);
- }
-
- /**
* Base constructor.
*
* @param reserved Reserved partitions.
* @param err Error message.
*/
- private PartitionReservation(@Nullable List<GridReservable> reserved, @Nullable String err) {
+ public PartitionReservation(@Nullable List<GridReservable> reserved, @Nullable String err) {
this.reserved = reserved;
this.err = err;
}
@@ -86,7 +77,9 @@ public class PartitionReservation {
if (!releaseGuard.compareAndSet(false, true))
return;
- for (int i = 0; i < reserved.size(); i++)
- reserved.get(i).release();
+ if (reserved != null) {
+ for (int i = 0; i < reserved.size(); i++)
+ reserved.get(i).release();
+ }
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
index b720b57..dfaef24 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
@@ -109,7 +109,8 @@ public class PartitionReservationManager {
// Cache was not found, probably was not deployed yet.
if (cctx == null) {
- return new PartitionReservation(String.format("Failed to reserve partitions for query (cache is not " +
+ return new PartitionReservation(reserved,
+ String.format("Failed to reserve partitions for query (cache is not " +
"found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i)));
}
@@ -125,7 +126,8 @@ public class PartitionReservationManager {
if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
if (r != REPLICATED_RESERVABLE) {
if (!r.reserve())
- return new PartitionReservation(String.format("Failed to reserve partitions for query (group " +
+ return new PartitionReservation(reserved,
+ String.format("Failed to reserve partitions for query (group " +
"reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s]",ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()));
@@ -144,7 +146,8 @@ public class PartitionReservationManager {
GridDhtPartitionState partState = part != null ? part.state() : null;
if (partState != OWNING)
- return new PartitionReservation(String.format("Failed to reserve partitions for " +
+ return new PartitionReservation(reserved,
+ String.format("Failed to reserve partitions for " +
"query (partition of REPLICATED cache is not in OWNING state) [" +
"localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s, part=%s, partFound=%s, partState=%s]",
@@ -179,7 +182,8 @@ public class PartitionReservationManager {
if (partState == LOST)
ignoreLostPartitionIfPossible(cctx, part);
else {
- return new PartitionReservation(String.format("Failed to reserve partitions " +
+ return new PartitionReservation(reserved,
+ String.format("Failed to reserve partitions " +
"for query (partition of PARTITIONED cache is not found or not in OWNING " +
"state) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s, part=%s, partFound=%s, partState=%s]",
@@ -197,7 +201,8 @@ public class PartitionReservationManager {
}
if (!part.reserve()) {
- return new PartitionReservation(String.format("Failed to reserve partitions for query " +
+ return new PartitionReservation(reserved,
+ String.format("Failed to reserve partitions for query " +
"(partition of PARTITIONED cache cannot be reserved) [" +
"localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s, part=%s, partFound=%s, partState=%s]",
@@ -224,7 +229,8 @@ public class PartitionReservationManager {
if (partState == LOST)
ignoreLostPartitionIfPossible(cctx, part);
else {
- return new PartitionReservation(String.format("Failed to reserve partitions for " +
+ return new PartitionReservation(reserved,
+ String.format("Failed to reserve partitions for " +
"query (partition of PARTITIONED cache is not in OWNING state after " +
"reservation) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, " +
"cacheId=%s, cacheName=%s, part=%s, partState=%s]",
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
index 4437dd6..1a25c50 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
@@ -17,6 +17,15 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.RandomAccess;
+import java.util.UUID;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
@@ -26,15 +35,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.value.Value;
-import javax.cache.CacheException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.fillArray;
/**
@@ -73,9 +73,11 @@ public class ReduceResultPage {
Collection<?> plainRows = res.plainRows();
if (plainRows != null) {
+ assert plainRows instanceof RandomAccess : "instance of " + plainRows.getClass();
+
rowsInPage = plainRows.size();
- if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns())
+ if (rowsInPage == 0 || ((List<Value[]>)plainRows).get(0).length == res.columns())
rows = (Iterator<Value[]>)plainRows.iterator();
else {
// If it's a result of SELECT FOR UPDATE (we can tell by difference in number
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java
index d10ed1a..42b2ccf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java
@@ -109,23 +109,37 @@ public class FunctionalQueryTest {
}
}
- // Fields query
- SqlFieldsQuery qry = new SqlFieldsQuery("select id, name from Person where id >= ?")
- .setArgs(minId)
- .setPageSize(pageSize);
+ checkSqlFieldsQuery(cache, minId, pageSize, expSize, exp, true);
+ checkSqlFieldsQuery(cache, minId, pageSize, expSize, exp, false);
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param minId Minimal ID.
+ * @param pageSize Page size.
+ * @param expSize The size of the expected results.
+ * @param exp Expected results.
+ * @param lazy Lazy mode flag.
+ */
+ private void checkSqlFieldsQuery(ClientCache<Integer, Person> cache, int minId, int pageSize, int expSize,
+ Map<Integer, Person> exp, boolean lazy) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("select id, name from Person where id >= ?")
+ .setArgs(minId)
+ .setPageSize(pageSize)
+ .setLazy(lazy);
- try (QueryCursor<List<?>> cur = cache.query(qry)) {
- List<List<?>> res = cur.getAll();
+ try (QueryCursor<List<?>> cur = cache.query(qry)) {
+ List<List<?>> res = cur.getAll();
- assertEquals(expSize, res.size());
+ assertEquals(expSize, res.size());
- Map<Integer, Person> act = res.stream().collect(Collectors.toMap(
- r -> Integer.parseInt(r.get(0).toString()),
- r -> new Person(Integer.parseInt(r.get(0).toString()), r.get(1).toString())
- ));
+ Map<Integer, Person> act = res.stream().collect(Collectors.toMap(
+ r -> Integer.parseInt(r.get(0).toString()),
+ r -> new Person(Integer.parseInt(r.get(0).toString()), r.get(1).toString())
+ ));
- assertEquals(exp, act);
- }
+ assertEquals(exp, act);
}
}
@@ -137,7 +151,7 @@ public class FunctionalQueryTest {
*/
@Test
public void testSql() throws Exception {
- try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
+ try (Ignite ignored = Ignition.start(Config.getServerConfiguration()); Ignite ignored2 = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))
) {
client.query(
@@ -147,19 +161,32 @@ public class FunctionalQueryTest {
)).setSchema("PUBLIC")
).getAll();
- int key = 1;
- Person val = new Person(key, "Person 1");
+ final int KEY_COUNT = 10;
+
+ for (int i = 0; i < KEY_COUNT; ++i) {
+ int key = i;
+ Person val = new Person(key, "Person " + i);
- client.query(new SqlFieldsQuery(
- "INSERT INTO Person(id, name) VALUES(?, ?)"
- ).setArgs(val.getId(), val.getName()).setSchema("PUBLIC"))
- .getAll();
+ client.query(new SqlFieldsQuery(
+ "INSERT INTO Person(id, name) VALUES(?, ?)"
+ ).setArgs(val.getId(), val.getName()).setSchema("PUBLIC"))
+ .getAll();
+ }
Object cachedName = client.query(
- new SqlFieldsQuery("SELECT name from Person WHERE id=?").setArgs(key).setSchema("PUBLIC")
+ new SqlFieldsQuery("SELECT name from Person WHERE id=?").setArgs(1).setSchema("PUBLIC")
).getAll().iterator().next().iterator().next();
- assertEquals(val.getName(), cachedName);
+ assertEquals("Person 1", cachedName);
+
+ List<List<?>> rows = client.query(
+ new SqlFieldsQuery("SELECT * from Person WHERE id >= ?")
+ .setSchema("PUBLIC")
+ .setArgs(0)
+ .setPageSize(1)
+ ).getAll();
+
+ assertEquals(KEY_COUNT, rows.size());
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
index 51e8297..8e83b57 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
@@ -86,7 +86,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
int partsFilled = fillAllPartitions(cache, aff);
SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
- .setLazy(true)
.setPageSize(1);
FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
@@ -134,7 +133,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
int partsFilled = fillAllPartitions(cache, aff);
SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
- .setLazy(true)
.setPageSize(1);
FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index c1034f9..f762416 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -95,98 +96,97 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
/** */
@Test
public void testRemoteQueryExecutionTimeout() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true, true);
}
/** */
@Test
public void testRemoteQueryWithMergeTableTimeout() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true, false);
}
/** */
@Test
public void testRemoteQueryExecutionCancel0() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false, true);
}
/** */
@Test
public void testRemoteQueryExecutionCancel1() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false, true);
}
/** */
@Test
public void testRemoteQueryExecutionCancel2() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false, true);
}
/** */
@Test
public void testRemoteQueryExecutionCancel3() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false, true);
}
/** */
@Test
public void testRemoteQueryWithMergeTableCancel0() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryWithMergeTableCancel1() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryWithMergeTableCancel2() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryWithMergeTableCancel3() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryWithoutMergeTableCancel0() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryWithoutMergeTableCancel1() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryWithoutMergeTableCancel2() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryWithoutMergeTableCancel3() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false);
}
/** */
@Test
public void testRemoteQueryAlreadyFinishedStop() throws Exception {
- testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
+ testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false);
}
/** */
private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit,
- boolean timeout) throws Exception {
+ boolean timeout, boolean checkCanceled) throws Exception {
try (Ignite client = startGrid("client")) {
-
IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
assertEquals(0, cache.localSize());
@@ -213,7 +213,8 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
qry.setTimeout(timeoutUnits, timeUnit);
cursor = cache.query(qry);
- } else {
+ }
+ else {
cursor = cache.query(qry);
client.scheduler().runLocal(new Runnable() {
@@ -223,13 +224,16 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
}, timeoutUnits, timeUnit);
}
- try(QueryCursor<List<?>> ignored = cursor) {
- cursor.iterator();
+ try (QueryCursor<List<?>> ignored = cursor) {
+ cursor.getAll();
+
+ if (checkCanceled)
+ fail("Query not canceled");
}
catch (CacheException ex) {
log().error("Got expected exception", ex);
- assertTrue("Must throw correct exception", ex.getCause() instanceof QueryCancelledException);
+ assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class));
}
// Give some time to clean up.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index b3a1a53..f1652c3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@ -64,6 +64,11 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
"where pr.companyId = co._key\n" +
"order by co._key, pr._key ";
+ protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" +
+ "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
+ "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
+ "order by pe.id desc";
+
/** */
protected static final int GRID_CNT = 2;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index b33fe24..406eaa5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
/**
@@ -104,11 +105,11 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
assertEquals(broadcastQry, plan.contains("batched:broadcast"));
- final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll();
+ final List<List<?>> goldenRes = grid(0).cache("pu").query(qry0).getAll();
Thread.sleep(3000);
- assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll());
+ assertEquals(goldenRes, grid(0).cache("pu").query(qry0).getAll());
final SqlFieldsQuery qry1;
@@ -125,7 +126,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll();
- assertFalse(pRes.isEmpty());
+ assertFalse(goldenRes.isEmpty());
assertFalse(rRes.isEmpty());
final AtomicInteger qryCnt = new AtomicInteger();
@@ -164,9 +165,12 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
qry.setPageSize(smallPageSize ? 30 : 1000);
try {
- assertEquals(pRes, cache.query(qry).getAll());
+ assertEquals(goldenRes, cache.query(qry).getAll());
}
catch (CacheException e) {
+ if (!smallPageSize)
+ log.error("Unexpected exception at the test", e);
+
assertTrue("On large page size must retry.", smallPageSize);
boolean failedOnRemoteFetch = false;
@@ -266,7 +270,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
}
}, restartThreadsNum, "restart-thread");
- Thread.sleep(duration);
+ GridTestUtils.waitForCondition(() -> fail.get(), duration);
info("Stopping...");
@@ -281,4 +285,4 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
info("Stopped.");
}
-}
+}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 4de0497..0f24196 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;
@@ -42,54 +43,54 @@ public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend
/** */
@Test
public void testCancel1() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, false, true);
}
/** */
@Test
public void testCancel2() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, false, true);
}
/** */
@Test
public void testCancel3() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testCancel4() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, false);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, false, false);
}
/** */
@Test
public void testTimeout1() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, true);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, true, true);
}
/** */
@Test
public void testTimeout2() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, true);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, true, true);
}
/** */
@Test
public void testTimeout3() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, true);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, true, false);
}
/** */
@Test
public void testTimeout4() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, true);
+ testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, true, false);
}
/** */
private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit,
- boolean timeout) throws Exception {
+ boolean timeout, boolean checkCanceled) throws Exception {
SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true);
IgniteCache<Object, Object> cache = ignite.cache(cacheName);
@@ -110,12 +111,15 @@ public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend
}
try (QueryCursor<List<?>> ignored = cursor) {
- cursor.iterator();
+ cursor.getAll();
+
+ if (checkCanceled)
+ fail("Query not canceled");
}
catch (CacheException ex) {
log().error("Got expected exception", ex);
- assertTrue("Must throw correct exception", ex.getCause() instanceof QueryCancelledException);
+ assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class));
}
// Give some time to clean up.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index 5b3ba97..691cf5c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.StampedLock;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -39,6 +40,7 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryRetryException;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -57,6 +59,7 @@ import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDi
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.junit.Test;
@@ -639,6 +642,8 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
*/
@Test
public void testQueryConsistencyMultithreaded() throws Exception {
+ final int KEY_COUNT = 5000;
+
// Start complex topology.
ignitionStart(serverConfiguration(1));
ignitionStart(serverConfiguration(2));
@@ -650,7 +655,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
run(cli, createSql);
- put(cli, 0, 5000);
+ put(cli, 0, KEY_COUNT);
final AtomicBoolean stopped = new AtomicBoolean();
@@ -698,17 +703,24 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
IgniteInternalFuture qryFut = multithreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
while (!stopped.get()) {
- Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+ try {
+ Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
- IgniteCache<BinaryObject, BinaryObject> cache = node.cache(CACHE_NAME).withKeepBinary();
+ IgniteCache<BinaryObject, BinaryObject> cache = node.cache(CACHE_NAME).withKeepBinary();
- String valTypeName = ((IgniteEx)node).context().query().types(CACHE_NAME)
- .iterator().next().valueTypeName();
+ String valTypeName = ((IgniteEx)node).context().query().types(CACHE_NAME)
+ .iterator().next().valueTypeName();
- List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query(
- new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll();
+ List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query(
+ new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll();
- assertEquals(5000, res.size());
+ assertEquals(KEY_COUNT, res.size());
+ }
+ catch (Exception e) {
+ // Swallow retry exceptions.
+ if (X.cause(e, QueryRetryException.class) == null)
+ throw e;
+ }
}
return null;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
index 1df9e90..4356383 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryRetryException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
@@ -45,6 +46,7 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgnitePredicate;
/**
@@ -393,28 +395,35 @@ public abstract class DynamicIndexAbstractSelfTest extends AbstractSchemaSelfTes
* @param expSize Expected size.
*/
protected static void assertSqlSimpleData(Ignite node, String sql, int expSize) {
- SqlQuery qry = new SqlQuery(typeName(ValueClass.class), sql).setArgs(SQL_ARG_1);
+ try {
+ SqlQuery qry = new SqlQuery(typeName(ValueClass.class), sql).setArgs(SQL_ARG_1);
- List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll();
+ List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll();
- Set<Long> ids = new HashSet<>();
+ Set<Long> ids = new HashSet<>();
- for (Cache.Entry<BinaryObject, BinaryObject> entry : res) {
- long id = entry.getKey().field(FIELD_KEY);
+ for (Cache.Entry<BinaryObject, BinaryObject> entry : res) {
+ long id = entry.getKey().field(FIELD_KEY);
- long field1 = entry.getValue().field(FIELD_NAME_1_ESCAPED);
- long field2 = entry.getValue().field(FIELD_NAME_2_ESCAPED);
+ long field1 = entry.getValue().field(FIELD_NAME_1_ESCAPED);
+ long field2 = entry.getValue().field(FIELD_NAME_2_ESCAPED);
- assertTrue(field1 >= SQL_ARG_1);
+ assertTrue(field1 >= SQL_ARG_1);
- assertEquals(id, field1);
- assertEquals(id, field2);
+ assertEquals(id, field1);
+ assertEquals(id, field2);
- assertTrue(ids.add(id));
- }
+ assertTrue(ids.add(id));
+ }
- assertEquals("Size mismatch [node=" + node.name() + ", exp=" + expSize + ", actual=" + res.size() +
- ", ids=" + ids + ']', expSize, res.size());
+ assertEquals("Size mismatch [node=" + node.name() + ", exp=" + expSize + ", actual=" + res.size() +
+ ", ids=" + ids + ']', expSize, res.size());
+ }
+ catch (Exception e) {
+ // Swallow QueryRetryException.
+ if (X.cause(e, QueryRetryException.class) == null)
+ throw e;
+ }
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index d7f0470..f7c3171 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -191,7 +191,7 @@ public class H2ConnectionLeaksSelfTest extends AbstractIndexingCommonTest {
Map<Thread, ?> conns = perThreadConnections(i);
for(Thread t : conns.keySet())
- log.error("+++ Connection is not closed for thread: " + t.getName());
+ log.error("Connection is not closed for thread: " + t.getName());
}
fail("H2 JDBC connections leak detected. See the log above.");
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java
index 5c5c0eb..4631087 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -147,7 +148,7 @@ public class IgniteCacheLocalQueryCancelOrTimeoutSelfTest extends GridCommonAbst
fail("Expecting timeout");
}
catch (Exception e) {
- assertTrue("Must throw correct exception", e.getCause() instanceof QueryCancelledException);
+ assertNotNull("Must throw correct exception", X.cause(e, QueryCancelledException.class));
}
// Test must exit gracefully.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java
new file mode 100644
index 0000000..36bdae3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryRetryException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests that query execution correctly locks and unlocks tables.
+ */
+public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends AbstractIndexingCommonTest {
+ /** Keys count. */
+ private static final int KEY_CNT = 500;
+
+ /** Base query argument. */
+ private static final int BASE_QRY_ARG = 50;
+
+ /** Size for small pages. */
+ private static final int PAGE_SIZE_SMALL = 12;
+
+ /** Test duration. */
+ private static final long TEST_DUR = 10_000L;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Test local query execution.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleNode() throws Exception {
+ checkSingleNode(1);
+ }
+
+ /**
+ * Test local query execution with query parallelism.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleNodeWithParallelism() throws Exception {
+ checkSingleNode(4);
+ }
+
+ /**
+ * Test query execution with multiple topology nodes.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultipleNodes() throws Exception {
+ checkMultipleNodes(1);
+ }
+
+ /**
+ * Test query execution with multiple topology nodes with query parallelism.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultipleNodesWithParallelism() throws Exception {
+ checkMultipleNodes(4);
+ }
+
+ /**
+ * Test DDL operation on table with high load queries.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleNodeTablesLockQueryAndDDLMultithreaded() throws Exception {
+ final Ignite srv = startGrid(0);
+
+ populateBaseQueryData(srv, 1);
+
+ checkTablesLockQueryAndDDLMultithreaded(srv);
+
+ checkTablesLockQueryAndDropColumnMultithreaded(srv);
+ }
+
+ /**
+ * Test DDL operation on table with high load queries.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleNodeWithParallelismTablesLockQueryAndDDLMultithreaded() throws Exception {
+ final Ignite srv = startGrid(0);
+
+ populateBaseQueryData(srv, 4);
+
+ checkTablesLockQueryAndDDLMultithreaded(srv);
+
+ checkTablesLockQueryAndDropColumnMultithreaded(srv);
+ }
+
+ /**
+ * Test DDL operation on table with high load queries.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultipleNodesWithTablesLockQueryAndDDLMultithreaded() throws Exception {
+ Ignite srv0 = startGrid(0);
+ Ignite srv1 = startGrid(1);
+ startGrid(2);
+
+ Ignite cli;
+
+ try {
+ Ignition.setClientMode(true);
+
+ cli = startGrid(3);
+ }
+ finally {
+ Ignition.setClientMode(false);
+ }
+
+ populateBaseQueryData(srv0, 1);
+
+ checkTablesLockQueryAndDDLMultithreaded(srv0);
+ checkTablesLockQueryAndDDLMultithreaded(srv1);
+ checkTablesLockQueryAndDDLMultithreaded(cli);
+
+ checkTablesLockQueryAndDropColumnMultithreaded(srv0);
+ checkTablesLockQueryAndDropColumnMultithreaded(srv1);
+ // TODO: DDL DROP COLUMN: CacheContext == null on a client node (CLI), so the check below stays disabled.
+ // checkTablesLockQueryAndDropColumnMultithreaded(cli);
+ }
+
+ /**
+ * Test DDL operation on table with high load queries.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultipleNodesWithParallelismTablesLockQueryAndDDLMultithreaded() throws Exception {
+ Ignite srv0 = startGrid(0);
+ Ignite srv1 = startGrid(1);
+ startGrid(2);
+
+ Ignite cli;
+
+ try {
+ Ignition.setClientMode(true);
+
+ cli = startGrid(3);
+ }
+ finally {
+ Ignition.setClientMode(false);
+ }
+
+ populateBaseQueryData(srv0, 4);
+
+ checkTablesLockQueryAndDDLMultithreaded(srv0);
+ checkTablesLockQueryAndDDLMultithreaded(srv1);
+ checkTablesLockQueryAndDDLMultithreaded(cli);
+
+ checkTablesLockQueryAndDropColumnMultithreaded(srv0);
+ checkTablesLockQueryAndDropColumnMultithreaded(srv1);
+ // TODO: DDL DROP COLUMN: CacheContext == null on a client node (CLI), so the check below stays disabled.
+ // checkTablesLockQueryAndDropColumnMultithreaded(cli);
+ }
+
+ /**
+ * Test that reserved partitions are released after the query completes (results are bigger than one page).
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReleasePartitionReservationSeveralPagesResults() throws Exception {
+ checkReleasePartitionReservation(PAGE_SIZE_SMALL);
+ }
+
+ /**
+ * Test that reserved partitions are released after the query completes (results fit on one page).
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReleasePartitionReservationOnePageResults() throws Exception {
+ checkReleasePartitionReservation(KEY_CNT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFetchFromRemovedTable() throws Exception {
+ Ignite srv = startGrid(0);
+
+ execute(srv, "CREATE TABLE TEST (id int primary key, val int)");
+
+ for (int i = 0; i < 10; ++i)
+ execute(srv, "INSERT INTO TEST VALUES (" + i + ", " + i + ")");
+
+ FieldsQueryCursor<List<?>> cur = execute(srv, new SqlFieldsQuery("SELECT * from TEST").setPageSize(1));
+
+ Iterator<List<?>> it = cur.iterator();
+
+ it.next();
+
+ execute(srv, "DROP TABLE TEST");
+
+ try {
+ while (it.hasNext())
+ it.next();
+
+ if (lazy())
+ fail("Retry exception must be thrown");
+ }
+ catch (Exception e) {
+ if (!lazy()) {
+ log.error("In lazy=false mode the query must be finished successfully", e);
+
+ fail("In lazy=false mode the query must be finished successfully");
+ }
+ else
+ assertNotNull(X.cause(e, QueryRetryException.class));
+ }
+ }
+
+ /**
+ * Runs a join query from several threads while an index on {@code "pers".Person} is repeatedly
+ * created and dropped on the calling thread for {@code TEST_DUR} milliseconds.
+ * <p>
+ * A query is allowed to fail only with {@link QueryRetryException} in the cause chain, and only
+ * in lazy mode; any other failure fails the test.
+ *
+ * @param node Ignite node to execute query.
+ * @throws Exception If failed.
+ */
+ private void checkTablesLockQueryAndDDLMultithreaded(final Ignite node) throws Exception {
+     final AtomicBoolean end = new AtomicBoolean(false);
+
+     final int qryThreads = 10;
+
+     // Do many concurrent queries.
+     IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+         @Override public void run() {
+             while (!end.get()) {
+                 try {
+                     FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQuery(
+                         "SELECT pers.id, pers.name " +
+                         "FROM (SELECT DISTINCT p.id, p.name " +
+                         "FROM \"pers\".PERSON as p) as pers " +
+                         "JOIN \"pers\".PERSON p on p.id = pers.id " +
+                         "JOIN (SELECT t.persId as persId, SUM(t.time) totalTime " +
+                         "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id")
+                         .setLazy(lazy())
+                         .setPageSize(PAGE_SIZE_SMALL));
+
+                     cursor.getAll();
+                 }
+                 catch (Exception e) {
+                     if (X.cause(e, QueryRetryException.class) == null) {
+                         log.error("Unexpected exception", e);
+
+                         fail("Unexpected exception. " + e);
+                     }
+                     else if (!lazy()) {
+                         // A retry exception is tolerated in lazy mode only.
+                         log.error("Unexpected exception", e);
+
+                         fail("Unexpected QueryRetryException.");
+                     }
+                 }
+             }
+         }
+     }, qryThreads, "usr-qry");
+
+     long tEnd = U.currentTimeMillis() + TEST_DUR;
+
+     try {
+         while (U.currentTimeMillis() < tEnd) {
+             execute(node, new SqlFieldsQuery("CREATE INDEX \"pers\".PERSON_NAME ON \"pers\".Person (name asc)")).getAll();
+             execute(node, new SqlFieldsQuery("DROP INDEX \"pers\".PERSON_NAME")).getAll();
+         }
+     }
+     finally {
+         // Always signal the query threads to stop: if a DDL statement throws, they would
+         // otherwise keep looping on the flag forever.
+         end.set(true);
+     }
+
+     // The test is OK if the DDL operations pass under high query load; wait for the query threads.
+     fut.get();
+ }
+
+ /**
+ * Runs a query from several threads while the {@code name} column of {@code "pers".Person} is
+ * repeatedly dropped and re-added on the calling thread for {@code TEST_DUR} milliseconds.
+ * <p>
+ * A query is allowed to fail with the "column not found" parse error, or, in lazy mode only, with
+ * {@link QueryRetryException} in the cause chain; any other failure fails the test.
+ *
+ * @param node Ignite node to execute query.
+ * @throws Exception If failed.
+ */
+ private void checkTablesLockQueryAndDropColumnMultithreaded(final Ignite node) throws Exception {
+     final AtomicBoolean end = new AtomicBoolean(false);
+
+     final int qryThreads = 10;
+
+     // Do many concurrent queries.
+     IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+         @Override public void run() {
+             while (!end.get()) {
+                 try {
+                     FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQuery(
+                         "SELECT pers.id, pers.name FROM \"pers\".PERSON")
+                         .setLazy(lazy())
+                         .setPageSize(PAGE_SIZE_SMALL));
+
+                     cursor.getAll();
+                 }
+                 catch (Exception e) {
+                     // The message may be null; guard so that an NPE does not mask the real failure.
+                     String msg = e.getMessage();
+
+                     if (msg != null && msg.contains("Failed to parse query. Column \"PERS.ID\" not found")) {
+                         // Swallow exception when column is dropped.
+                     }
+                     else if (X.cause(e, QueryRetryException.class) == null) {
+                         log.error("Unexpected exception", e);
+
+                         fail("Unexpected exception. " + e);
+                     }
+                     else if (!lazy()) {
+                         // A retry exception is tolerated in lazy mode only.
+                         log.error("Unexpected exception", e);
+
+                         fail("Unexpected QueryRetryException.");
+                     }
+                 }
+             }
+         }
+     }, qryThreads, "usr-qry");
+
+     long tEnd = U.currentTimeMillis() + TEST_DUR;
+
+     try {
+         while (U.currentTimeMillis() < tEnd) {
+             execute(node, new SqlFieldsQuery("ALTER TABLE \"pers\".Person DROP COLUMN name")).getAll();
+             execute(node, new SqlFieldsQuery("ALTER TABLE \"pers\".Person ADD COLUMN name varchar")).getAll();
+         }
+     }
+     finally {
+         // Always signal the query threads to stop: if a DDL statement throws, they would
+         // otherwise keep looping on the flag forever.
+         end.set(true);
+     }
+
+     // The test is OK if the DDL operations pass under high query load; wait for the query threads.
+     fut.get();
+ }
+
+ /**
+ * Checks that partitions reserved by a query are released once the query completes.
+ * If they were not released, {@code awaitPartitionMapExchange} would fail by timeout after
+ * the third node joins.
+ *
+ * @param pageSize Results page size.
+ * @throws Exception If failed.
+ */
+ public void checkReleasePartitionReservation(int pageSize) throws Exception {
+     Ignite qryNode = startGrid(1);
+     startGrid(2);
+
+     populateBaseQueryData(qryNode, 1);
+
+     // Run the query to completion with the requested page size.
+     execute(qryNode, query(0).setPageSize(pageSize)).getAll();
+
+     // Join one more node and wait for the exchange.
+     startGrid(3);
+
+     awaitPartitionMapExchange();
+ }
+
+ /**
+ * Runs the base query checks on a single started node.
+ *
+ * @param parallelism Query parallelism.
+ * @throws Exception If failed.
+ */
+ public void checkSingleNode(int parallelism) throws Exception {
+     Ignite node = startGrid();
+
+     // Create the caches with the requested parallelism and fill them.
+     populateBaseQueryData(node, parallelism);
+
+     checkBaseOperations(node);
+ }
+
+ /**
+ * Check query execution with multiple topology nodes.
+ * <p>
+ * Starts nodes 1 and 2, then node 3 with client mode switched on, runs the base checks on each
+ * of them, and finally stops a node while a partially fetched cursor is still open: first node 3
+ * (the node the query was started from), then node 2 while a cursor is open on node 1.
+ *
+ * @param parallelism Query parallelism.
+ * @throws Exception If failed.
+ */
+ public void checkMultipleNodes(int parallelism) throws Exception {
+ Ignite srv1 = startGrid(1);
+ Ignite srv2 = startGrid(2);
+
+ Ignite cli;
+
+ try {
+ Ignition.setClientMode(true);
+
+ cli = startGrid(3);
+ }
+ finally {
+ // Reset the client-mode flag regardless of whether node 3 started.
+ Ignition.setClientMode(false);
+ }
+
+ populateBaseQueryData(cli, parallelism);
+
+ checkBaseOperations(srv1);
+ checkBaseOperations(srv2);
+ checkBaseOperations(cli);
+
+ // Test originating node leave.
+ FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+ Iterator<List<?>> iter = cursor.iterator();
+
+ // Fetch only part of the result so the query is still in progress when the node stops.
+ for (int i = 0; i < 30; i++)
+ iter.next();
+
+ stopGrid(3);
+
+ // Test server node leave with active worker.
+ FieldsQueryCursor<List<?>> cursor2 = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+ try {
+ Iterator<List<?>> iter2 = cursor2.iterator();
+
+ for (int i = 0; i < 30; i++)
+ iter2.next();
+
+ stopGrid(2);
+ }
+ finally {
+ cursor2.close();
+ }
+ }
+
+ /**
+ * Check base operations on the given node: full and paged fetch, behavior of a partially fetched
+ * cursor when an index is created and dropped, full and partial iteration, many cursors open
+ * at the same time, and the hold-query and short-query checks.
+ *
+ * @param node Node.
+ * @throws Exception If failed.
+ */
+ private void checkBaseOperations(Ignite node) throws Exception {
+ checkQuerySplitToSeveralMapQueries(node);
+
+ // Get full data.
+ {
+ List<List<?>> rows = execute(node, baseQuery()).getAll();
+
+ assertBaseQueryResults(rows);
+ }
+
+ // Check that QueryRetryException is thrown (in lazy mode) when DDL runs while a cursor is partially fetched.
+ {
+ List<List<?>> rows = new ArrayList<>();
+
+ FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+ Iterator<List<?>> it = cursor.iterator();
+
+ for (int i = 0; i < 10; ++i)
+ rows.add(it.next());
+
+ // Create and drop an index on the queried table while the cursor is still open.
+ execute(node, new SqlFieldsQuery("CREATE INDEX \"pers\".PERSON_NAME ON \"pers\".Person (name asc)")).getAll();
+ execute(node, new SqlFieldsQuery("DROP INDEX \"pers\".PERSON_NAME")).getAll();
+
+ try {
+ while (it.hasNext())
+ rows.add(it.next());
+
+ // Reaching this point means the rest of the result was fetched without an exception.
+ if (lazy())
+ fail("Retry exception must be thrown");
+ }
+ catch (Exception e) {
+ // An exception is acceptable only in lazy mode and only if caused by QueryRetryException.
+ if (!lazy() || X.cause(e, QueryRetryException.class) == null) {
+ log.error("Invalid exception: ", e);
+
+ fail("QueryRetryException is expected");
+ }
+ }
+ }
+
+ // Get data in several pages.
+ {
+ List<List<?>> rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll();
+
+ assertBaseQueryResults(rows);
+ }
+
+
+ // Test full iteration.
+ {
+ List<List<?>> rows = new ArrayList<>();
+
+ FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+ for (List<?> row : cursor)
+ rows.add(row);
+
+ cursor.close();
+
+ assertBaseQueryResults(rows);
+ }
+
+ // Test partial iteration with cursor close.
+ try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) {
+ Iterator<List<?>> iter = partialCursor.iterator();
+
+ for (int i = 0; i < 30; i++)
+ iter.next();
+ }
+
+
+ // Test execution of multiple queries at a time.
+ List<Iterator<List<?>>> iters = new ArrayList<>();
+
+ // Open 200 cursors at once.
+ for (int i = 0; i < 200; i++)
+ iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator());
+
+ // Drain them round-robin, at most 20 rows per iterator per pass, removing exhausted iterators.
+ while (!iters.isEmpty()) {
+ Iterator<Iterator<List<?>>> iterIter = iters.iterator();
+
+ while (iterIter.hasNext()) {
+ Iterator<List<?>> iter = iterIter.next();
+
+ int i = 0;
+
+ while (iter.hasNext() && i < 20) {
+ iter.next();
+
+ i++;
+ }
+
+ if (!iter.hasNext())
+ iterIter.remove();
+ }
+ }
+
+ checkHoldQuery(node);
+
+ checkShortQuery(node);
+ }
+
+ /**
+ * Checks that a partially fetched ("held") cursor does not prevent other queries from running,
+ * both from other threads and from the same thread, and that the held cursor can still be
+ * fetched to the end afterwards with the expected results.
+ *
+ * @param node Ignite node.
+ * @throws Exception If failed.
+ */
+ public void checkHoldQuery(Ignite node) throws Exception {
+     // Typed list (instead of a raw ArrayList) so that the call to assertBaseQueryResults is checked.
+     List<List<?>> rows = new ArrayList<>();
+
+     // Open the cursor and fetch a single row; the rest is fetched at the end of the method.
+     Iterator<List<?>> it0 = execute(node, query(BASE_QRY_ARG).setPageSize(PAGE_SIZE_SMALL)).iterator();
+     rows.add(it0.next());
+
+     // Do many concurrent queries to test full iteration.
+     GridTestUtils.runMultiThreaded(new Runnable() {
+         @Override public void run() {
+             for (int i = 0; i < 5; ++i) {
+                 FieldsQueryCursor<List<?>> cursor = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1)
+                     .setPageSize(PAGE_SIZE_SMALL));
+
+                 cursor.getAll();
+             }
+         }
+     }, 5, "test-qry");
+
+     // Do the same query in the same thread.
+     {
+         FieldsQueryCursor<List<?>> cursor = execute(node, query(BASE_QRY_ARG)
+             .setPageSize(PAGE_SIZE_SMALL));
+
+         cursor.getAll();
+     }
+
+     // Finish fetching from the held cursor.
+     while (it0.hasNext())
+         rows.add(it0.next());
+
+     assertBaseQueryResults(rows);
+ }
+
+ /**
+ * Checks a query whose lower id bound is {@code KEY_CNT - PAGE_SIZE_SMALL + 1}, fetched with page
+ * size {@code PAGE_SIZE_SMALL}, and verifies the fetched rows.
+ *
+ * @param node Ignite node.
+ * @throws Exception If failed.
+ */
+ public void checkShortQuery(Ignite node) throws Exception {
+     // Typed list (instead of a raw ArrayList) so that the call to assertQueryResults is checked.
+     List<List<?>> rows = new ArrayList<>();
+
+     FieldsQueryCursor<List<?>> cursor0 = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1).setPageSize(PAGE_SIZE_SMALL));
+
+     Iterator<List<?>> it = cursor0.iterator();
+
+     while (it.hasNext())
+         rows.add(it.next());
+
+     assertQueryResults(rows, KEY_CNT - PAGE_SIZE_SMALL + 1);
+ }
+
+ /**
+ * Runs a join of a DISTINCT sub-query, the {@code PERSON} table and a GROUP BY sub-query with a
+ * small page size, and checks that every person is returned with the expected name.
+ *
+ * @param node Ignite node.
+ * @throws Exception If failed.
+ */
+ public void checkQuerySplitToSeveralMapQueries(Ignite node) throws Exception {
+     // Typed list (instead of a raw ArrayList) so that the call to assertQueryResults is checked.
+     List<List<?>> rows = new ArrayList<>();
+
+     FieldsQueryCursor<List<?>> cursor0 = execute(node, new SqlFieldsQuery(
+         "SELECT pers.id, pers.name " +
+         "FROM (SELECT DISTINCT p.id, p.name " +
+         "FROM \"pers\".PERSON as p) as pers " +
+         "JOIN \"pers\".PERSON p on p.id = pers.id " +
+         "JOIN (SELECT t.persId as persId, SUM(t.time) totalTime " +
+         "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id")
+         .setPageSize(PAGE_SIZE_SMALL));
+
+     Iterator<List<?>> it = cursor0.iterator();
+
+     while (it.hasNext())
+         rows.add(it.next());
+
+     // Lower id bound 0: all KEY_CNT rows are expected.
+     assertQueryResults(rows, 0);
+ }
+
+ /**
+ * Creates the {@code "pers"} and {@code "persTask"} caches and puts {@code KEY_CNT} entries
+ * (keys {@code 0 .. KEY_CNT - 1}) into each of them.
+ *
+ * @param node Node.
+ * @param parallelism Query parallelism.
+ */
+ private static void populateBaseQueryData(Ignite node, int parallelism) {
+     node.createCache(cacheConfiguration(parallelism, "pers", Person.class));
+     node.createCache(cacheConfiguration(parallelism, "persTask", PersonTask.class));
+
+     // Persons.
+     IgniteCache<Long, Object> persons = cache(node, "pers");
+
+     for (long key = 0; key < KEY_CNT; key++)
+         persons.put(key, new Person(key));
+
+     // Person tasks.
+     IgniteCache<Long, Object> tasks = cache(node, "persTask");
+
+     for (long key = 0; key < KEY_CNT; key++)
+         tasks.put(key, new PersonTask(key));
+ }
+
+ /**
+ * @return Default query with a random argument taken from {@code [0, KEY_CNT / 2)}.
+ */
+ private static SqlFieldsQuery randomizedQuery() {
+     int arg = ThreadLocalRandom.current().nextInt(KEY_CNT / 2);
+
+     return query(arg);
+ }
+
+ /**
+ * @return Base query: the default query with {@code BASE_QRY_ARG} as its argument.
+ */
+ private static SqlFieldsQuery baseQuery() {
+ return query(BASE_QRY_ARG);
+ }
+
+ /**
+ * Builds a cache configuration with {@code Long} keys indexed together with the given value class.
+ * <p>
+ * Note: the {@code Person} value type parameter of the returned configuration is nominal only;
+ * the indexed value type is {@code valClass}.
+ *
+ * @param parallelism Query parallelism.
+ * @param name Cache name.
+ * @param valClass Value class.
+ * @return Default cache configuration.
+ */
+ private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism, String name, Class<?> valClass) {
+     return new CacheConfiguration<Long, Person>()
+         .setName(name)
+         .setIndexedTypes(Long.class, valClass)
+         .setQueryParallelism(parallelism)
+         // Rendezvous affinity without neighbor exclusion, 10 partitions.
+         .setAffinity(new RendezvousAffinityFunction(false, 10));
+ }
+
+ /**
+ * Default query: selects {@code id} and {@code name} of the persons whose id is not less than
+ * the argument. The argument is embedded into the SQL text.
+ *
+ * @param arg Argument (lower bound for the id, inclusive).
+ * @return Query.
+ */
+ private static SqlFieldsQuery query(long arg) {
+     String sql = "SELECT id, name FROM \"pers\".Person WHERE id >= " + arg;
+
+     return new SqlFieldsQuery(sql);
+ }
+
+ /**
+ * Assert base query results: the rows must match the default query with the {@code BASE_QRY_ARG} lower bound.
+ *
+ * @param rows Result rows.
+ */
+ private static void assertBaseQueryResults(List<List<?>> rows) {
+ assertQueryResults(rows, BASE_QRY_ARG);
+ }
+
+ /**
+ * Assert query results: exactly {@code KEY_CNT - resSize} rows are expected, each with an id
+ * not less than {@code resSize} and the name generated for that id.
+ *
+ * @param rows Result rows.
+ * @param resSize Despite the name, this is the lower id bound used by the query, not the number of rows.
+ */
+ private static void assertQueryResults(List<List<?>> rows, int resSize) {
+ assertEquals(KEY_CNT - resSize, rows.size());
+
+ for (List<?> row : rows) {
+ // Column 0 is the id, column 1 is the name.
+ Long id = (Long)row.get(0);
+ String name = (String)row.get(1);
+
+ assertTrue(id >= resSize);
+ assertEquals(nameForId(id), name);
+ }
+ }
+
+ /**
+ * Get the cache with the given name from the node.
+ *
+ * @param node Ignite node to get cache.
+ * @param name Cache name.
+ * @return Cache, as returned by {@code node.cache(name)}.
+ */
+ private static IgniteCache<Long, Object> cache(Ignite node, String name) {
+ return node.cache(name);
+ }
+
+ /**
+ * Execute the SQL text on the given node, using the lazy mode of this test.
+ *
+ * @param node Node.
+ * @param sql SQL text.
+ * @return Cursor.
+ */
+ private FieldsQueryCursor<List<?>> execute(Ignite node, String sql) {
+     // Wrap the text and reuse the query-object overload, which applies the lazy flag.
+     return execute(node, new SqlFieldsQuery(sql));
+ }
+
+ /**
+ * Execute the query on the given node through the node's query processor, after setting the
+ * lazy flag of the query to the lazy mode of this test.
+ *
+ * @param node Node.
+ * @param qry Query.
+ * @return Cursor.
+ */
+ private FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
+     IgniteEx nodeEx = (IgniteEx)node;
+
+     return nodeEx.context().query().querySqlFields(qry.setLazy(lazy()), false);
+ }
+
+
+ /**
+ * Lazy mode used for every query executed by this test; it also decides whether
+ * {@code QueryRetryException} is expected or treated as a failure in the checks.
+ *
+ * @return Lazy mode.
+ */
+ protected abstract boolean lazy();
+
+ /**
+ * Get name for ID: the string {@code "name-"} followed by the id.
+ *
+ * @param id ID.
+ * @return Name.
+ */
+ private static String nameForId(long id) {
+ return "name-" + id;
+ }
+
+ /**
+ * Person class: cache value with an indexed id and a name derived from the id.
+ */
+ private static class Person {
+ /** ID. */
+ @QuerySqlField(index = true)
+ private long id;
+
+ /** Name. */
+ @QuerySqlField
+ private String name;
+
+ /**
+ * Constructor. The name is generated from the id.
+ *
+ * @param id ID.
+ */
+ public Person(long id) {
+ this.id = id;
+ this.name = nameForId(id);
+ }
+
+ /**
+ * @return ID.
+ */
+ public long id() {
+ return id;
+ }
+
+ /**
+ * @return Name.
+ */
+ public String name() {
+ return name;
+ }
+ }
+
+ /**
+ * Person task class: cache value referring to a person by id.
+ */
+ private static class PersonTask {
+ /** ID. */
+ @QuerySqlField(index = true)
+ private long id;
+
+ /** Person ID. */
+ @QuerySqlField(index = true)
+ private long persId;
+
+ /** Time. */
+ @QuerySqlField
+ private long time;
+
+ /**
+ * Constructor. The person ID and the time are both set to the given ID.
+ *
+ * @param id ID.
+ */
+ public PersonTask(long id) {
+ this.id = id;
+ this.persId = id;
+ this.time = id;
+ }
+
+ /**
+ * @return ID.
+ */
+ public long id() {
+ return id;
+ }
+
+ /**
+ * @return Time.
+ */
+ public long time() {
+ return time;
+ }
+ }
+}
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.java
similarity index 71%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.java
index 0a6181e..eabb076 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+package org.apache.ignite.internal.processors.query;
/**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for query execution with disabled lazy mode.
*/
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+public class IgniteQueryTableLockAndConnectionPoolLazyModeOffTest
+ extends AbstractQueryTableLockAndConnectionPoolSelfTest {
+ /** {@inheritDoc} */
+ @Override protected boolean lazy() {
+ return false;
+ }
}
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.java
similarity index 72%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.java
index 0a6181e..1027880 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+package org.apache.ignite.internal.processors.query;
/**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for lazy query execution.
*/
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+public class IgniteQueryTableLockAndConnectionPoolLazyModeOnTest
+ extends AbstractQueryTableLockAndConnectionPoolSelfTest {
+ /** {@inheritDoc} */
+ @Override protected boolean lazy() {
+ return true;
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
deleted file mode 100644
index 9128a55..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.Test;
-
-/**
- * Tests for lazy query execution.
- */
-public class LazyQuerySelfTest extends AbstractIndexingCommonTest {
- /** Keys ocunt. */
- private static final int KEY_CNT = 200;
-
- /** Base query argument. */
- private static final int BASE_QRY_ARG = 50;
-
- /** Size for small pages. */
- private static final int PAGE_SIZE_SMALL = 12;
-
- /** Cache name. */
- private static final String CACHE_NAME = "cache";
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
- /**
- * Test local query execution.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testSingleNode() throws Exception {
- checkSingleNode(1);
- }
-
- /**
- * Test local query execution.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testSingleNodeWithParallelism() throws Exception {
- checkSingleNode(4);
- }
-
- /**
- * Test query execution with multiple topology nodes.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testMultipleNodes() throws Exception {
- checkMultipleNodes(1);
- }
-
- /**
- * Test query execution with multiple topology nodes with query parallelism.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testMultipleNodesWithParallelism() throws Exception {
- checkMultipleNodes(4);
- }
-
- /**
- * Check local query execution.
- *
- * @param parallelism Query parallelism.
- * @throws Exception If failed.
- */
- public void checkSingleNode(int parallelism) throws Exception {
- Ignite srv = startGrid();
-
- srv.createCache(cacheConfiguration(parallelism));
-
- populateBaseQueryData(srv);
-
- checkBaseOperations(srv);
- }
-
- /**
- * Check query execution with multiple topology nodes.
- *
- * @param parallelism Query parallelism.
- * @throws Exception If failed.
- */
- public void checkMultipleNodes(int parallelism) throws Exception {
- Ignite srv1 = startGrid(1);
- Ignite srv2 = startGrid(2);
-
- Ignite cli;
-
- try {
- Ignition.setClientMode(true);
-
- cli = startGrid(3);
- }
- finally {
- Ignition.setClientMode(false);
- }
-
- cli.createCache(cacheConfiguration(parallelism));
-
- populateBaseQueryData(cli);
-
- checkBaseOperations(srv1);
- checkBaseOperations(srv2);
- checkBaseOperations(cli);
-
- // Test originating node leave.
- FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL));
-
- Iterator<List<?>> iter = cursor.iterator();
-
- for (int i = 0; i < 30; i++)
- iter.next();
-
- stopGrid(3);
-
- assertNoWorkers();
-
- // Test server node leave with active worker.
- cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
-
- try {
- iter = cursor.iterator();
-
- for (int i = 0; i < 30; i++)
- iter.next();
-
- stopGrid(2);
- }
- finally {
- cursor.close();
- }
-
- assertNoWorkers();
- }
-
- /**
- * Check base operations.
- *
- * @param node Node.
- * @throws Exception If failed.
- */
- private void checkBaseOperations(Ignite node) throws Exception {
- // Get full data.
- List<List<?>> rows = execute(node, baseQuery()).getAll();
-
- assertBaseQueryResults(rows);
- assertNoWorkers();
-
- // Get data in several pages.
- rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll();
-
- assertBaseQueryResults(rows);
- assertNoWorkers();
-
- // Test full iteration.
- rows = new ArrayList<>();
-
- FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
-
- for (List<?> row : cursor)
- rows.add(row);
-
- assertBaseQueryResults(rows);
- assertNoWorkers();
-
- // Test partial iteration with cursor close.
- try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) {
- Iterator<List<?>> iter = partialCursor.iterator();
-
- for (int i = 0; i < 30; i++)
- iter.next();
- }
-
- assertNoWorkers();
-
- // Test execution of multiple queries at a time.
- List<Iterator<List<?>>> iters = new ArrayList<>();
-
- for (int i = 0; i < 200; i++)
- iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator());
-
- while (!iters.isEmpty()) {
- Iterator<Iterator<List<?>>> iterIter = iters.iterator();
-
- while (iterIter.hasNext()) {
- Iterator<List<?>> iter = iterIter.next();
-
- int i = 0;
-
- while (iter.hasNext() && i < 20) {
- iter.next();
-
- i++;
- }
-
- if (!iter.hasNext())
- iterIter.remove();
- }
- }
-
- assertNoWorkers();
- }
-
- /**
- * Populate base query data.
- *
- * @param node Node.
- */
- private static void populateBaseQueryData(Ignite node) {
- IgniteCache<Long, Person> cache = cache(node);
-
- for (long i = 0; i < KEY_CNT; i++)
- cache.put(i, new Person(i));
- }
-
- /**
- * @return Query with randomized argument.
- */
- private static SqlFieldsQuery randomizedQuery() {
- return query(ThreadLocalRandom.current().nextInt(KEY_CNT / 2));
- }
-
- /**
- * @return Base query.
- */
- private static SqlFieldsQuery baseQuery() {
- return query(BASE_QRY_ARG);
- }
-
- /**
- * @param parallelism Query parallelism.
- * @return Default cache configuration.
- */
- private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) {
- return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class)
- .setQueryParallelism(parallelism);
- }
-
- /**
- * Default query.
- *
- * @param arg Argument.
- * @return Query.
- */
- private static SqlFieldsQuery query(long arg) {
- return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg);
- }
-
- /**
- * Assert base query results.
- *
- * @param rows Result rows.
- */
- private static void assertBaseQueryResults(List<List<?>> rows) {
- assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size());
-
- for (List<?> row : rows) {
- Long id = (Long)row.get(0);
- String name = (String)row.get(1);
-
- assertTrue(id >= BASE_QRY_ARG);
- assertEquals(nameForId(id), name);
- }
- }
-
- /**
- * Get cache for node.
- *
- * @param node Node.
- * @return Cache.
- */
- private static IgniteCache<Long, Person> cache(Ignite node) {
- return node.cache(CACHE_NAME);
- }
-
- /**
- * Execute query on the given cache.
- *
- * @param node Node.
- * @param qry Query.
- * @return Cursor.
- */
- private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
- return cache(node).query(qry.setLazy(true));
- }
-
- /**
- * Make sure that are no active lazy workers.
- *
- * @throws Exception If failed.
- */
- private static void assertNoWorkers() throws Exception {
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- for (Ignite node : Ignition.allGrids()) {
- IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
-
- if (idx.mapQueryExecutor().registeredLazyWorkers() != 0)
- return false;
- }
-
- return MapQueryLazyWorker.activeCount() == 0;
- }
- }, 1000L);
- }
-
- /**
- * Get name for ID.
- *
- * @param id ID.
- * @return Name.
- */
- private static String nameForId(long id) {
- return "name-" + id;
- }
-
- /**
- * Person class.
- */
- private static class Person {
- /** ID. */
- @QuerySqlField(index = true)
- private long id;
-
- /** Name. */
- @QuerySqlField
- private String name;
-
- /**
- * Constructor.
- *
- * @param id ID.
- */
- public Person(long id) {
- this.id = id;
- this.name = nameForId(id);
- }
-
- /**
- * @return ID.
- */
- public long id() {
- return id;
- }
-
- /**
- * @return Name.
- */
- public String name() {
- return name;
- }
- }
-}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
index 8e50bf8..18ea78d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
@@ -366,7 +366,7 @@ public class QueryDataPageScanTest extends GridCommonAbstractTest {
.setLazy(true)
.setDataPageScanEnabled(DirectPageScanIndexing.expectedDataPageScanEnabled)
.setArgs(1, expNestedLoops, DirectPageScanIndexing.expectedDataPageScanEnabled)
- .setPageSize(keysCnt / 2) // Must be less than keysCnt.
+ .setPageSize(keysCnt / 10) // Must be less than keysCnt.
)
) {
int nestedLoops = 0;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
index 5ba8e68..4e276b2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
@@ -44,18 +44,22 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTest
public class DisappearedCacheCauseRetryMessageSelfTest extends AbstractIndexingCommonTest {
/** */
private static final int NODES_COUNT = 2;
+
/** */
private static final String ORG = "org";
+
/** */
private IgniteCache<String, JoinSqlTestHelper.Person> personCache;
+
/** */
private IgniteCache<String, JoinSqlTestHelper.Organization> orgCache;
/** */
@Test
public void testDisappearedCacheCauseRetryMessage() {
-
- SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
+ SqlQuery<String, JoinSqlTestHelper.Person> qry =
+ new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL)
+ .setArgs("Organization #0");
qry.setDistributedJoins(true);
@@ -65,6 +69,9 @@ public class DisappearedCacheCauseRetryMessageSelfTest extends AbstractIndexingC
fail("No CacheException emitted.");
}
catch (CacheException e) {
+ if (!e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) ["))
+ e.printStackTrace();
+
assertTrue(e.getMessage(), e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) ["));
}
}
@@ -92,11 +99,11 @@ public class DisappearedCacheCauseRetryMessageSelfTest extends AbstractIndexingC
GridQueryCancelRequest req = (GridQueryCancelRequest) (gridMsg.message());
if (reqId == req.queryRequestId())
- orgCache = DisappearedCacheCauseRetryMessageSelfTest.this.ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
+ orgCache = DisappearedCacheCauseRetryMessageSelfTest.this.ignite(0)
+ .getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
.setCacheMode(CacheMode.REPLICATED)
.setQueryEntities(JoinSqlTestHelper.organizationQueryEntity())
);
-
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index 65c37b0..f09cc5b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -402,31 +402,5 @@ public class RetryCauseMessageSelfTest extends AbstractIndexingCommonTest {
@Override public void onMessage(UUID nodeId, Object msg) {
startedExecutor.onMessage(nodeId, msg);
}
-
- /** {@inheritDoc} */
- @Override public void cancelLazyWorkers() {
- startedExecutor.cancelLazyWorkers();
- }
-
- /** {@inheritDoc} */
- @Override GridSpinBusyLock busyLock() {
- return startedExecutor.busyLock();
- }
-
- /** {@inheritDoc} */
- @Override public void stopAndUnregisterCurrentLazyWorker() {
- startedExecutor.stopAndUnregisterCurrentLazyWorker();
- }
-
- /** {@inheritDoc} */
- @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) {
- startedExecutor.unregisterLazyWorker(worker);
- }
-
- /** {@inheritDoc} */
- @Override public int registeredLazyWorkers() {
- return startedExecutor.registeredLazyWorkers();
- }
}
-
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryOOMTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryOOMTest.java
new file mode 100644
index 0000000..1d9b4a7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryOOMTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.oom;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for OOME on query.
+ */
+@RunWith(JUnit4.class)
+public abstract class AbstractQueryOOMTest extends GridCommonAbstractTest {
+ /** */
+ private static final long KEY_CNT = 2_000_000L;
+
+ /** */
+ private static final String CACHE_NAME = "test_cache";
+
+ /** */
+ private static final String HAS_CACHE = "HAS_CACHE";
+
+ /** */
+ private static final int RMT_NODES_CNT = 3;
+
+ /** */
+ private static final long HANG_TIMEOUT = 15 * 60 * 1000;
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 30 * 60 * 1000; // 30 mins
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<String> additionalRemoteJvmArgs() {
+ return Arrays.asList("-Xmx128m");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isRemoteJvm(String igniteInstanceName) {
+ return igniteInstanceName.startsWith("remote");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true)))
+ .setCacheConfiguration(new CacheConfiguration()
+ .setName(CACHE_NAME)
+ .setNodeFilter(new TestNodeFilter())
+ .setBackups(0)
+ .setQueryParallelism(queryParallelism())
+ .setQueryEntities(Collections.singleton(new QueryEntity()
+ .setTableName("test")
+ .setKeyFieldName("ID")
+ .setValueType(Value.class.getName())
+ .addQueryField("ID", Long.class.getName(), null)
+ .addQueryField("INDEXED", Long.class.getName(), null)
+ .addQueryField("VAL", Long.class.getName(), null)
+ .addQueryField("STR", String.class.getName(), null)
+ .setIndexes(Collections.singleton(new QueryIndex("INDEXED"))))))
+ .setUserAttributes(igniteInstanceName.startsWith("remote") ? F.asMap(HAS_CACHE, true) : null)
+ .setClientMode(igniteInstanceName.startsWith("client"));
+ }
+
+ /**
+ * @return query parallelism value.
+ */
+ protected abstract int queryParallelism();
+
+ /**
+ * Starts the cluster once, loads {@link #KEY_CNT} entries into {@link #CACHE_NAME} and then deactivates
+ * and stops all nodes, so that each test restarts the cluster on its own.
+ *
+ * {@inheritDoc}
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ cleanPersistenceDir();
+
+ Ignite local = startGrid(0);
+
+ for (int i = 0; i < RMT_NODES_CNT; ++i)
+ startGrid("remote-" + i);
+
+ local.cluster().active(true);
+
+ // Typed streamer instead of a raw one: no unchecked-warning suppression is required.
+ try (IgniteDataStreamer<Long, Value> streamer = local.dataStreamer(CACHE_NAME)) {
+ for (long i = 0; i < KEY_CNT; ++i) {
+ streamer.addData(i, new Value(i));
+
+ if (i % 100_000 == 0)
+ log.info("Populate " + i + " values");
+ }
+ }
+
+ awaitPartitionMapExchange(true, true, null);
+
+ local.cluster().active(false);
+
+ stopAllGrids(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ // Stop the nodes first: persistence files must not be removed while a node may still be using them.
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Restarts the cluster for a single test: starts the local node and the remote nodes, activates the cluster
+ * and then stops the local node.
+ * <p>
+ * It is called explicitly from each test rather than from {@code beforeTest}, to save time on muted tests.
+ *
+ * @throws Exception On error.
+ */
+ private void startTestGrid() throws Exception {
+ log.info("Restart cluster");
+
+ Ignite loc = startGrid(0);
+
+ for (int i = 0; i < RMT_NODES_CNT; ++i)
+ startGrid("remote-" + i);
+
+ loc.cluster().active(true);
+
+ stopGrid(0, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(false);
+
+ IgniteProcessProxy.killAll();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Test
+ public void testHeavyScanLazy() throws Exception {
+ startTestGrid();
+
+ checkQuery("SELECT * from test", KEY_CNT, true);
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+ @Test
+ public void testHeavyScanNonLazy() throws Exception {
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT * from test", false);
+ }
+
+ /**
+ * OOM on reduce. See IGNITE-9933
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9933")
+ @Test
+ public void testHeavySortByPkLazy() throws Exception {
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT * from test ORDER BY id", true);
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+ @Test
+ public void testHeavySortByPkNotLazy() throws Exception {
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT * from test ORDER BY id", false);
+ }
+
+ /**
+ * OOM on reduce. See IGNITE-9933
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9933")
+ @Test
+ public void testHeavySortByIndexLazy() throws Exception {
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT * from test ORDER BY indexed", true);
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+ @Test
+ public void testHeavySortByIndexNotLazy() throws Exception {
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT * from test ORDER BY indexed", false);
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+ @Test
+ public void testHeavySortByNotIndexLazy() throws Exception {
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT * from test ORDER BY STR", true);
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+ @Test
+ public void testHeavySortByNotIndexNotLazy() throws Exception {
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT * from test ORDER BY str", false);
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Test
+ public void testHeavyGroupByPkLazy() throws Exception {
+ startTestGrid();
+
+ checkQuery("SELECT id, sum(val) from test GROUP BY id", KEY_CNT, true, true);
+ }
+
+ /**
+ * @throws Exception On error.
+ */
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+ @Test
+ public void testHeavyGroupByPkNotLazy() throws Exception {
+
+ startTestGrid();
+
+ checkQueryExpectOOM("SELECT id, sum(val) from test GROUP BY id", false, true);
+ }
+
+
+ /**
+ * @param sql Query.
+ * @param lazy Lazy mode.
+ * @throws Exception On error.
+ */
+ private void checkQueryExpectOOM(String sql, boolean lazy) throws Exception {
+ checkQueryExpectOOM(sql, lazy, false);
+ }
+
+ /**
+ * Runs a heavy query that is expected to fail and logs how it failed (out of memory, node down or other error).
+ * A background checker stops all grids if the query has not finished within {@link #HANG_TIMEOUT}.
+ *
+ * @param sql Query.
+ * @param lazy Lazy mode.
+ * @param collocated Collocated GROUP BY mode.
+ * @throws Exception On error.
+ */
+ private void checkQueryExpectOOM(String sql, boolean lazy, boolean collocated) throws Exception {
+ final AtomicBoolean hangTimeout = new AtomicBoolean();
+ final AtomicBoolean hangCheckerEnd = new AtomicBoolean();
+
+ // Start grid hang checker.
+ // In some cases grid hangs (e.g. when OOME is thrown at the discovery thread).
+ IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+ try {
+ long startTime = U.currentTimeMillis();
+
+ while (!hangCheckerEnd.get() && U.currentTimeMillis() - startTime < HANG_TIMEOUT)
+ U.sleep(1000);
+
+ if (hangCheckerEnd.get())
+ return;
+
+ hangTimeout.set(true);
+
+ log.info("Kill hung grids");
+
+ stopAllGrids();
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ fail("Unexpected interruption");
+ }
+ });
+
+ try {
+ checkQuery(sql, 0, lazy, collocated);
+ }
+ catch (Exception e) {
+ if (hangTimeout.get()) {
+ log.info("Grid hangs");
+
+ return;
+ }
+
+ // The message may be null; normalize it so that matching cannot throw an NPE that hides 'e'.
+ String msg = String.valueOf(e.getMessage());
+
+ if (msg.contains("Failed to execute SQL query. Out of memory"))
+ log.info("OOME is thrown");
+ else if (msg.contains("Failed to communicate with Ignite cluster"))
+ log.info("Node is down");
+ else
+ log.warning("Other error with OOME cause", e);
+ }
+ finally {
+ // Always stop the hang checker and wait for it, so it cannot outlive the test.
+ hangCheckerEnd.set(true);
+
+ fut.get();
+ }
+ }
+
+ /**
+ * @param sql Query.
+ * @param expectedRowCnt Expected row count.
+ * @param lazy Lazy mode.
+ * @throws Exception On failure.
+ */
+ public void checkQuery(String sql, long expectedRowCnt, boolean lazy) throws Exception {
+ checkQuery(sql, expectedRowCnt, lazy, false);
+ }
+
+ /**
+ * Executes the query through the JDBC thin driver, iterates over the whole result set and checks the row count.
+ *
+ * @param sql Query.
+ * @param expectedRowCnt Expected row count.
+ * @param lazy Lazy mode.
+ * @param collocated Collocated group by flag.
+ * @throws Exception On failure.
+ */
+ public void checkQuery(String sql, long expectedRowCnt, boolean lazy, boolean collocated) throws Exception {
+ // The schema is the quoted cache name; use the constant so the URL cannot drift from the cache configuration.
+ String url = "jdbc:ignite:thin://127.0.0.1:10800..10850/\"" + CACHE_NAME + "\"" +
+ "?collocated=" + collocated +
+ "&lazy=" + lazy;
+
+ try (Connection c = DriverManager.getConnection(url); Statement stmt = c.createStatement()) {
+ log.info("Run heavy query: " + sql);
+
+ stmt.execute(sql);
+
+ long cnt = 0;
+
+ // Close the result set explicitly as soon as it has been read.
+ try (ResultSet rs = stmt.getResultSet()) {
+ while (rs.next())
+ cnt++;
+ }
+
+ assertEquals("Invalid row count:", expectedRowCnt, cnt);
+ }
+ }
+
+ /** */
+ public static class Value {
+ /** Secondary ID. */
+ @QuerySqlField(index = true)
+ private long indexed;
+
+ /** Value (equal to the ID). */
+ @QuerySqlField
+ private long val;
+
+ /** String value. */
+ @QuerySqlField
+ private String str;
+
+ /**
+ * @param id ID.
+ */
+ public Value(long id) {
+ indexed = id / 10;
+ val = id;
+ str = "value " + id;
+ }
+ }
+
+ /**
+ *
+ */
+ public static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.attribute(HAS_CACHE) != null;
+ }
+ }
+}
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/IgniteQueryOOMTestSuite.java
similarity index 75%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/IgniteQueryOOMTestSuite.java
index 0a6181e..d57a607 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/IgniteQueryOOMTestSuite.java
@@ -15,15 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
+package org.apache.ignite.internal.processors.query.oom;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
- * Special test suite with ignored tests for Binary mode.
+ * Test suite for queries that produce OOME in some cases.
*/
@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@Suite.SuiteClasses({
+ // Query OOM tests.
+ QueryOOMWithoutQueryParallelismTest.class,
+ QueryOOMWithQueryParallelismTest.class,
+})
+public class IgniteQueryOOMTestSuite {
}
+
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithQueryParallelismTest.java
similarity index 72%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithQueryParallelismTest.java
index 0a6181e..4a0404c 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithQueryParallelismTest.java
@@ -15,15 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
+package org.apache.ignite.internal.processors.query.oom;
import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+import org.junit.runners.JUnit4;
/**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for OOME on query.
*/
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@RunWith(JUnit4.class)
+public class QueryOOMWithQueryParallelismTest extends AbstractQueryOOMTest {
+ /** {@inheritDoc} */
+ @Override protected int queryParallelism() {
+ return 4;
+ }
}
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithoutQueryParallelismTest.java
similarity index 72%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithoutQueryParallelismTest.java
index 0a6181e..2db821a 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithoutQueryParallelismTest.java
@@ -15,15 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
+package org.apache.ignite.internal.processors.query.oom;
import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+import org.junit.runners.JUnit4;
/**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for OOME on query.
*/
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@RunWith(JUnit4.class)
+public class QueryOOMWithoutQueryParallelismTest extends AbstractQueryOOMTest {
+ /** {@inheritDoc} */
+ @Override protected int queryParallelism() {
+ return 1;
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 91d783f..359eff5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -177,6 +177,8 @@ import org.apache.ignite.internal.processors.client.ClientConnectorConfiguration
import org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineBinObjFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.query.IgniteCachelessQueriesSelfTest;
import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
+import org.apache.ignite.internal.processors.query.IgniteQueryTableLockAndConnectionPoolLazyModeOffTest;
+import org.apache.ignite.internal.processors.query.IgniteQueryTableLockAndConnectionPoolLazyModeOnTest;
import org.apache.ignite.internal.processors.query.IgniteSqlDefaultValueTest;
import org.apache.ignite.internal.processors.query.IgniteSqlDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
@@ -193,7 +195,6 @@ import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTe
import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
-import org.apache.ignite.internal.processors.query.LazyQuerySelfTest;
import org.apache.ignite.internal.processors.query.MultipleStatementsSqlQuerySelfTest;
import org.apache.ignite.internal.processors.query.RunningQueriesTest;
import org.apache.ignite.internal.processors.query.SqlIllegalSchemaSelfTest;
@@ -300,7 +301,8 @@ import org.junit.runners.Suite;
IgniteDynamicSqlRestoreTest.class,
// Queries tests.
- LazyQuerySelfTest.class,
+ IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.class,
+ IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.class,
IgniteSqlSplitterSelfTest.class,
SqlPushDownFunctionTest.class,
IgniteSqlSegmentedIndexSelfTest.class,
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite.java
similarity index 59%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite.java
index 0a6181e..09915bb 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite.java
@@ -15,15 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
+package org.apache.ignite.testsuites.nightly;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testsuites.IgniteBinaryCacheQueryTestSuite;
+import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
- * Special test suite with ignored tests for Binary mode.
+ * Test suite for cache queries with lazy mode.
*/
@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@Suite.SuiteClasses({
+ IgniteBinaryCacheQueryTestSuite.class,
+})
+public class IgniteBinaryCacheQueryLazyTestSuite {
+ /**
+ * Setup lazy mode default.
+ */
+ @BeforeClass
+ public static void setupLazy() {
+ GridTestUtils.setFieldValue(SqlFieldsQuery.class, "DFLT_LAZY", true);
+ }
}
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite2.java
similarity index 59%
rename from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
rename to modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite2.java
index 0a6181e..164a725 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite2.java
@@ -15,15 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.testsuites;
+package org.apache.ignite.testsuites.nightly;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testsuites.IgniteBinaryCacheQueryTestSuite2;
+import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
- * Special test suite with ignored tests for Binary mode.
+ * Test suite for cache queries with lazy mode.
*/
@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@Suite.SuiteClasses({
+ IgniteBinaryCacheQueryTestSuite2.class,
+})
+public class IgniteBinaryCacheQueryLazyTestSuite2 {
+ /**
+ * Setup lazy mode default.
+ */
+ @BeforeClass
+ public static void setupLazy() {
+ GridTestUtils.setFieldValue(SqlFieldsQuery.class, "DFLT_LAZY", true);
+ }
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index ef5623e..bee6ffd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -765,11 +765,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
public void TestSqlQueryTimeout()
{
var cache = Cache();
- PopulateCache(cache, false, 20000, x => true);
+ PopulateCache(cache, false, 30000, x => true);
- var sqlQry = new SqlQuery(typeof(QueryPerson), "WHERE age < 500 AND name like '%1%'")
+ var sqlQry = new SqlQuery(typeof(QueryPerson), "WHERE age < 2000")
{
- Timeout = TimeSpan.FromMilliseconds(2)
+ Timeout = TimeSpan.FromMilliseconds(1)
};
// ReSharper disable once ReturnValueOfPureMethodIsNotUsed
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
index 02d13f6..760a48d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
@@ -150,9 +150,9 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
{
var cache = GetClientCache<Person>();
- cache.PutAll(Enumerable.Range(1, 30000).ToDictionary(x => x, x => new Person(x)));
+ cache.PutAll(Enumerable.Range(1, 1000).ToDictionary(x => x, x => new Person(x)));
- var qry = new SqlFieldsQuery("select * from Person where Name like '%ers%'")
+ var qry = new SqlFieldsQuery("select * from Person p0, Person p1, Person p2")
{
Timeout = TimeSpan.FromMilliseconds(1)
};
diff --git a/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js b/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js
index c81838a..bf75759 100644
--- a/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js
+++ b/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js
@@ -77,12 +77,31 @@ describe('sql fields query test suite >', () => {
catch(error => done.fail(error));
});
- it('get all with page size', (done) => {
+ it('get all with page size lazy true', (done) => {
Promise.resolve().
then(async () => {
let cache = igniteClient.getCache(CACHE_NAME);
const cursor = await cache.query(
- new SqlFieldsQuery(`SELECT * FROM ${TABLE_NAME}`).setPageSize(1));
+ new SqlFieldsQuery(`SELECT * FROM ${TABLE_NAME}`).setPageSize(1).setLazy(true));
+ const set = new Set();
+ for (let fields of await cursor.getAll()) {
+ expect(fields.length).toBe(2);
+ expect(generateValue(fields[0]) === fields[1]).toBe(true);
+ set.add(fields[0]);
+ expect(fields[0] >= 0 && fields[0] < ELEMENTS_NUMBER).toBe(true);
+ }
+ expect(set.size).toBe(ELEMENTS_NUMBER);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('get all with page size lazy false', (done) => {
+ Promise.resolve().
+ then(async () => {
+ let cache = igniteClient.getCache(CACHE_NAME);
+ const cursor = await cache.query(
+ new SqlFieldsQuery(`SELECT * FROM ${TABLE_NAME}`).setPageSize(1).setLazy(false));
const set = new Set();
for (let fields of await cursor.getAll()) {
expect(fields.length).toBe(2);
diff --git a/modules/platforms/php/tests/SqlFieldsQueryTest.php b/modules/platforms/php/tests/SqlFieldsQueryTest.php
index 3e74ca0..802f738 100644
--- a/modules/platforms/php/tests/SqlFieldsQueryTest.php
+++ b/modules/platforms/php/tests/SqlFieldsQueryTest.php
@@ -64,10 +64,22 @@ final class SqlFieldsQueryTestCase extends TestCase
$this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
}
- public function testGetAllWithPageSize(): void
+ public function testGetAllWithPageSizeLazyTrue(): void
{
$cache = self::$cache;
- $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(1));
+ $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(1)->setLazy(true));
+ $set = new Set();
+ foreach ($cursor->getAll() as $fields) {
+ $this->checkCursorResult($fields);
+ $set->add($fields[0]);
+ }
+ $this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
+ }
+
+ public function testGetAllWithPageSizeLazyFalse(): void
+ {
+ $cache = self::$cache;
+ $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(1)->setLazy(false));
$set = new Set();
foreach ($cursor->getAll() as $fields) {
$this->checkCursorResult($fields);
@@ -88,10 +100,22 @@ final class SqlFieldsQueryTestCase extends TestCase
$this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
}
- public function testIterateCursorWithPageSize(): void
+ public function testIterateCursorWithPageSizeLazyTrue(): void
+ {
+ $cache = self::$cache;
+ $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(2)->setLazy(true));
+ $set = new Set();
+ foreach ($cursor as $fields) {
+ $this->checkCursorResult($fields);
+ $set->add($fields[0]);
+ }
+ $this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
+ }
+
+ public function testIterateCursorWithPageSizeLazyFalse(): void
{
$cache = self::$cache;
- $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(2));
+ $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(2)->setLazy(false));
$set = new Set();
foreach ($cursor as $fields) {
$this->checkCursorResult($fields);
diff --git a/modules/yardstick/config/benchmark-native-sql-select-join.properties b/modules/yardstick/config/benchmark-native-sql-select-join.properties
index f32622a..dc280d1 100644
--- a/modules/yardstick/config/benchmark-native-sql-select-join.properties
+++ b/modules/yardstick/config/benchmark-native-sql-select-join.properties
@@ -24,8 +24,6 @@ JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false"
# Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
JVM_OPTS=${JVM_OPTS}" \
--Xms8g \
--Xmx8g \
-Xloggc:./gc${now0}.log \
-XX:+PrintGCDetails \
-verbose:gc \
@@ -51,8 +49,8 @@ RESTART_SERVERS=true
# BENCHMARK_WRITER=
# The benchmark is applicable only for 1 server and 1 driver
-SERVER_HOSTS=""
-DRIVER_HOSTS=127.0.0.1
+SERVER_HOSTS="127.0.0.1,127.0.0.1"
+DRIVER_HOSTS="127.0.0.1,127.0.0.1,127.0.0.1"
# Remote username.
# REMOTE_USER=
diff --git a/modules/yardstick/config/benchmark-native-sql-select.properties b/modules/yardstick/config/benchmark-native-sql-select.properties
index 0f0b606..8da3541 100644
--- a/modules/yardstick/config/benchmark-native-sql-select.properties
+++ b/modules/yardstick/config/benchmark-native-sql-select.properties
@@ -24,8 +24,6 @@ JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false"
# Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
JVM_OPTS=${JVM_OPTS}" \
--Xms8g \
--Xmx8g \
-Xloggc:./gc${now0}.log \
-XX:+PrintGCDetails \
-verbose:gc \
@@ -51,8 +49,8 @@ RESTART_SERVERS=true
# BENCHMARK_WRITER=
# The benchmark is applicable only for 1 server and 1 driver
-SERVER_HOSTS=127.0.0.1
-DRIVER_HOSTS=127.0.0.1
+SERVER_HOSTS="127.0.0.1,127.0.0.1"
+DRIVER_HOSTS="127.0.0.1,127.0.0.1,127.0.0.1"
# Remote username.
# REMOTE_USER=
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
index 2b6c186..23fd04f 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.yardstick.jdbc;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
@@ -31,43 +32,60 @@ import static org.yardstickframework.BenchmarkUtils.println;
public class JdbcUtils {
/**
* Common method to fill test stand with data.
+ *
* @param cfg Benchmark configuration.
* @param ignite Ignite node.
* @param range Data key range.
*/
- public static void fillData(BenchmarkConfiguration cfg, IgniteEx ignite, long range, CacheAtomicityMode atomicMode) {
- println(cfg, "Create table...");
+ public static void fillData(BenchmarkConfiguration cfg, IgniteEx ignite, long range,
+ CacheAtomicityMode atomicMode) {
+ IgniteSemaphore sem = ignite.semaphore("sql-setup", 1, true, true);
- StringBuilder qry = new StringBuilder("CREATE TABLE test_long (id LONG PRIMARY KEY, val LONG)" +
- " WITH \"wrap_value=true");
+ try {
+ if (sem.tryAcquire()) {
+ println(cfg, "Create table...");
- if (atomicMode != null)
- qry.append(", atomicity=").append(atomicMode.name());
+ StringBuilder qry = new StringBuilder("CREATE TABLE test_long (id LONG PRIMARY KEY, val LONG)" +
+ " WITH \"wrap_value=true");
- qry.append("\";");
+ if (atomicMode != null)
+ qry.append(", atomicity=").append(atomicMode.name());
- String qryStr = qry.toString();
+ qry.append("\";");
- println(cfg, "Creating table with schema: " + qryStr);
+ String qryStr = qry.toString();
- GridQueryProcessor qProc = ignite.context().query();
+ println(cfg, "Creating table with schema: " + qryStr);
- qProc.querySqlFields(
- new SqlFieldsQuery(qryStr), true);
+ GridQueryProcessor qProc = ignite.context().query();
- println(cfg, "Populate data...");
+ qProc.querySqlFields(
+ new SqlFieldsQuery(qryStr), true);
- for (long l = 1; l <= range; ++l) {
- qProc.querySqlFields(
- new SqlFieldsQuery("INSERT INTO test_long (id, val) VALUES (?, ?)")
- .setArgs(l, l + 1), true);
+ println(cfg, "Populate data...");
- if (l % 10000 == 0)
- println(cfg, "Populate " + l);
- }
+ for (long l = 1; l <= range; ++l) {
+ qProc.querySqlFields(
+ new SqlFieldsQuery("INSERT INTO test_long (id, val) VALUES (?, ?)")
+ .setArgs(l, l + 1), true);
+
+ if (l % 10000 == 0)
+ println(cfg, "Populate " + l);
+ }
+
+ qProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX val_idx ON test_long (val)"), true);
- qProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX val_idx ON test_long (val)"), true);
+ println(cfg, "Finished populating data");
+ }
+ else {
+ // Acquire (wait for setup by another client) and immediately release.
+ println(cfg, "Waits for setup...");
- println(cfg, "Finished populating data");
+ sem.acquire();
+ }
+ }
+ finally {
+ sem.release();
+ }
}
}
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java
index f660b8c..191d6a0 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.ignite.yardstick.jdbc;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
@@ -50,7 +51,7 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
qry = new SqlFieldsQuery("SELECT * FROM person p join organization o on p.orgId=o.id WHERE p.id = ? " +
"order by p.id");
- qry.setArgs(ThreadLocalRandom.current().nextLong(args.range()) + 1);
+ qry.setArgs(ThreadLocalRandom.current().nextLong(args.range()));
expRsSize = 1;
}
@@ -63,7 +64,7 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
qry = new SqlFieldsQuery("SELECT * FROM person p join organization o on p.orgId=o.id WHERE p.id BETWEEN ? AND ?" +
"order by p.id");
- long id = ThreadLocalRandom.current().nextLong(args.range() - args.sqlRange()) + 1;
+ long id = ThreadLocalRandom.current().nextLong(args.range() - args.sqlRange());
long maxId = id + args.sqlRange() - 1;
qry.setArgs(id, maxId);
@@ -74,14 +75,16 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
long rsSize = 0;
try (FieldsQueryCursor<List<?>> cursor = ((IgniteEx)ignite()).context().query()
- .querySqlFields(qry, false)) {
+ .querySqlFields(qry, false)) {
for (List<?> r : cursor)
rsSize++;
}
- if (rsSize != expRsSize)
- throw new Exception("Invalid result set size [actual=" + rsSize + ", expected=" + expRsSize + ']');
+ if (rsSize != expRsSize) {
+ throw new Exception("Invalid result set size [actual=" + rsSize + ", expected=" + expRsSize
+ + ", qry=" + qry + ']');
+ }
return true;
}
@@ -90,46 +93,61 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
@Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
super.setUp(cfg);
- qry = ((IgniteEx)ignite()).context().query();
+ IgniteSemaphore sem = ignite().semaphore("sql-setup", 1, true, true);
+
+ try {
+ if (sem.tryAcquire()) {
+ qry = ((IgniteEx)ignite()).context().query();
+
+ StringBuilder withExpr = new StringBuilder(" WITH \"AFFINITY_KEY=orgId,");
- StringBuilder withExpr = new StringBuilder(" WITH \"AFFINITY_KEY=orgId,");
+ if (args.atomicMode() != null)
+ withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
- if (args.atomicMode() != null)
- withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
+ if (args.partitionedCachesNumber() == 1)
+ withExpr.append("template=replicated");
+ else
+ withExpr.append("template=partitioned");
- if (args.partitionedCachesNumber() == 1)
- withExpr.append("template=replicated");
- else
- withExpr.append("template=partitioned");
+ withExpr.append("\"");
- withExpr.append("\"");
+ qry.querySqlFields(
+ new SqlFieldsQuery("CREATE TABLE person (id long, orgId long, name varchar, PRIMARY KEY (id, orgId))" + withExpr), true);
- qry.querySqlFields(
- new SqlFieldsQuery("CREATE TABLE person (id long, orgId long, name varchar, PRIMARY KEY (id, orgId))" + withExpr), true);
+ withExpr = new StringBuilder(" WITH \"");
- withExpr = new StringBuilder(" WITH \"");
+ if (args.atomicMode() != null)
+ withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
- if (args.atomicMode() != null)
- withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
+ withExpr.append("template=partitioned");
- withExpr.append("template=partitioned");
+ withExpr.append("\"");
- withExpr.append("\"");
+ qry.querySqlFields(
+ new SqlFieldsQuery("CREATE TABLE organization (id long primary key, name varchar)" + withExpr), true);
- qry.querySqlFields(
- new SqlFieldsQuery("CREATE TABLE organization (id long primary key, name varchar)" + withExpr), true);
+ for (long k = 0; k <= args.range(); ++k) {
+ qry.querySqlFields(new SqlFieldsQuery("insert into person (id, orgId, name) values (?, ?, ?)")
+ .setArgs(k, k / 10, "person " + k), true).getAll();
- for (long k = 1; k <= args.range(); ++k) {
- qry.querySqlFields(new SqlFieldsQuery("insert into person (id, orgId, name) values (?, ?, ?)")
- .setArgs(k, k / 10, "person " + k), true).getAll();
+ if (k % 10 == 0) {
+ qry.querySqlFields(new SqlFieldsQuery("insert into organization (id, name) values (?, ?)")
+ .setArgs(k / 10, "organization " + k / 10), true).getAll();
+ }
- if (k % 10 == 0) {
- qry.querySqlFields(new SqlFieldsQuery("insert into organization (id, name) values (?, ?)")
- .setArgs(k / 10, "organization " + k / 10), true).getAll();
+ if (k % 10000 == 0)
+ println(cfg, "Populate " + k);
+ }
}
+ else {
+ // Acquire (wait for setup by another client) and immediately release.
+ println(cfg, "Waits for setup...");
- if (k % 10000 == 0)
- println(cfg, "Populate " + k);
+ sem.acquire();
+ }
+ }
+ finally {
+ sem.release();
}
}
}