You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/23 15:01:42 UTC
[38/50] [abbrv] ignite git commit: IGNITE-9960: SQL: Reverted
IGNITE-9171 and IGNITE-9864 until performance is fixed. This closes #5045.
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 9b7d268..f228111 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -40,6 +40,7 @@ import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -57,12 +58,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.CompoundLockFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -70,10 +71,8 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
@@ -98,13 +97,13 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.h2.api.ErrorCode;
+import org.apache.ignite.thread.IgniteThread;
import org.h2.command.Prepared;
import org.h2.jdbc.JdbcResultSet;
-import org.h2.jdbc.JdbcSQLException;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -124,6 +123,9 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V
@SuppressWarnings("ForLoopReplaceableByForEach")
public class GridMapQueryExecutor {
/** */
+ public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET);
+
+ /** */
private IgniteLogger log;
/** */
@@ -147,8 +149,8 @@ public class GridMapQueryExecutor {
/** Busy lock for lazy workers. */
private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock();
- /** Stop guard. */
- private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** Lazy worker stop guard. */
+ private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean();
/**
* @param busyLock Busy lock.
@@ -205,21 +207,18 @@ public class GridMapQueryExecutor {
}
/**
- * Stop query map executor, cleanup resources.
+ * Cancel active lazy queries and prevent submit of new queries.
*/
- public void stop() {
- if (!stopGuard.compareAndSet(false, true))
+ public void cancelLazyWorkers() {
+ if (!lazyWorkerStopGuard.compareAndSet(false, true))
return;
- for (MapNodeResults res : qryRess.values())
- res.cancelAll();
-
- for (MapQueryLazyWorker w : lazyWorkers.values())
- w.stop(true);
-
lazyWorkerBusyLock.block();
- assert lazyWorkers.isEmpty() : "Not cleaned lazy workers: " + lazyWorkers.size();
+ for (MapQueryLazyWorker worker : lazyWorkers.values())
+ worker.stop(false);
+
+ lazyWorkers.clear();
}
/**
@@ -260,7 +259,7 @@ public class GridMapQueryExecutor {
* @return Busy lock for lazy workers to guard their operations with.
*/
GridSpinBusyLock busyLock() {
- return lazyWorkerBusyLock;
+ return busyLock;
}
/**
@@ -555,7 +554,6 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param req Query request.
- * @throws IgniteCheckedException On error.
*/
private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
int[] qryParts = req.queryPartitions();
@@ -568,14 +566,10 @@ public class GridMapQueryExecutor {
req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
- final GridDhtTxLocalAdapter tx;
-
- GridH2SelectForUpdateTxDetails txReq = req.txDetails();
-
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
- final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY) && txReq == null;
+ final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
final List<Integer> cacheIds = req.caches();
@@ -584,6 +578,10 @@ public class GridMapQueryExecutor {
final Object[] params = req.parameters();
+ final GridDhtTxLocalAdapter tx;
+
+ GridH2SelectForUpdateTxDetails txReq = req.txDetails();
+
try {
if (txReq != null) {
// Prepare to run queries.
@@ -738,11 +736,7 @@ public class GridMapQueryExecutor {
* @param parts Explicit partitions for current node.
* @param pageSize Page size.
* @param distributedJoinMode Query distributed join mode.
- * @param enforceJoinOrder Enforce join order flag.
- * @param replicated Replicated flag.
- * @param timeout Query timeout.
- * @param params Query params.
- * @param lazy Lazy query execution flag.
+ * @param lazy Streaming flag.
* @param mvccSnapshot MVCC snapshot.
* @param tx Transaction.
* @param txDetails TX details, if it's a {@code FOR UPDATE} request, or {@code null}.
@@ -771,24 +765,75 @@ public class GridMapQueryExecutor {
@Nullable final GridH2SelectForUpdateTxDetails txDetails,
@Nullable final CompoundLockFuture lockFut,
@Nullable final AtomicInteger runCntr) {
+ MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+
// In presence of TX, we also must always have matching details.
assert tx == null || txDetails != null;
- assert !lazy || txDetails == null : "Lazy execution of SELECT FOR UPDATE queries is not supported.";
-
boolean inTx = (tx != null);
- MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+ if (lazy && worker == null) {
+ // Lazy queries must be re-submitted to dedicated workers.
+ MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
+ worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
+
+ worker.submit(new Runnable() {
+ @Override public void run() {
+ onQueryRequest0(
+ node,
+ reqId,
+ segmentId,
+ schemaName,
+ qrys,
+ cacheIds,
+ topVer,
+ partsMap,
+ parts,
+ pageSize,
+ distributedJoinMode,
+ enforceJoinOrder,
+ replicated,
+ timeout,
+ params,
+ true,
+ mvccSnapshot,
+ tx,
+ txDetails,
+ lockFut,
+ runCntr);
+ }
+ });
+
+ if (lazyWorkerBusyLock.enterBusy()) {
+ try {
+ MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
+
+ if (oldWorker != null)
+ oldWorker.stop(false);
- if (lazy && worker == null)
- worker = createLazyWorker(node, reqId, segmentId);
+ IgniteThread thread = new IgniteThread(worker);
+
+ thread.start();
+ }
+ finally {
+ lazyWorkerBusyLock.leaveBusy();
+ }
+ }
+ else
+ log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']');
+
+ return;
+ }
+
+ if (lazy && txDetails != null)
+ throw new IgniteSQLException("Lazy execution of SELECT FOR UPDATE queries is not supported.");
// Prepare to run queries.
GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds);
MapNodeResults nodeRess = resultsForNode(node.id());
- MapQueryResults qryResults = null;
+ MapQueryResults qr = null;
List<GridReservable> reserved = new ArrayList<>();
@@ -802,7 +847,7 @@ public class GridMapQueryExecutor {
if (!F.isEmpty(err)) {
// Unregister lazy worker because re-try may never reach this node again.
if (lazy)
- worker.stop(false);
+ stopAndUnregisterCurrentLazyWorker();
sendRetry(node, reqId, segmentId, err);
@@ -810,7 +855,10 @@ public class GridMapQueryExecutor {
}
}
- qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, worker, inTx);
+ qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker(), inTx);
+
+ if (nodeRess.put(reqId, segmentId, qr) != null)
+ throw new IllegalStateException();
// Prepare query context.
GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
@@ -824,207 +872,186 @@ public class GridMapQueryExecutor {
.pageSize(pageSize)
.topologyVersion(topVer)
.reservations(reserved)
- .mvccSnapshot(mvccSnapshot);
+ .mvccSnapshot(mvccSnapshot)
+ .lazyWorker(worker);
+
+ Connection conn = h2.connectionForSchema(schemaName);
+
+ H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
+
+ GridH2QueryContext.set(qctx);
// qctx is set, we have to release reservations inside of it.
reserved = null;
- if (worker != null)
- worker.queryContext(qctx);
+ try {
+ if (nodeRess.cancelled(reqId)) {
+ GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
- GridH2QueryContext.set(qctx);
+ nodeRess.cancelRequest(reqId);
- if (nodeRess.put(reqId, segmentId, qryResults) != null)
- throw new IllegalStateException();
+ throw new QueryCancelledException();
+ }
- Connection conn = h2.connectionForSchema(schemaName);
+ // Run queries.
+ int qryIdx = 0;
- H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder, lazy);
+ boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
- if (nodeRess.cancelled(reqId)) {
- GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
+ for (GridCacheSqlQuery qry : qrys) {
+ ResultSet rs = null;
- nodeRess.cancelRequest(reqId);
+ boolean removeMapping = false;
- throw new QueryCancelledException();
- }
+ // If we are not the target node for this replicated query, just ignore it.
+ if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+ String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
- // Run queries.
- int qryIdx = 0;
+ PreparedStatement stmt;
- boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+ try {
+ stmt = h2.prepareStatement(conn, sql, true);
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
+ }
- for (GridCacheSqlQuery qry : qrys) {
- ResultSet rs = null;
+ Prepared p = GridSqlQueryParser.prepared(stmt);
- boolean removeMapping = false;
+ if (GridSqlQueryParser.isForUpdateQuery(p)) {
+ sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
+ stmt = h2.prepareStatement(conn, sql, true);
+ }
- // If we are not the target node for this replicated query, just ignore it.
- if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
- String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
+ h2.bindParameters(stmt, params0);
- PreparedStatement stmt;
+ int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
- try {
- stmt = h2.prepareStatement(conn, sql, true);
- }
- catch (SQLException e) {
- throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
- }
+ rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx));
- Prepared p = GridSqlQueryParser.prepared(stmt);
+ if (inTx) {
+ ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
+ ctx.localNodeId(),
+ txDetails.version(),
+ mvccSnapshot,
+ txDetails.threadId(),
+ IgniteUuid.randomUuid(),
+ txDetails.miniId(),
+ parts,
+ tx,
+ opTimeout,
+ mainCctx,
+ rs
+ );
- if (GridSqlQueryParser.isForUpdateQuery(p)) {
- sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
- stmt = h2.prepareStatement(conn, sql, true);
- }
+ if (lockFut != null)
+ lockFut.register(enlistFut);
- h2.bindParameters(stmt, params0);
+ enlistFut.init();
- int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
+ enlistFut.get();
- rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qryResults.queryCancel(qryIdx));
+ rs.beforeFirst();
+ }
- if (inTx) {
- ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
- ctx.localNodeId(),
- txDetails.version(),
- mvccSnapshot,
- txDetails.threadId(),
- IgniteUuid.randomUuid(),
- txDetails.miniId(),
- parts,
- tx,
- opTimeout,
- mainCctx,
- rs
- );
-
- if (lockFut != null)
- lockFut.register(enlistFut);
-
- enlistFut.init();
-
- enlistFut.get();
-
- rs.beforeFirst();
- }
+ if (evt) {
+ ctx.event().record(new CacheQueryExecutedEvent<>(
+ node,
+ "SQL query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.SQL.name(),
+ mainCctx.name(),
+ null,
+ qry.query(),
+ null,
+ null,
+ params,
+ node.id(),
+ null));
+ }
- if (evt) {
- ctx.event().record(new CacheQueryExecutedEvent<>(
- node,
- "SQL query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.SQL.name(),
- mainCctx.name(),
- null,
- qry.query(),
- null,
- null,
- params,
- node.id(),
- null));
+ assert rs instanceof JdbcResultSet : rs.getClass();
}
- assert rs instanceof JdbcResultSet : rs.getClass();
- }
-
- qryResults.addResult(qryIdx, qry, node.id(), rs, params);
+ qr.addResult(qryIdx, qry, node.id(), rs, params);
- if (qryResults.cancelled()) {
- qryResults.result(qryIdx).close();
+ if (qr.cancelled()) {
+ qr.result(qryIdx).close();
- throw new QueryCancelledException();
- }
+ throw new QueryCancelledException();
+ }
- if (inTx) {
- if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
- if (removeMapping = tx.empty() && !tx.queryEnlisted())
- tx.rollbackAsync().get();
+ if (inTx) {
+ if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
+ if (removeMapping = tx.empty() && !tx.queryEnlisted())
+ tx.rollbackAsync().get();
+ }
}
- }
- // Send the first page.
- if (lockFut == null)
- sendNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping);
- else {
- GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping);
-
- if (msg != null) {
- lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
- @Override public void apply(IgniteInternalFuture<Void> future) {
- try {
- if (node.isLocal())
- h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
- else
- ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
- }
- catch (Exception e) {
- U.error(log, e);
+ // Send the first page.
+ if (lockFut == null)
+ sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
+ else {
+ GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
+
+ if (msg != null) {
+ lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+ @Override public void apply(IgniteInternalFuture<Void> future) {
+ try {
+ if (node.isLocal())
+ h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+ else
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+ }
+ catch (Exception e) {
+ U.error(log, e);
+ }
}
- }
- });
+ });
+ }
}
+
+ qryIdx++;
}
- qryIdx++;
+ // All request results are in the memory in result set already, so it's ok to release partitions.
+ if (!lazy)
+ releaseReservations();
}
-
- // All request results are in the memory in result set already, so it's ok to release partitions.
- if (!lazy)
+ catch (Throwable e){
releaseReservations();
- else if (!qryResults.isAllClosed()) {
- if (MapQueryLazyWorker.currentWorker() == null) {
- final ObjectPoolReusable<H2ConnectionWrapper> detachedConn = h2.detachConnection();
- worker.start(H2Utils.session(conn), detachedConn);
-
- GridH2QueryContext.clearThreadLocal();
- }
+ throw e;
}
- else
- unregisterLazyWorker(worker);
}
catch (Throwable e) {
- if (qryResults != null) {
- nodeRess.remove(reqId, segmentId, qryResults);
+ if (qr != null) {
+ nodeRess.remove(reqId, segmentId, qr);
- qryResults.close();
+ qr.cancel(false);
}
- else
- releaseReservations();
- // Stop and unregister worker after possible cancellation.
+ // Unregister worker after possible cancellation.
if (lazy)
- worker.stop(false);
-
- if (e instanceof QueryCancelledException)
- sendError(node, reqId, e);
- else {
- JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
+ stopAndUnregisterCurrentLazyWorker();
- if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
- sendError(node, reqId, new QueryCancelledException());
- else {
- GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
+ GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
- if (retryErr != null) {
- final String retryCause = String.format(
- "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
- "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
- );
+ if (retryErr != null) {
+ final String retryCause = String.format(
+ "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
+ "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
+ );
- sendRetry(node, reqId, segmentId, retryCause);
- }
- else {
- U.error(log, "Failed to execute local query.", e);
+ sendRetry(node, reqId, segmentId, retryCause);
+ }
+ else {
+ U.error(log, "Failed to execute local query.", e);
- sendError(node, reqId, e);
+ sendError(node, reqId, e);
- if (e instanceof Error)
- throw (Error)e;
- }
- }
+ if (e instanceof Error)
+ throw (Error)e;
}
}
finally {
@@ -1033,25 +1060,10 @@ public class GridMapQueryExecutor {
for (int i = 0; i < reserved.size(); i++)
reserved.get(i).release();
}
-
- if (MapQueryLazyWorker.currentWorker() == null && GridH2QueryContext.get() != null)
- GridH2QueryContext.clearThreadLocal();
}
}
/**
- * @param node The node has sent map query request.
- * @param reqId Request ID.
- * @param segmentId Segment ID.
- * @return Lazy worker.
- */
- private MapQueryLazyWorker createLazyWorker(ClusterNode node, long reqId, int segmentId) {
- MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
-
- return new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
- }
-
- /**
* @param cacheIds Cache ids.
* @return Id of the first cache in list, or {@code null} if list is empty.
*/
@@ -1076,7 +1088,6 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param req DML request.
- * @throws IgniteCheckedException On error.
*/
private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException {
int[] parts = req.queryPartitions();
@@ -1244,34 +1255,24 @@ public class GridMapQueryExecutor {
return;
}
- final MapQueryResults qryResults = nodeRess.get(req.queryRequestId(), req.segmentId());
+ final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
- if (qryResults == null)
+ if (qr == null)
sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
- else if (qryResults.cancelled())
+ else if (qr.cancelled())
sendError(node, req.queryRequestId(), new QueryCancelledException());
else {
- MapQueryLazyWorker lazyWorker = qryResults.lazyWorker();
+ MapQueryLazyWorker lazyWorker = qr.lazyWorker();
if (lazyWorker != null) {
lazyWorker.submit(new Runnable() {
@Override public void run() {
- try {
- sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false);
- }
- catch (Throwable e) {
- JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
-
- if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
- sendError(node, qryResults.queryRequestId(), new QueryCancelledException());
- else
- throw e;
- }
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
}
});
}
else
- sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false);
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
}
}
@@ -1286,14 +1287,8 @@ public class GridMapQueryExecutor {
* @return Next page.
* @throws IgniteCheckedException If failed.
*/
- private GridQueryNextPageResponse prepareNextPage(
- MapNodeResults nodeRess,
- ClusterNode node,
- MapQueryResults qr,
- int qry,
- int segmentId,
- int pageSize,
- boolean removeMapping) throws IgniteCheckedException {
+ private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
+ int pageSize, boolean removeMapping) throws IgniteCheckedException {
MapQueryResult res = qr.result(qry);
assert res != null;
@@ -1314,11 +1309,8 @@ public class GridMapQueryExecutor {
nodeRess.remove(qr.queryRequestId(), segmentId, qr);
// Release reservations if the last page fetched, all requests are closed and this is a lazy worker.
- if (qr.lazyWorker() != null) {
+ if (MapQueryLazyWorker.currentWorker() != null)
releaseReservations();
-
- qr.lazyWorker().stop(false);
- }
}
}
@@ -1350,14 +1342,8 @@ public class GridMapQueryExecutor {
* @param removeMapping Remove mapping flag.
*/
@SuppressWarnings("unchecked")
- private void sendNextPage(
- MapNodeResults nodeRess,
- ClusterNode node,
- MapQueryResults qr,
- int qry,
- int segmentId,
- int pageSize,
- boolean removeMapping) {
+ private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
+ int pageSize, boolean removeMapping) {
try {
GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping);
@@ -1379,7 +1365,6 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param reqId Request ID.
* @param segmentId Index segment ID.
- * @param retryCause Description of the retry cause.
*/
private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) {
try {
@@ -1416,11 +1401,25 @@ public class GridMapQueryExecutor {
}
/**
+ * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
+ */
+ public void stopAndUnregisterCurrentLazyWorker() {
+ MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+
+ if (worker != null) {
+ worker.stop(false);
+
+ // Just stop is not enough as worker may be registered, but not started due to exception.
+ unregisterLazyWorker(worker);
+ }
+ }
+
+ /**
* Unregister lazy worker.
*
* @param worker Worker.
*/
- void unregisterLazyWorker(MapQueryLazyWorker worker) {
+ public void unregisterLazyWorker(MapQueryLazyWorker worker) {
lazyWorkers.remove(worker.key(), worker);
}
@@ -1430,17 +1429,4 @@ public class GridMapQueryExecutor {
public int registeredLazyWorkers() {
return lazyWorkers.size();
}
-
- /**
- * @param worker Worker to register.
- */
- void registerLazyWorker(MapQueryLazyWorker worker) {
- MapQueryLazyWorker oldWorker = lazyWorkers.put(worker.key(), worker);
-
- if (oldWorker != null) {
- log.warning("Duplicates lazy worker: [key=" + worker.key() + ']');
-
- oldWorker.stop(false);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index d9c542b..62c5c78 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -287,16 +287,11 @@ public class GridReduceQueryExecutor {
*/
private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
if (r != null) {
- CacheException e;
+ CacheException e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+ ", errMsg=" + msg + ']');
- if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR) {
- e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
- ", errMsg=" + msg + ']', new QueryCancelledException());
- }
- else {
- e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
- ", errMsg=" + msg + ']');
- }
+ if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR)
+ e.addSuppressed(new QueryCancelledException());
r.setStateOnException(nodeId, e);
}
@@ -1223,9 +1218,6 @@ public class GridReduceQueryExecutor {
}
}
- r.setStateOnException(ctx.localNodeId(),
- new CacheException("Query is canceled.", new QueryCancelledException()));
-
if (!runs.remove(qryReqId, r))
U.warn(log, "Query run was already removed: " + qryReqId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 217cfad..0cb986b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -17,13 +17,12 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;
-import java.util.RandomAccess;
import java.util.UUID;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
@@ -73,11 +72,9 @@ public class GridResultPage {
Collection<?> plainRows = res.plainRows();
if (plainRows != null) {
- assert plainRows instanceof RandomAccess : "instance of " + plainRows.getClass();
-
rowsInPage = plainRows.size();
- if (rowsInPage == 0 || ((List<Value[]>)plainRows).get(0).length == res.columns())
+ if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns())
rows = (Iterator<Value[]>)plainRows.iterator();
else {
// If it's a result of SELECT FOR UPDATE (we can tell by difference in number
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index 8f8553a..48116d3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -17,11 +17,12 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -85,10 +86,10 @@ class MapNodeResults {
public void cancelRequest(long reqId) {
for (MapRequestKey key : res.keySet()) {
if (key.requestId() == reqId) {
- final MapQueryResults removed = res.remove(key);
+ MapQueryResults removed = res.remove(key);
if (removed != null)
- removed.cancel();
+ removed.cancel(true);
}
}
@@ -143,7 +144,7 @@ class MapNodeResults {
*/
public void cancelAll() {
for (MapQueryResults ress : res.values())
- ress.cancel();
+ ress.cancel(true);
// Cancel update requests
for (GridQueryCancel upd: updCancels.values())
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
index 1cbab19..98f3df9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
@@ -20,41 +20,25 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
-import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
-import org.h2.engine.Session;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-
/**
* Worker for lazy query execution.
*/
public class MapQueryLazyWorker extends GridWorker {
- /** Poll task timeout milliseconds. */
- private static final int POLL_TASK_TIMEOUT_MS = 1000;
-
/** Lazy thread flag. */
private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>();
/** Active lazy worker count (for testing purposes). */
private static final LongAdder ACTIVE_CNT = new LongAdder();
- /** Mutex to synchronization worker start/stop. */
- private final Object mux = new Object();
-
/** Task to be executed. */
private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
@@ -67,14 +51,8 @@ public class MapQueryLazyWorker extends GridWorker {
/** Latch decremented when worker finishes. */
private final CountDownLatch stopLatch = new CountDownLatch(1);
- /** Query context. */
- private GridH2QueryContext qctx;
-
- /** Worker is started flag. */
- private boolean started;
-
- /** Detached connection. */
- private ObjectPoolReusable<H2ConnectionWrapper> detached;
+ /** Map query result. */
+ private volatile MapQueryResult res;
/**
* Constructor.
@@ -92,106 +70,38 @@ public class MapQueryLazyWorker extends GridWorker {
this.exec = exec;
}
- /**
- * Start lazy worker for half-processed query.
- * In this case we have to detach H2 connection from current thread and use it for current query processing.
- * Also tables locks must be transferred to lazy thread from QUERY_POOL thread pool.
- *
- * @param ses H2 Session.
- * @param detached H2 connection detached from current thread.
- * @throws QueryCancelledException In case query is canceled during the worker start.
- */
- void start(Session ses, ObjectPoolReusable<H2ConnectionWrapper> detached) throws QueryCancelledException {
- synchronized (mux) {
- if (!exec.busyLock().enterBusy()) {
- log.warning("Lazy worker isn't started. Node is stopped [key=" + key + ']');
-
- return;
- }
-
- try {
- if (started)
- return;
-
- if (isCancelled) {
- if (detached != null)
- detached.recycle();
-
- throw new QueryCancelledException();
- }
-
- if (ses != null)
- lazyTransferStart(ses);
-
- this.detached = detached;
-
- exec.registerLazyWorker(this);
-
- IgniteThread thread = new IgniteThread(this);
-
- started = true;
-
- thread.start();
- }
- finally {
- exec.busyLock().leaveBusy();
- }
- }
- }
-
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
LAZY_WORKER.set(this);
ACTIVE_CNT.increment();
- boolean lockBusy = false;
-
try {
- if (qctx != null)
- GridH2QueryContext.set(qctx);
-
- if(detached != null)
- lazyTransferFinish(H2Utils.session(detached.object().connection()));
-
while (!isCancelled()) {
- Runnable task = tasks.poll(POLL_TASK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Runnable task = tasks.take();
if (task != null) {
+ if (!exec.busyLock().enterBusy())
+ return;
+
try {
task.run();
}
- catch (Throwable t) {
- log.warning("Lazy task error", t);
- }
- }
- else {
- try {
- lockBusy = false;
-
- if (!exec.busyLock().enterBusy()) {
- log.info("Stop lazy worker [key=" + key + ']');
-
- return;
- }
-
- lockBusy = true;
- }
finally {
- if (lockBusy)
- exec.busyLock().leaveBusy();
+ exec.busyLock().leaveBusy();
}
}
}
}
finally {
- exec.unregisterLazyWorker(this);
+ if (res != null)
+ res.close();
LAZY_WORKER.set(null);
ACTIVE_CNT.decrement();
- stopLatch.countDown();
+ exec.unregisterLazyWorker(this);
}
}
@@ -201,9 +111,6 @@ public class MapQueryLazyWorker extends GridWorker {
* @param task Task to be executed.
*/
public void submit(Runnable task) {
- if (isCancelled)
- return;
-
tasks.add(task);
}
@@ -218,76 +125,45 @@ public class MapQueryLazyWorker extends GridWorker {
* Stop the worker.
* @param nodeStop Node is stopping.
*/
- private void stop0(boolean nodeStop) {
- synchronized (mux) {
- if (qctx != null && qctx.distributedJoinMode() == OFF && !qctx.isCleared())
- qctx.clearContext(nodeStop);
+ public void stop(final boolean nodeStop) {
+ if (MapQueryLazyWorker.currentWorker() == null)
+ submit(new Runnable() {
+ @Override public void run() {
+ stop(nodeStop);
+ }
+ });
+ else {
+ GridH2QueryContext qctx = GridH2QueryContext.get();
- if (detached != null) {
- detached.recycle();
+ if (qctx != null) {
+ qctx.clearContext(nodeStop);
- detached = null;
+ GridH2QueryContext.clearThreadLocal();
}
isCancelled = true;
- mux.notifyAll();
+ stopLatch.countDown();
}
}
/**
- * @param task Stop task.
+ * Await worker stop.
*/
- public void submitStopTask(Runnable task) {
- synchronized (mux) {
- if (LAZY_WORKER.get() != null)
- task.run();
- else
- submit(task);
+ public void awaitStop() {
+ try {
+ U.await(stopLatch);
}
- }
-
- /**
- * Stop the worker.
- * @param nodeStop Node is stopping.
- */
- public void stop(final boolean nodeStop) {
- synchronized (mux) {
- if (isCancelled)
- return;
-
- if (started && currentWorker() == null) {
- submit(new Runnable() {
- @Override public void run() {
- stop0(nodeStop);
- }
- });
-
- awaitStop();
- }
- else if (currentWorker() != null)
- stop0(nodeStop);
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
}
}
/**
- * Await worker stop.
+ * @param res Map query result.
*/
- private void awaitStop() {
- synchronized (mux) {
- try {
- if (!isCancelled)
- mux.wait();
-
- U.await(stopLatch);
- }
- catch (IgniteInterruptedCheckedException e) {
- throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
+ public void result(MapQueryResult res) {
+ this.res = res;
}
/**
@@ -305,13 +181,6 @@ public class MapQueryLazyWorker extends GridWorker {
}
/**
- * @param qctx Query context.
- */
- public void queryContext(GridH2QueryContext qctx) {
- this.qctx = qctx;
- }
-
- /**
* Construct worker name.
*
* @param instanceName Instance name.
@@ -322,32 +191,4 @@ public class MapQueryLazyWorker extends GridWorker {
return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" +
key.segment();
}
-
- /**
- * Start session transfer to lazy thread.
- *
- * @param ses Session.
- */
- private static void lazyTransferStart(Session ses) {
- GridH2QueryContext qctx = GridH2QueryContext.get();
-
- assert qctx != null;
-
- for(GridH2Table tbl : qctx.lockedTables())
- tbl.onLazyTransferStarted(ses);
- }
-
- /**
- * Finish session transfer to lazy thread.
- *
- * @param ses Session.
- */
- private static void lazyTransferFinish(Session ses) {
- GridH2QueryContext qctx = GridH2QueryContext.get();
-
- assert qctx != null;
-
- for(GridH2Table tbl : qctx.lockedTables())
- tbl.onLazyTransferFinished(ses);
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index 5a0c410..fb928c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -22,7 +22,6 @@ import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -61,9 +60,6 @@ class MapQueryResult {
}
}
- /** Logger. */
- private final IgniteLogger log;
-
/** Indexing. */
private final IgniteH2Indexing h2;
@@ -100,23 +96,26 @@ class MapQueryResult {
/** */
private final Object[] params;
+ /** Lazy worker. */
+ private final MapQueryLazyWorker lazyWorker;
+
/**
- * @param h2 H2 indexing.
* @param rs Result set.
* @param cctx Cache context.
* @param qrySrcNodeId Query source node.
* @param qry Query.
* @param params Query params.
+ * @param lazyWorker Lazy worker.
*/
MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx,
- UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
- this.log = h2.kernalContext().log(MapQueryResult.class);
+ UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
this.h2 = h2;
this.cctx = cctx;
this.qry = qry;
this.params = params;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
+ this.lazyWorker = lazyWorker;
if (rs != null) {
this.rs = rs;
@@ -175,6 +174,8 @@ class MapQueryResult {
* @return {@code true} If there are no more rows available.
*/
synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+ assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+
if (closed)
return true;
@@ -258,13 +259,30 @@ class MapQueryResult {
* Close the result.
*/
public void close() {
+ if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
+ lazyWorker.submit(new Runnable() {
+ @Override public void run() {
+ close();
+ }
+ });
+
+ lazyWorker.awaitStop();
+
+ return;
+ }
+
synchronized (this) {
+ assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+
if (closed)
return;
closed = true;
- U.close(rs, log);
+ U.closeQuiet(rs);
+
+ if (lazyWorker != null)
+ lazyWorker.stop(false);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index b13137c..76527bc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
* Mapper query results.
*/
class MapQueryResults {
- /** H2 indexing. */
+ /** H@ indexing. */
private final IgniteH2Indexing h2;
/** */
@@ -113,7 +113,10 @@ class MapQueryResults {
* @param params Query arguments.
*/
void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
- MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params);
+ MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker);
+
+ if (lazyWorker != null)
+ lazyWorker.result(res);
if (!results.compareAndSet(qry, null, res))
throw new IllegalStateException();
@@ -136,37 +139,28 @@ class MapQueryResults {
/**
* Cancels the query.
*/
- void cancel() {
+ void cancel(boolean forceQryCancel) {
if (cancelled)
return;
cancelled = true;
for (int i = 0; i < results.length(); i++) {
- GridQueryCancel cancel = cancels[i];
+ MapQueryResult res = results.get(i);
- if (cancel != null)
- cancel.cancel();
- }
+ if (res != null) {
+ res.close();
- if (lazyWorker == null)
- close();
- else {
- lazyWorker.submitStopTask(this::close);
+ continue;
+ }
- lazyWorker.stop(false);
- }
- }
+ // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
+ if (forceQryCancel) {
+ GridQueryCancel cancel = cancels[i];
- /**
- *
- */
- public void close() {
- for (int i = 0; i < results.length(); i++) {
- MapQueryResult res = results.get(i);
-
- if (res != null)
- res.close();
+ if (cancel != null)
+ cancel.cancel();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
index a991530..a112969 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
@@ -96,6 +96,7 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
int partsFilled = fillAllPartitions(cache, aff);
SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
+ .setLazy(true)
.setPageSize(1);
FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
@@ -142,6 +143,7 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
int partsFilled = fillAllPartitions(cache, aff);
SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
+ .setLazy(true)
.setPageSize(1);
FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index 24e2fb2..59be138 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -121,15 +121,12 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
private static int getStatementCacheSize(GridQueryProcessor qryProcessor) {
IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx");
- ConcurrentMap<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> conns =
- GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
+ ConcurrentMap<Thread, H2ConnectionWrapper> conns = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
int cntr = 0;
- for (ConcurrentMap<H2ConnectionWrapper, Boolean> connPerThread: conns.values()) {
- for (H2ConnectionWrapper w : connPerThread.keySet())
- cntr += w.statementCacheSize();
- }
+ for (H2ConnectionWrapper w : conns.values())
+ cntr += w.statementCacheSize();
return cntr;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index 67a9501..56fd7b8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@ -100,83 +100,84 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
/** */
public void testRemoteQueryExecutionTimeout() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true);
}
/** */
public void testRemoteQueryWithMergeTableTimeout() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true);
}
/** */
public void testRemoteQueryExecutionCancel0() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryExecutionCancel1() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryExecutionCancel2() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false);
}
/** */
public void testRemoteQueryExecutionCancel3() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false);
}
/** */
public void testRemoteQueryWithMergeTableCancel0() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryWithMergeTableCancel1() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryWithMergeTableCancel2() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryWithMergeTableCancel3() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false);
}
/** */
public void testRemoteQueryWithoutMergeTableCancel0() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryWithoutMergeTableCancel1() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryWithoutMergeTableCancel2() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false);
}
/** */
public void testRemoteQueryWithoutMergeTableCancel3() throws Exception {
- testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
}
/** */
public void testRemoteQueryAlreadyFinishedStop() throws Exception {
- testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false);
+ testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
}
/** */
private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit,
- boolean timeout, boolean checkCanceled) throws Exception {
+ boolean timeout) throws Exception {
try (Ignite client = startGrid("client")) {
+
IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
assertEquals(0, cache.localSize());
@@ -203,8 +204,7 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
qry.setTimeout(timeoutUnits, timeUnit);
cursor = cache.query(qry);
- }
- else {
+ } else {
cursor = cache.query(qry);
client.scheduler().runLocal(new Runnable() {
@@ -214,11 +214,8 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
}, timeoutUnits, timeUnit);
}
- try (QueryCursor<List<?>> ignored = cursor) {
- cursor.getAll();
-
- if (checkCanceled)
- fail("Query not canceled");
+ try(QueryCursor<List<?>> ignored = cursor) {
+ cursor.iterator();
}
catch (CacheException ex) {
log().error("Got expected exception", ex);
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index 48b43a7..7e23c88 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@ -67,11 +67,6 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends GridCommonA
"where pr.companyId = co._key\n" +
"order by co._key, pr._key ";
- protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" +
- "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
- "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
- "order by pe.id desc";
-
/** */
protected static final int GRID_CNT = 2;
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index 3beebff..bad5303 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
-import org.apache.ignite.testframework.GridTestUtils;
/**
* Test for distributed queries with node restarts.
@@ -102,11 +101,11 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
assertEquals(broadcastQry, plan.contains("batched:broadcast"));
- final List<List<?>> goldenRes = grid(0).cache("pu").query(qry0).getAll();
+ final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll();
Thread.sleep(3000);
- assertEquals(goldenRes, grid(0).cache("pu").query(qry0).getAll());
+ assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll());
final SqlFieldsQuery qry1;
@@ -123,7 +122,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll();
- assertFalse(goldenRes.isEmpty());
+ assertFalse(pRes.isEmpty());
assertFalse(rRes.isEmpty());
final AtomicInteger qryCnt = new AtomicInteger();
@@ -162,12 +161,9 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
qry.setPageSize(smallPageSize ? 30 : 1000);
try {
- assertEquals(goldenRes, cache.query(qry).getAll());
+ assertEquals(pRes, cache.query(qry).getAll());
}
catch (CacheException e) {
- if (!smallPageSize)
- log.error("Unexpected exception at the test", e);
-
assertTrue("On large page size must retry.", smallPageSize);
boolean failedOnRemoteFetch = false;
@@ -267,7 +263,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
}
}, restartThreadsNum, "restart-thread");
- GridTestUtils.waitForCondition(() -> fail.get(), duration);
+ Thread.sleep(duration);
info("Stopping...");
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 9f8a2fc..03a8d49 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@ -40,47 +40,47 @@ import org.apache.ignite.internal.util.typedef.internal.U;
public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extends IgniteCacheQueryAbstractDistributedJoinSelfTest {
/** */
public void testCancel1() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, false, true);
+ testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, false);
}
/** */
public void testCancel2() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, false, true);
+ testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, false);
}
/** */
public void testCancel3() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, false);
}
/** */
public void testCancel4() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, false, false);
+ testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, false);
}
/** */
public void testTimeout1() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, true, true);
+ testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, true);
}
/** */
public void testTimeout2() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, true, true);
+ testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, true);
}
/** */
public void testTimeout3() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, true, false);
+ testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, true);
}
/** */
public void testTimeout4() throws Exception {
- testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, true, false);
+ testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, true);
}
/** */
private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit,
- boolean timeout, boolean checkCanceled) throws Exception {
+ boolean timeout) throws Exception {
SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true);
IgniteCache<Object, Object> cache = ignite.cache(cacheName);
@@ -101,10 +101,7 @@ public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend
}
try (QueryCursor<List<?>> ignored = cursor) {
- cursor.getAll();
-
- if (checkCanceled)
- fail("Query not canceled");
+ cursor.iterator();
}
catch (CacheException ex) {
log().error("Got expected exception", ex);
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index 4d02b2e..072f1ab 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -627,8 +627,6 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
* @throws Exception If failed.
*/
public void testQueryConsistencyMultithreaded() throws Exception {
- final int KEY_COUNT = 5000;
-
// Start complex topology.
ignitionStart(serverConfiguration(1));
ignitionStart(serverConfiguration(2));
@@ -640,7 +638,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
run(cli, createSql);
- put(cli, 0, KEY_COUNT);
+ put(cli, 0, 5000);
final AtomicBoolean stopped = new AtomicBoolean();
@@ -698,7 +696,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query(
new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll();
- assertEquals(KEY_COUNT, res.size());
+ assertEquals(5000, res.size());
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/75e414a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index fe45ed6..7713004 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -160,7 +160,7 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
Map<Thread, ?> conns = perThreadConnections(i);
for(Thread t : conns.keySet())
- log.error("Connection is not closed for thread: " + t.getName());
+ log.error("+++ Connection is not closed for thread: " + t.getName());
}
fail("H2 JDBC connections leak detected. See the log above.");