You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/12 09:05:18 UTC
[1/2] incubator-ignite git commit: ignite-484 - extra spaces
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-484 [created] 7000722cf
ignite-484 - extra spaces
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e975b7a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e975b7a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e975b7a9
Branch: refs/heads/ignite-484
Commit: e975b7a9e1856bbb5c200f8c89a0d3d87423a016
Parents: ecc7a50
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon May 11 20:59:28 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon May 11 20:59:28 2015 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheTwoStepQuery.java | 22 +++++++-
.../h2/twostep/messages/GridQueryRequest.java | 56 +++++++++++++++++++-
.../processors/query/h2/IgniteH2Indexing.java | 12 ++++-
.../query/h2/sql/GridSqlQuerySplitter.java | 48 ++++++++++++++++-
.../h2/twostep/GridReduceQueryExecutor.java | 23 +++++++-
.../cache/GridCacheCrossCacheQuerySelfTest.java | 3 +-
6 files changed, 157 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 53fc7a3..1aa5890 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -46,11 +46,17 @@ public class GridCacheTwoStepQuery {
/** */
private boolean explain;
+ /** */
+ private Set<String> spaces;
+
/**
+ * @param spaces All spaces accessed in query.
* @param qry Reduce query.
* @param params Reduce query parameters.
*/
- public GridCacheTwoStepQuery(String qry, Object ... params) {
+ public GridCacheTwoStepQuery(Set<String> spaces, String qry, Object ... params) {
+ this.spaces = spaces;
+
reduce = new GridCacheSqlQuery(null, qry, params);
}
@@ -115,4 +121,18 @@ public class GridCacheTwoStepQuery {
@Override public String toString() {
return S.toString(GridCacheTwoStepQuery.class, this);
}
+
+ /**
+ * @return Spaces.
+ */
+ public Set<String> spaces() {
+ return spaces;
+ }
+
+ /**
+ * @param spaces Spaces.
+ */
+ public void spaces(Set<String> spaces) {
+ this.spaces = spaces;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 3d3bcf9..319a818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -48,6 +48,13 @@ public class GridQueryRequest implements Message {
@GridDirectCollection(GridCacheSqlQuery.class)
private Collection<GridCacheSqlQuery> qrys;
+ /** Topology version. */
+ private long topVer;
+
+ /** */
+ @GridDirectCollection(String.class)
+ private Collection<String> extraSpaces;
+
/**
* Default constructor.
*/
@@ -60,13 +67,32 @@ public class GridQueryRequest implements Message {
* @param pageSize Page size.
* @param space Space.
* @param qrys Queries.
+ * @param topVer Topology version.
+ * @param extraSpaces All space names participating in query other than {@code space}.
*/
- public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys) {
+ public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys, long topVer,
+ List<String> extraSpaces) {
this.reqId = reqId;
this.pageSize = pageSize;
this.space = space;
this.qrys = qrys;
+ this.topVer = topVer;
+ this.extraSpaces = extraSpaces;
+ }
+
+ /**
+ * @return All extra space names participating in query other than {@link #space()}.
+ */
+ public Collection<String> extraSpaces() {
+ return extraSpaces;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public long topologyVersion() {
+ return topVer;
}
/**
@@ -138,6 +164,17 @@ public class GridQueryRequest implements Message {
writer.incrementState();
+ case 4:
+ if (!writer.writeLong("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeCollection("extraSpaces", extraSpaces, MessageCollectionItemType.STRING))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -183,6 +220,21 @@ public class GridQueryRequest implements Message {
reader.incrementState();
+ case 4:
+ topVer = reader.readLong("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ extraSpaces = reader.readCollection("extraSpaces", MessageCollectionItemType.STRING);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return true;
@@ -195,6 +247,6 @@ public class GridQueryRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 6;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 975378c..44db280 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1120,13 +1120,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param space Space name.
* @return Schema name.
*/
- private static String schema(@Nullable String space) {
+ public static String schema(@Nullable String space) {
if (space == null)
return "";
return space;
}
+ /**
+ * @param schema Schema.
+ * @return Space name.
+ */
+ public static String space(String schema) {
+ assert schema != null;
+
+ return "".equals(schema) ? null : schema;
+ }
+
/** {@inheritDoc} */
@Override public void rebuildIndexes(@Nullable String spaceName, GridQueryTypeDescriptor type) {
TableDescriptor tbl = tableDescriptor(spaceName, type);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 6c7e2e2..5795a1e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.sql;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.h2.*;
import org.apache.ignite.internal.util.typedef.*;
import org.h2.jdbc.*;
import org.h2.value.*;
@@ -212,7 +213,9 @@ public class GridSqlQuerySplitter {
}
// Build resulting two step query.
- GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(rdcQry.getSQL(),
+ GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(
+ collectAllSpaces(qry0, new HashSet<String>()),
+ rdcQry.getSQL(),
findParams(rdcQry, params, new ArrayList<>()).toArray());
res.addMapQuery(mergeTable, mapQry.getSQL(),
@@ -224,6 +227,49 @@ public class GridSqlQuerySplitter {
}
/**
+ * @param qry Query.
+ * @param spaces Space names.
+ * @return Space names.
+ */
+ private static Set<String> collectAllSpaces(GridSqlQuery qry, Set<String> spaces) {
+ if (qry instanceof GridSqlUnion) {
+ GridSqlUnion union = (GridSqlUnion)qry;
+
+ collectAllSpaces(union.left(), spaces);
+ collectAllSpaces(union.right(), spaces);
+ }
+ else
+ collectAllSpacesInFrom(((GridSqlSelect)qry).from(), spaces);
+
+ return spaces;
+ }
+
+ /**
+ * @param from From element.
+ * @param spaces Space names.
+ */
+ private static void collectAllSpacesInFrom(GridSqlElement from, Set<String> spaces) {
+ assert from != null;
+
+ if (from instanceof GridSqlJoin) {
+ for (int i = 0; i < from.size(); i++)
+ collectAllSpacesInFrom(from.child(i), spaces);
+ }
+ else if (from instanceof GridSqlTable) {
+ String schema = ((GridSqlTable)from).schema();
+
+ if (schema != null)
+ spaces.add(IgniteH2Indexing.space(schema));
+ }
+ else if (from instanceof GridSqlSubquery)
+ collectAllSpaces(((GridSqlSubquery)from).select(), spaces);
+ else if (from instanceof GridSqlAlias)
+ collectAllSpacesInFrom(from.child(), spaces);
+ else if (!(from instanceof GridSqlFunction))
+ throw new IllegalStateException(from.getClass().getName());
+ }
+
+ /**
* @param qry Select.
* @param params Parameters.
* @param target Extracted parameters.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 09a238f..2e69286 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -335,7 +335,9 @@ public class GridReduceQueryExecutor {
mapQry.marshallParams(m);
}
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys));
+ send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys,
+ ctx.cluster().get().topologyVersion(),
+ extraSpaces(space, qry.spaces())));
r.latch.await();
@@ -375,6 +377,25 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param mainSpace Main space.
+ * @param allSpaces All spaces.
+ * @return List of all extra spaces or {@code null} if none.
+ */
+ private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) {
+ if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
+ return null;
+
+ ArrayList<String> res = new ArrayList<>(allSpaces.size());
+
+ for (String space : allSpaces) {
+ if (!F.eq(space, mainSpace))
+ res.add(space);
+ }
+
+ return res;
+ }
+
+ /**
* @param c Connection.
* @param space Space.
* @param qry Query.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e975b7a9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 4e9bf31..0c9714d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -123,7 +123,8 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
// for (Map.Entry<Integer, FactPurchase> e : qx.createSqlQuery(FactPurchase.class, "1 = 1").execute().get())
// X.println("___ " + e);
- GridCacheTwoStepQuery q = new GridCacheTwoStepQuery("select cast(sum(x) as long) from _cnts_ where ? = ?", 1, 1);
+ GridCacheTwoStepQuery q = new GridCacheTwoStepQuery(null,
+ "select cast(sum(x) as long) from _cnts_ where ? = ?", 1, 1);
q.addMapQuery("_cnts_", "select count(*) x from \"partitioned\".FactPurchase where ? = ?", 2, 2);
[2/2] incubator-ignite git commit: ignite-484 - v1
Posted by se...@apache.org.
ignite-484 - v1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7000722c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7000722c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7000722c
Branch: refs/heads/ignite-484
Commit: 7000722cf550b4333eded7640d965583f2768bdf
Parents: e975b7a
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue May 12 08:19:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue May 12 08:19:40 2015 +0300
----------------------------------------------------------------------
.../affinity/AffinityTopologyVersion.java | 7 -
.../messages/GridQueryNextPageResponse.java | 39 ++++-
.../query/h2/twostep/GridMapQueryExecutor.java | 115 +++++++++++---
.../h2/twostep/GridReduceQueryExecutor.java | 156 +++++++++++--------
4 files changed, 223 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 77f3359..650c047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -76,13 +76,6 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
}
/**
- * @param topVer New topology version.
- */
- public void topologyVersion(long topVer) {
- this.topVer = topVer;
- }
-
- /**
* @return Minor topology version.
*/
public int minorTopologyVersion() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 4fdc027..c2cca75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -33,6 +33,12 @@ public class GridQueryNextPageResponse implements Message {
private static final long serialVersionUID = 0L;
/** */
+ public static final byte CODE_OK = 0;
+
+ /** */
+ public static final byte CODE_RETRY = -1;
+
+ /** */
private long qryReqId;
/** */
@@ -55,6 +61,9 @@ public class GridQueryNextPageResponse implements Message {
@GridDirectTransient
private transient Collection<?> plainRows;
+ /** Response code. */
+ private byte code = CODE_OK;
+
/**
* For {@link Externalizable}.
*/
@@ -86,6 +95,20 @@ public class GridQueryNextPageResponse implements Message {
}
/**
+ * @return Response code.
+ */
+ public byte code() {
+ return code;
+ }
+
+ /**
+ * @param code Response code.
+ */
+ public void code(byte code) {
+ this.code = code;
+ }
+
+ /**
* @return Query request ID.
*/
public long queryRequestId() {
@@ -186,6 +209,12 @@ public class GridQueryNextPageResponse implements Message {
return false;
writer.incrementState();
+
+ case 6:
+ if (!writer.writeByte("code", code))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -247,6 +276,14 @@ public class GridQueryNextPageResponse implements Message {
reader.incrementState();
+ case 6:
+ code = reader.readByte("code");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return true;
@@ -259,6 +296,6 @@ public class GridQueryNextPageResponse implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f15a2da..2483912 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -23,7 +23,9 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.query.h2.*;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
@@ -36,6 +38,7 @@ import org.h2.jdbc.*;
import org.h2.result.*;
import org.h2.store.*;
import org.h2.value.*;
+import org.jetbrains.annotations.*;
import org.jsr166.*;
import javax.cache.*;
@@ -198,6 +201,16 @@ public class GridMapQueryExecutor {
}
/**
+ * @param cacheName Cache name.
+ * @return Cache context or {@code null} if none.
+ */
+ @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
+ GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);
+
+ return cache == null ? null : cache.context();
+ }
+
+ /**
* Executing queries locally.
*
* @param node Node.
@@ -206,32 +219,75 @@ public class GridMapQueryExecutor {
private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
- Collection<GridCacheSqlQuery> qrys;
+ QueryResults qr = null;
+
+ List<GridDhtLocalPartition> reserved = new ArrayList<>();
try {
- qrys = req.queries();
+ Collection<GridCacheSqlQuery> qrys;
- if (!node.isLocal()) {
- Marshaller m = ctx.config().getMarshaller();
+ try {
+ qrys = req.queries();
+
+ if (!node.isLocal()) {
+ Marshaller m = ctx.config().getMarshaller();
- for (GridCacheSqlQuery qry : qrys)
- qry.unmarshallParams(m);
+ for (GridCacheSqlQuery qry : qrys)
+ qry.unmarshallParams(m);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- GridCacheContext<?,?> cctx = ctx.cache().internalCache(req.space()).context();
+ List<GridCacheContext<?,?>> cctxs = new ArrayList<>();
- QueryResults qr = new QueryResults(req.requestId(), qrys.size(), cctx);
+ for (String cacheName : F.concat(true, req.space(), req.extraSpaces())) {
+ GridCacheContext<?,?> cctx = cacheContext(cacheName);
- if (nodeRess.put(req.requestId(), qr) != null)
- throw new IllegalStateException();
+ if (cctx == null) { // Cache was not deployed yet.
+ sendRetry(node, req.requestId());
- h2.setFilters(h2.backupFilter());
+ return;
+ }
+ else
+ cctxs.add(cctx);
+ }
+
+ for (GridCacheContext<?,?> cctx : cctxs) { // Lock primary partitions.
+ // TODO how to get all partitions for topology version consistently?
+ List<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
+ AffinityTopologyVersion affTopVer = cctx.topology().topologyVersion();
+
+ if (affTopVer.topologyVersion() != req.topologyVersion()) {
+ sendRetry(node, req.requestId());
+
+ return;
+ }
+
+ for (GridDhtLocalPartition part : parts) {
+ if (!part.primary(affTopVer))
+ continue;
+
+ if (!part.reserve()) {
+ sendRetry(node, req.requestId());
+
+ return;
+ }
+
+ reserved.add(part);
+ }
+ }
+
+ GridCacheContext<?,?> cctx = cctxs.get(0); // Main cache context.
+
+ qr = new QueryResults(req.requestId(), qrys.size(), cctx);
+
+ if (nodeRess.put(req.requestId(), qr) != null)
+ throw new IllegalStateException();
+
+ h2.setFilters(h2.backupFilter());
- try {
// TODO Prepare snapshots for all the needed tables before the run.
// Run queries.
@@ -276,9 +332,11 @@ public class GridMapQueryExecutor {
}
}
catch (Throwable e) {
- nodeRess.remove(req.requestId(), qr);
+ if (qr != null) {
+ nodeRess.remove(req.requestId(), qr);
- qr.cancel();
+ qr.cancel();
+ }
U.error(log, "Failed to execute local query: " + req, e);
@@ -289,6 +347,9 @@ public class GridMapQueryExecutor {
}
finally {
h2.setFilters(null);
+
+ for (GridDhtLocalPartition part : reserved)
+ part.release();
}
}
@@ -375,6 +436,24 @@ public class GridMapQueryExecutor {
}
/**
+ * @param node Node.
+ * @param reqId Request ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void sendRetry(ClusterNode node, long reqId) throws IgniteCheckedException {
+ boolean loc = node.isLocal();
+
+ GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+ /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
+ loc ? null : Collections.<Message>emptyList(),
+ loc ? Collections.<Value[]>emptyList() : null);
+
+ msg.code(GridQueryNextPageResponse.CODE_RETRY);
+
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+ }
+
+ /**
* @param bytes Bytes.
* @return Rows.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 2e69286..3391c97 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -260,6 +260,9 @@ public class GridReduceQueryExecutor {
idx.addPage(page);
+ if (msg.code() == GridQueryNextPageResponse.CODE_RETRY)
+ r.retry = true;
+
if (msg.allRows() != -1) // Only the first page contains row count.
r.latch.countDown();
}
@@ -270,109 +273,123 @@ public class GridReduceQueryExecutor {
* @return Cursor.
*/
public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
- long qryReqId = reqIdGen.incrementAndGet();
+ for (int attempt = 0;; attempt++) {
+ long qryReqId = reqIdGen.incrementAndGet();
- QueryRun r = new QueryRun();
+ QueryRun r = new QueryRun();
- r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
+ r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
- r.tbls = new ArrayList<>(qry.mapQueries().size());
+ r.tbls = new ArrayList<>(qry.mapQueries().size());
- String space = cctx.name();
+ String space = cctx.name();
- r.conn = (JdbcConnection)h2.connectionForSpace(space);
+ r.conn = (JdbcConnection)h2.connectionForSpace(space);
- // TODO Add topology version.
- ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
+ final long topVer = ctx.cluster().get().topologyVersion();
- if (cctx.isReplicated() || qry.explain()) {
- assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
+ // TODO get projection for this topology version.
+ ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
- // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
- dataNodes = dataNodes.forRandom();
- }
+ if (cctx.isReplicated() || qry.explain()) {
+ assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
- final Collection<ClusterNode> nodes = dataNodes.nodes();
+ // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+ dataNodes = dataNodes.forRandom();
+ }
- for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
- GridMergeTable tbl;
+ final Collection<ClusterNode> nodes = dataNodes.nodes();
- try {
- tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+ GridMergeTable tbl;
- GridMergeIndex idx = tbl.getScanIndex(null);
+ try {
+ tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
- for (ClusterNode node : nodes)
- idx.addSource(node.id());
+ GridMergeIndex idx = tbl.getScanIndex(null);
- r.tbls.add(tbl);
+ for (ClusterNode node : nodes)
+ idx.addSource(node.id());
- curFunTbl.set(tbl);
- }
+ r.tbls.add(tbl);
- r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+ curFunTbl.set(tbl);
+ }
- runs.put(qryReqId, r);
+ r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
- try {
- Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+ runs.put(qryReqId, r);
- if (qry.explain()) {
- mapQrys = new ArrayList<>(qry.mapQueries().size());
+ try {
+ Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
- for (GridCacheSqlQuery mapQry : qry.mapQueries())
- mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
- }
+ if (qry.explain()) {
+ mapQrys = new ArrayList<>(qry.mapQueries().size());
- if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
- Marshaller m = ctx.config().getMarshaller();
+ for (GridCacheSqlQuery mapQry : qry.mapQueries())
+ mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
+ }
- for (GridCacheSqlQuery mapQry : mapQrys)
- mapQry.marshallParams(m);
- }
+ if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
+ Marshaller m = ctx.config().getMarshaller();
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys,
- ctx.cluster().get().topologyVersion(),
- extraSpaces(space, qry.spaces())));
+ for (GridCacheSqlQuery mapQry : mapQrys)
+ mapQry.marshallParams(m);
+ }
+
+ send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer,
+ extraSpaces(space, qry.spaces())));
- r.latch.await();
+ U.await(r.latch);
- if (r.rmtErr != null)
- throw new CacheException("Failed to run map query remotely.", r.rmtErr);
+ if (r.rmtErr != null)
+ throw new CacheException("Failed to run map query remotely.", r.rmtErr);
- if (qry.explain())
- return explainPlan(r.conn, space, qry);
+ ResultSet res = null;
- GridCacheSqlQuery rdc = qry.reduceQuery();
+ if (!r.retry) {
+ if (qry.explain())
+ return explainPlan(r.conn, space, qry);
- final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
+ GridCacheSqlQuery rdc = qry.reduceQuery();
+
+ res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
+ }
- for (GridMergeTable tbl : r.tbls) {
- if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
- send(nodes, new GridQueryCancelRequest(qryReqId));
+ for (GridMergeTable tbl : r.tbls) {
+ if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+ send(nodes, new GridQueryCancelRequest(qryReqId));
// dropTable(r.conn, tbl.getName()); TODO
- }
+ }
- return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
- }
- catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
- U.closeQuiet(r.conn);
+ if (r.retry) {
+ if (attempt > 0)
+ U.sleep(attempt * 10);
+
+ continue;
+ }
+
+ return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ U.closeQuiet(r.conn);
- if (e instanceof CacheException)
- throw (CacheException)e;
+ if (e instanceof CacheException)
+ throw (CacheException)e;
- throw new CacheException("Failed to run reduce query locally.", e);
- }
- finally {
- if (!runs.remove(qryReqId, r))
- U.warn(log, "Query run was already removed: " + qryReqId);
+ throw new CacheException("Failed to run reduce query locally.", e);
+ }
+ finally {
+ if (!runs.remove(qryReqId, r))
+ U.warn(log, "Query run was already removed: " + qryReqId);
- curFunTbl.remove();
+ curFunTbl.remove();
+ }
}
}
@@ -680,6 +697,9 @@ public class GridReduceQueryExecutor {
/** */
private volatile CacheException rmtErr;
+
+ /** */
+ private volatile boolean retry;
}
/**