You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/08/15 18:00:12 UTC
[11/29] ignite git commit: IGNITE-5982: GridMapQueryExecutor was
split into several pieces.
IGNITE-5982: GridMapQueryExecutor was split into several pieces.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/879f1910
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/879f1910
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/879f1910
Branch: refs/heads/ignite-5947
Commit: 879f19106b22e66d5f6ea94424d961d049397410
Parents: b093afb
Author: devozerov <vo...@gridgain.com>
Authored: Tue Aug 8 15:16:58 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Aug 8 15:17:58 2017 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMapQueryExecutor.java | 501 ++-----------------
.../query/h2/twostep/MapNodeResults.java | 108 ++++
.../query/h2/twostep/MapQueryResult.java | 258 ++++++++++
.../query/h2/twostep/MapQueryResults.java | 155 ++++++
.../h2/twostep/MapReplicatedReservation.java | 38 ++
.../query/h2/twostep/MapRequestKey.java | 65 +++
.../query/h2/twostep/MapReservationKey.java | 73 +++
7 files changed, 730 insertions(+), 468 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/879f1910/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index fcf5f10..19b628b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.ResultSet;
import java.util.AbstractCollection;
@@ -31,7 +30,6 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReferenceArray;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -39,7 +37,6 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
-import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
@@ -57,35 +54,29 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.jdbc.JdbcResultSet;
-import org.h2.result.ResultInterface;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
@@ -94,30 +85,13 @@ import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoin
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
-import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
* Map query executor.
*/
+@SuppressWarnings("ForLoopReplaceableByForEach")
public class GridMapQueryExecutor {
/** */
- private static final Field RESULT_FIELD;
-
- /*
- * Initialize.
- */
- static {
- try {
- RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
-
- RESULT_FIELD.setAccessible(true);
- }
- catch (NoSuchFieldException e) {
- throw new IllegalStateException("Check H2 version in classpath.", e);
- }
- }
-
- /** */
private IgniteLogger log;
/** */
@@ -127,14 +101,13 @@ public class GridMapQueryExecutor {
private IgniteH2Indexing h2;
/** */
- private ConcurrentMap<UUID, NodeResults> qryRess = new ConcurrentHashMap8<>();
+ private ConcurrentMap<UUID, MapNodeResults> qryRess = new ConcurrentHashMap8<>();
/** */
private final GridSpinBusyLock busyLock;
/** */
- private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable> reservations =
- new ConcurrentHashMap8<>();
+ private final ConcurrentMap<MapReservationKey, GridReservable> reservations = new ConcurrentHashMap8<>();
/**
* @param busyLock Busy lock.
@@ -162,7 +135,7 @@ public class GridMapQueryExecutor {
GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId);
- NodeResults nodeRess = qryRess.remove(nodeId);
+ MapNodeResults nodeRess = qryRess.remove(nodeId);
if (nodeRess == null)
return;
@@ -172,6 +145,7 @@ public class GridMapQueryExecutor {
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+ @SuppressWarnings("deprecation")
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!busyLock.enterBusy())
return;
@@ -228,7 +202,7 @@ public class GridMapQueryExecutor {
private void onCancel(ClusterNode node, GridQueryCancelRequest msg) {
long qryReqId = msg.queryRequestId();
- NodeResults nodeRess = resultsForNode(node.id());
+ MapNodeResults nodeRess = resultsForNode(node.id());
boolean clear = GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
@@ -245,13 +219,13 @@ public class GridMapQueryExecutor {
* @param nodeId Node ID.
* @return Results for node.
*/
- private NodeResults resultsForNode(UUID nodeId) {
- NodeResults nodeRess = qryRess.get(nodeId);
+ private MapNodeResults resultsForNode(UUID nodeId) {
+ MapNodeResults nodeRess = qryRess.get(nodeId);
if (nodeRess == null) {
- nodeRess = new NodeResults();
+ nodeRess = new MapNodeResults();
- NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
+ MapNodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
if (old != null)
nodeRess = old;
@@ -300,13 +274,12 @@ public class GridMapQueryExecutor {
continue;
// For replicated cache topology version does not make sense.
- final T2<String,AffinityTopologyVersion> grpKey =
- new T2<>(cctx.name(), cctx.isReplicated() ? null : topVer);
+ final MapReservationKey grpKey = new MapReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
GridReservable r = reservations.get(grpKey);
if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
- if (r != ReplicatedReservation.INSTANCE) {
+ if (r != MapReplicatedReservation.INSTANCE) {
if (!r.reserve())
return false; // We need explicit partitions here -> retry.
@@ -327,7 +300,7 @@ public class GridMapQueryExecutor {
}
// Mark that we checked this replicated cache.
- reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE);
+ reservations.putIfAbsent(grpKey, MapReplicatedReservation.INSTANCE);
}
}
else { // Reserve primary partitions for partitioned cache (if no explicit given).
@@ -381,6 +354,7 @@ public class GridMapQueryExecutor {
return Collections.emptySet();
return new AbstractCollection<Integer>() {
+ @SuppressWarnings("NullableProblems")
@Override public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
/** */
@@ -537,9 +511,9 @@ public class GridMapQueryExecutor {
GridCacheContext<?, ?> mainCctx =
!F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
- NodeResults nodeRess = resultsForNode(node.id());
+ MapNodeResults nodeRess = resultsForNode(node.id());
- QueryResults qr = null;
+ MapQueryResults qr = null;
List<GridReservable> reserved = new ArrayList<>();
@@ -553,7 +527,7 @@ public class GridMapQueryExecutor {
}
}
- qr = new QueryResults(reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null);
+ qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null);
if (nodeRess.put(reqId, segmentId, qr) != null)
throw new IllegalStateException();
@@ -619,7 +593,7 @@ public class GridMapQueryExecutor {
rs = h2.executeSqlQueryWithTimer(conn, qry.query(),
F.asList(qry.parameters(params)), true,
timeout,
- qr.cancels[qryIdx]);
+ qr.queryCancel(qryIdx));
if (evt) {
assert mainCctx != null;
@@ -644,7 +618,7 @@ public class GridMapQueryExecutor {
qr.addResult(qryIdx, qry, node.id(), rs, params);
- if (qr.canceled) {
+ if (qr.cancelled()) {
qr.result(qryIdx).close();
throw new QueryCancelledException();
@@ -724,7 +698,7 @@ public class GridMapQueryExecutor {
* @param req Request.
*/
private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
- NodeResults nodeRess = qryRess.get(node.id());
+ MapNodeResults nodeRess = qryRess.get(node.id());
if (nodeRess == null) {
sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
@@ -736,11 +710,11 @@ public class GridMapQueryExecutor {
return;
}
- QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+ MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
if (qr == null)
sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
- else if (qr.canceled)
+ else if (qr.cancelled())
sendError(node, req.queryRequestId(), new QueryCancelledException());
else
sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
@@ -754,16 +728,16 @@ public class GridMapQueryExecutor {
* @param segmentId Index segment ID.
* @param pageSize Page size.
*/
- private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId,
+ private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
int pageSize) {
- QueryResult res = qr.result(qry);
+ MapQueryResult res = qr.result(qry);
assert res != null;
- if (res.closed)
+ if (res.closed())
return;
- int page = res.page;
+ int page = res.page();
List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
@@ -773,16 +747,16 @@ public class GridMapQueryExecutor {
res.close();
if (qr.isAllClosed())
- nodeRess.remove(qr.qryReqId, segmentId, qr);
+ nodeRess.remove(qr.queryRequestId(), segmentId, qr);
}
try {
boolean loc = node.isLocal();
- GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page,
- page == 0 ? res.rowCnt : -1,
- res.cols,
- loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
+ GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.queryRequestId(), segmentId, qry, page,
+ page == 0 ? res.rowCount() : -1,
+ res.columnCount(),
+ loc ? null : toMessages(rows, new ArrayList<Message>(res.columnCount())),
loc ? rows : null);
if (loc)
@@ -828,418 +802,9 @@ public class GridMapQueryExecutor {
*/
public void onCacheStop(String cacheName) {
// Drop group reservations.
- for (T2<String,AffinityTopologyVersion> grpKey : reservations.keySet()) {
- if (F.eq(grpKey.get1(), cacheName))
+ for (MapReservationKey grpKey : reservations.keySet()) {
+ if (F.eq(grpKey.cacheName(), cacheName))
reservations.remove(grpKey);
}
}
-
-
- /**
- *
- */
- private static class NodeResults {
- /** */
- private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>();
-
- /** */
- private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
- new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
-
- /**
- * @param reqId Query Request ID.
- * @return {@code False} if query was already cancelled.
- */
- boolean cancelled(long reqId) {
- return qryHist.get(reqId) != null;
- }
-
- /**
- * @param reqId Query Request ID.
- * @return {@code True} if cancelled.
- */
- boolean onCancel(long reqId) {
- Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
-
- return old == null;
- }
-
- /**
- * @param reqId Query Request ID.
- * @param segmentId Index segment ID.
- * @return query partial results.
- */
- public QueryResults get(long reqId, int segmentId) {
- return res.get(new RequestKey(reqId, segmentId));
- }
-
- /**
- * Cancel all thread of given request.
- * @param reqID Request ID.
- */
- public void cancelRequest(long reqID) {
- for (RequestKey key : res.keySet()) {
- if (key.reqId == reqID) {
- QueryResults removed = res.remove(key);
-
- if (removed != null)
- removed.cancel(true);
- }
-
- }
- }
-
- /**
- * @param reqId Query Request ID.
- * @param segmentId Index segment ID.
- * @param qr Query Results.
- * @return {@code True} if removed.
- */
- public boolean remove(long reqId, int segmentId, QueryResults qr) {
- return res.remove(new RequestKey(reqId, segmentId), qr);
- }
-
- /**
- * @param reqId Query Request ID.
- * @param segmentId Index segment ID.
- * @param qr Query Results.
- * @return previous value.
- */
- public QueryResults put(long reqId, int segmentId, QueryResults qr) {
- return res.put(new RequestKey(reqId, segmentId), qr);
- }
-
- /**
- * Cancel all node queries.
- */
- public void cancelAll() {
- for (QueryResults ress : res.values())
- ress.cancel(true);
- }
-
- /**
- *
- */
- private static class RequestKey {
- /** */
- private long reqId;
-
- /** */
- private int segmentId;
-
- /** Constructor */
- RequestKey(long reqId, int segmentId) {
- this.reqId = reqId;
- this.segmentId = segmentId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- RequestKey other = (RequestKey)o;
-
- return reqId == other.reqId && segmentId == other.segmentId;
-
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int result = (int)(reqId ^ (reqId >>> 32));
- result = 31 * result + segmentId;
- return result;
- }
- }
- }
-
- /**
- *
- */
- private class QueryResults {
- /** */
- private final long qryReqId;
-
- /** */
- private final AtomicReferenceArray<QueryResult> results;
-
- /** */
- private final GridQueryCancel[] cancels;
-
- /** */
- private final String cacheName;
-
- /** */
- private volatile boolean canceled;
-
- /**
- * @param qryReqId Query request ID.
- * @param qrys Number of queries.
- * @param cacheName Cache name.
- */
- @SuppressWarnings("unchecked")
- private QueryResults(long qryReqId, int qrys, @Nullable String cacheName) {
- this.qryReqId = qryReqId;
- this.cacheName = cacheName;
-
- results = new AtomicReferenceArray<>(qrys);
- cancels = new GridQueryCancel[qrys];
-
- for (int i = 0; i < cancels.length; i++)
- cancels[i] = new GridQueryCancel();
- }
-
- /**
- * @param qry Query result index.
- * @return Query result.
- */
- QueryResult result(int qry) {
- return results.get(qry);
- }
-
- /**
- * @param qry Query result index.
- * @param q Query object.
- * @param qrySrcNodeId Query source node.
- * @param rs Result set.
- */
- void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
- if (!results.compareAndSet(qry, null, new QueryResult(rs, ctx, cacheName, qrySrcNodeId, q, params)))
- throw new IllegalStateException();
- }
-
- /**
- * @return {@code true} If all results are closed.
- */
- boolean isAllClosed() {
- for (int i = 0; i < results.length(); i++) {
- QueryResult res = results.get(i);
-
- if (res == null || !res.closed)
- return false;
- }
-
- return true;
- }
-
- /**
- * Cancels the query.
- */
- void cancel(boolean forceQryCancel) {
- if (canceled)
- return;
-
- canceled = true;
-
- for (int i = 0; i < results.length(); i++) {
- QueryResult res = results.get(i);
-
- if (res != null) {
- res.close();
-
- continue;
- }
-
- if (forceQryCancel) {
- GridQueryCancel cancel = cancels[i];
-
- if (cancel != null)
- cancel.cancel();
- }
- }
- }
- }
-
- /**
- * Result for a single part of the query.
- */
- private class QueryResult implements AutoCloseable {
- /** */
- private final ResultInterface res;
-
- /** */
- private final ResultSet rs;
-
- /** Kernal context. */
- private final GridKernalContext ctx;
-
- /** */
- private final String cacheName;
-
- /** */
- private final GridCacheSqlQuery qry;
-
- /** */
- private final UUID qrySrcNodeId;
-
- /** */
- private final int cols;
-
- /** */
- private int page;
-
- /** */
- private final int rowCnt;
-
- /** */
- private boolean cpNeeded;
-
- /** */
- private volatile boolean closed;
-
- /** */
- private final Object[] params;
-
- /**
- * @param rs Result set.
- * @param ctx Kernal context.
- * @param cacheName Cache name.
- * @param qrySrcNodeId Query source node.
- * @param qry Query.
- * @param params Query params.
- */
- private QueryResult(ResultSet rs, GridKernalContext ctx, @Nullable String cacheName,
- UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
- this.ctx = ctx;
- this.cacheName = cacheName;
- this.qry = qry;
- this.params = params;
- this.qrySrcNodeId = qrySrcNodeId;
- this.cpNeeded = F.eq(ctx.localNodeId(), qrySrcNodeId);
-
- if (rs != null) {
- this.rs = rs;
- try {
- res = (ResultInterface)RESULT_FIELD.get(rs);
- }
- catch (IllegalAccessException e) {
- throw new IllegalStateException(e); // Must not happen.
- }
-
- rowCnt = res.getRowCount();
- cols = res.getVisibleColumnCount();
- }
- else {
- this.rs = null;
- this.res = null;
- this.cols = -1;
- this.rowCnt = -1;
-
- closed = true;
- }
- }
-
- /**
- * @param rows Collection to fetch into.
- * @param pageSize Page size.
- * @return {@code true} If there are no more rows available.
- */
- synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
- if (closed)
- return true;
-
- boolean readEvt = cacheName != null && ctx.event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
- page++;
-
- for (int i = 0 ; i < pageSize; i++) {
- if (!res.next())
- return true;
-
- Value[] row = res.currentRow();
-
- if (cpNeeded) {
- boolean copied = false;
-
- for (int j = 0; j < row.length; j++) {
- Value val = row[j];
-
- if (val instanceof GridH2ValueCacheObject) {
- GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val;
-
- row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) {
- @Override public Object getObject() {
- return getObject(true);
- }
- };
-
- copied = true;
- }
- }
-
- if (i == 0 && !copied)
- cpNeeded = false; // No copy on read caches, skip next checks.
- }
-
- assert row != null;
-
- if (readEvt) {
- ctx.event().record(new CacheQueryReadEvent<>(
- ctx.discovery().localNode(),
- "SQL fields query result set row read.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.SQL.name(),
- cacheName,
- null,
- qry.query(),
- null,
- null,
- params,
- qrySrcNodeId,
- null,
- null,
- null,
- null,
- row(row)));
- }
-
- rows.add(res.currentRow());
- }
-
- return false;
- }
-
- /**
- * @param row Values array row.
- * @return Objects list row.
- */
- private List<?> row(Value[] row) {
- List<Object> res = new ArrayList<>(row.length);
-
- for (Value v : row)
- res.add(v.getObject());
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void close() {
- if (closed)
- return;
-
- closed = true;
-
- U.close(rs, log);
- }
- }
-
- /**
- * Fake reservation object for replicated caches.
- */
- private static class ReplicatedReservation implements GridReservable {
- /** */
- static final ReplicatedReservation INSTANCE = new ReplicatedReservation();
-
- /** {@inheritDoc} */
- @Override public boolean reserve() {
- throw new IllegalStateException();
- }
-
- /** {@inheritDoc} */
- @Override public void release() {
- throw new IllegalStateException();
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/879f1910/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
new file mode 100644
index 0000000..d5ea357
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.jsr166.ConcurrentHashMap8;
+
+import java.util.concurrent.ConcurrentMap;
+
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+
+/**
+ * Mapper node results.
+ */
+class MapNodeResults {
+ /** Query results keyed by (request ID, segment ID). */
+ private final ConcurrentMap<MapRequestKey, MapQueryResults> res = new ConcurrentHashMap8<>();
+
+ /** Request IDs for which {@link #onCancel(long)} was called (bounded history map). */
+ private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
+ new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
+
+ /**
+ * @param reqId Query Request ID.
+ * @return {@code True} if the cancel history currently holds an entry for this request.
+ */
+ boolean cancelled(long reqId) {
+ return qryHist.get(reqId) != null;
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @return {@code True} if this call recorded the cancel, {@code false} if it was already recorded.
+ */
+ boolean onCancel(long reqId) {
+ Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
+
+ return old == null;
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @return Query results registered for the given request and segment, or {@code null} if none.
+ */
+ public MapQueryResults get(long reqId, int segmentId) {
+ return res.get(new MapRequestKey(reqId, segmentId));
+ }
+
+ /**
+ * Removes and cancels the registered results of every segment of the given request.
+ * @param reqId Request ID.
+ */
+ public void cancelRequest(long reqId) {
+ for (MapRequestKey key : res.keySet()) {
+ if (key.requestId() == reqId) {
+ MapQueryResults removed = res.remove(key);
+
+ if (removed != null)
+ removed.cancel(true);
+ }
+ }
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @param qr Query Results.
+ * @return {@code True} if removed (only happens while the key is still mapped to {@code qr}).
+ */
+ public boolean remove(long reqId, int segmentId, MapQueryResults qr) {
+ return res.remove(new MapRequestKey(reqId, segmentId), qr);
+ }
+
+ /**
+ * @param reqId Query Request ID.
+ * @param segmentId Index segment ID.
+ * @param qr Query Results.
+ * @return Results previously registered under the same key, or {@code null} if there were none.
+ */
+ public MapQueryResults put(long reqId, int segmentId, MapQueryResults qr) {
+ return res.put(new MapRequestKey(reqId, segmentId), qr);
+ }
+
+ /**
+ * Cancels all currently registered query results of this node; entries are not removed here.
+ */
+ public void cancelAll() {
+ for (MapQueryResults ress : res.values())
+ ress.cancel(true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/879f1910/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
new file mode 100644
index 0000000..4799e03
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.jdbc.JdbcResultSet;
+import org.h2.result.ResultInterface;
+import org.h2.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+import java.lang.reflect.Field;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+
+/**
+ * Mapper result for a single part of the query.
+ */
+class MapQueryResult implements AutoCloseable {
+ /** Reflective handle to the {@code result} field of {@link JdbcResultSet}. */
+ private static final Field RESULT_FIELD;
+
+ /*
+ * Look up the field once and make it accessible; fail fast if the H2 class has no such field.
+ */
+ static {
+ try {
+ RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
+
+ RESULT_FIELD.setAccessible(true);
+ }
+ catch (NoSuchFieldException e) {
+ throw new IllegalStateException("Check H2 version in classpath.", e);
+ }
+ }
+
+ /** Indexing. */
+ private final IgniteH2Indexing h2;
+
+ /** H2 result read reflectively from {@link #rs}; {@code null} when no result set was given. */
+ private final ResultInterface res;
+
+ /** JDBC result set, closed by {@link #close()}; may be {@code null}. */
+ private final ResultSet rs;
+
+ /** Cache name; read events are recorded only when it is not {@code null}. */
+ private final String cacheName;
+
+ /** Query whose text is reported in read events. */
+ private final GridCacheSqlQuery qry;
+
+ /** Query source node ID. */
+ private final UUID qrySrcNodeId;
+
+ /** Visible column count, or {@code -1} when no result set was given. */
+ private final int cols;
+
+ /** Page counter, incremented by every fetch performed on a non-closed result. */
+ private int page;
+
+ /** Row count, or {@code -1} when no result set was given. */
+ private final int rowCnt;
+
+ /** Whether cache object values are re-wrapped on fetch; initially true if the source node is local. */
+ private boolean cpNeeded;
+
+ /** Closed flag; set by {@link #close()} or right away when no result set was given. */
+ private volatile boolean closed;
+
+ /** Query parameters reported in read events. */
+ private final Object[] params;
+
+ /**
+ * @param rs Result set; if {@code null} the result is created already closed.
+ * @param cacheName Cache name.
+ * @param qrySrcNodeId Query source node.
+ * @param qry Query.
+ * @param params Query params.
+ */
+ MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName,
+ UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+ this.h2 = h2;
+ this.cacheName = cacheName;
+ this.qry = qry;
+ this.params = params;
+ this.qrySrcNodeId = qrySrcNodeId;
+ this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
+
+ if (rs != null) {
+ this.rs = rs;
+ try {
+ res = (ResultInterface)RESULT_FIELD.get(rs);
+ }
+ catch (IllegalAccessException e) {
+ throw new IllegalStateException(e); // Must not happen.
+ }
+
+ rowCnt = res.getRowCount();
+ cols = res.getVisibleColumnCount();
+ }
+ else {
+ this.rs = null;
+ this.res = null;
+ this.cols = -1;
+ this.rowCnt = -1;
+
+ closed = true;
+ }
+ }
+
+ /**
+ * @return Page number.
+ */
+ int page() {
+ return page;
+ }
+
+ /**
+ * @return Row count.
+ */
+ int rowCount() {
+ return rowCnt;
+ }
+
+ /**
+ * @return Column count.
+ */
+ int columnCount() {
+ return cols;
+ }
+
+ /**
+ * @return Closed flag.
+ */
+ boolean closed() {
+ return closed;
+ }
+
+ /**
+ * @param rows Collection to fetch into.
+ * @param pageSize Page size.
+ * @return {@code true} If the result is closed or ran out of rows before the page was filled.
+ */
+ synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+ if (closed)
+ return true;
+
+ boolean readEvt = cacheName != null && h2.kernalContext().event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
+ page++;
+
+ for (int i = 0 ; i < pageSize; i++) {
+ if (!res.next())
+ return true;
+
+ Value[] row = res.currentRow();
+
+ if (cpNeeded) {
+ boolean copied = false;
+
+ for (int j = 0; j < row.length; j++) {
+ Value val = row[j];
+
+ if (val instanceof GridH2ValueCacheObject) {
+ GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val;
+
+ row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) {
+ @Override public Object getObject() {
+ return getObject(true);
+ }
+ };
+
+ copied = true;
+ }
+ }
+
+ if (i == 0 && !copied)
+ cpNeeded = false; // No copy on read caches, skip next checks.
+ }
+
+ assert row != null;
+
+ if (readEvt) {
+ GridKernalContext ctx = h2.kernalContext();
+
+ ctx.event().record(new CacheQueryReadEvent<>(
+ ctx.discovery().localNode(),
+ "SQL fields query result set row read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SQL.name(),
+ cacheName,
+ null,
+ qry.query(),
+ null,
+ null,
+ params,
+ qrySrcNodeId,
+ null,
+ null,
+ null,
+ null,
+ row(row)));
+ }
+
+ rows.add(res.currentRow());
+ }
+
+ return false;
+ }
+
+ /**
+ * @param row Values array row.
+ * @return List holding {@code getObject()} of every value, in row order.
+ */
+ private List<?> row(Value[] row) {
+ List<Object> res = new ArrayList<>(row.length);
+
+ for (Value v : row)
+ res.add(v.getObject());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() {
+ if (closed)
+ return;
+
+ closed = true;
+
+ U.closeQuiet(rs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/879f1910/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
new file mode 100644
index 0000000..7ad1d14
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.jetbrains.annotations.Nullable;
+
+import java.sql.ResultSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * Mapper query results.
+ */
+class MapQueryResults {
+ /** H@ indexing. */
+ private final IgniteH2Indexing h2;
+
+ /** */
+ private final long qryReqId;
+
+ /** */
+ private final AtomicReferenceArray<MapQueryResult> results;
+
+ /** */
+ private final GridQueryCancel[] cancels;
+
+ /** */
+ private final String cacheName;
+
+ /** */
+ private volatile boolean cancelled;
+
+ /**
+ * @param qryReqId Query request ID.
+ * @param qrys Number of queries.
+ * @param cacheName Cache name.
+ */
+ @SuppressWarnings("unchecked")
+ MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys,
+ @Nullable String cacheName) {
+ this.h2 = h2;
+ this.qryReqId = qryReqId;
+ this.cacheName = cacheName;
+
+ results = new AtomicReferenceArray<>(qrys);
+ cancels = new GridQueryCancel[qrys];
+
+ for (int i = 0; i < cancels.length; i++)
+ cancels[i] = new GridQueryCancel();
+ }
+
+ /**
+ * @param qry Query result index.
+ * @return Query result.
+ */
+ MapQueryResult result(int qry) {
+ return results.get(qry);
+ }
+
+ /**
+ * Get cancel token for query.
+ *
+ * @param qryIdx Query index.
+ * @return Cancel token.
+ */
+ GridQueryCancel queryCancel(int qryIdx) {
+ return cancels[qryIdx];
+ }
+
+ /**
+ * @param qry Query result index.
+ * @param q Query object.
+ * @param qrySrcNodeId Query source node.
+ * @param rs Result set.
+ */
+ void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
+ MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params);
+
+ if (!results.compareAndSet(qry, null, res))
+ throw new IllegalStateException();
+ }
+
+ /**
+ * @return {@code true} If all results are closed.
+ */
+ boolean isAllClosed() {
+ for (int i = 0; i < results.length(); i++) {
+ MapQueryResult res = results.get(i);
+
+ if (res == null || !res.closed())
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Cancels the query.
+ */
+ void cancel(boolean forceQryCancel) {
+ if (cancelled)
+ return;
+
+ cancelled = true;
+
+ for (int i = 0; i < results.length(); i++) {
+ MapQueryResult res = results.get(i);
+
+ if (res != null) {
+ res.close();
+
+ continue;
+ }
+
+ if (forceQryCancel) {
+ GridQueryCancel cancel = cancels[i];
+
+ if (cancel != null)
+ cancel.cancel();
+ }
+ }
+ }
+
+ /**
+ * @return Cancel flag.
+ */
+ boolean cancelled() {
+ return cancelled;
+ }
+
+ /**
+ * @return Query request ID.
+ */
+ long queryRequestId() {
+ return qryReqId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/879f1910/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java
new file mode 100644
index 0000000..dd8237b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+
+/**
+ * Mapper fake reservation object for replicated caches.
+ */
+class MapReplicatedReservation implements GridReservable {
+ /** */
+ static final MapReplicatedReservation INSTANCE = new MapReplicatedReservation();
+
+ /** {@inheritDoc} */
+ @Override public boolean reserve() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+ throw new IllegalStateException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/879f1910/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
new file mode 100644
index 0000000..6feb8ea
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+/**
+ * Mapper request key.
+ */
+class MapRequestKey {
+ /** */
+ private long reqId;
+
+ /** */
+ private int segmentId;
+
+ /** Constructor */
+ MapRequestKey(long reqId, int segmentId) {
+ this.reqId = reqId;
+ this.segmentId = segmentId;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ MapRequestKey other = (MapRequestKey)o;
+
+ return reqId == other.reqId && segmentId == other.segmentId;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = (int)(reqId ^ (reqId >>> 32));
+
+ res = 31 * res + segmentId;
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/879f1910/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java
new file mode 100644
index 0000000..9d2d7ba
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Mapper reservation key.
+ */
+public class MapReservationKey {
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /**
+ * Constructor.
+ *
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ */
+ public MapReservationKey(String cacheName, AffinityTopologyVersion topVer) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return Cache name.
+ */
+ public String cacheName() {
+ return cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ MapReservationKey other = (MapReservationKey)o;
+
+ return F.eq(cacheName, other.cacheName) && F.eq(topVer, other.topVer);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = cacheName != null ? cacheName.hashCode() : 0;
+
+ res = 31 * res + (topVer != null ? topVer.hashCode() : 0);
+
+ return res;
+ }
+}