You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/26 19:26:56 UTC
[1/6] incubator-ignite git commit: Merge branches 'ignite-484' and
'ignite-sprint-4' of https://git-wip-us.apache.org/repos/asf/incubator-ignite
into ignite-484
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-484-1 [created] 357b4c06a
Merge branches 'ignite-484' and 'ignite-sprint-4' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae4c4fc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae4c4fc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae4c4fc0
Branch: refs/heads/ignite-484-1
Commit: ae4c4fc0b6a8c1a53cc97c45a298a073ff3251ba
Parents: b1a512b cff73bf
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu May 21 08:23:08 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu May 21 08:23:08 2015 +0300
----------------------------------------------------------------------
modules/core/src/main/java/META-INF/LICENSE | 238 +++++++++++++++++++++++
modules/core/src/main/java/META-INF/NOTICE | 12 ++
parent/pom.xml | 1 -
pom.xml | 39 ++--
4 files changed, 278 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[4/6] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1
Posted by se...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d827cbbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d827cbbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d827cbbc
Branch: refs/heads/ignite-484-1
Commit: d827cbbc4b29ade1ce1556c501c78f5d21ff4486
Parents: 20ec22b f39a3a9
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue May 26 07:20:34 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue May 26 07:20:34 2015 +0300
----------------------------------------------------------------------
dev-tools/slurp.sh | 8 -
dev-tools/src/main/groovy/jiraslurp.groovy | 186 ++++++++++++-------
.../visor/commands/node/VisorNodeCommand.scala | 2 +-
.../commands/tasks/VisorTasksCommand.scala | 2 +-
.../scala/org/apache/ignite/visor/visor.scala | 63 ++++++-
pom.xml | 11 ++
6 files changed, 191 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
[3/6] incubator-ignite git commit: ignite-484 - explicit partitions
list
Posted by se...@apache.org.
ignite-484 - explicit partitions list
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/20ec22b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/20ec22b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/20ec22b8
Branch: refs/heads/ignite-484-1
Commit: 20ec22b85cddf8dfb5c453758aa83dcb49fbbd32
Parents: a12aadf
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon May 25 03:20:54 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon May 25 03:20:54 2015 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryIndexing.java | 5 ++-
.../processors/query/GridQueryProcessor.java | 4 +-
.../h2/twostep/messages/GridQueryRequest.java | 36 +++++++++++++--
.../processors/query/h2/IgniteH2Indexing.java | 43 ++++++++++++++++--
.../query/h2/twostep/GridMapQueryExecutor.java | 47 ++++++++++++++++++--
.../h2/twostep/GridReduceQueryExecutor.java | 33 +++++++++++++-
6 files changed, 151 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 6b1401d..216773e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -222,8 +222,11 @@ public interface GridQueryIndexing {
/**
* Returns backup filter.
*
+ * @param caches List of caches.
* @param topVer Topology version.
+ * @param parts Partitions.
* @return Backup filter.
*/
- public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer);
+ public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer,
+ List<int[]> parts);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index afd0386..202ec75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -603,7 +603,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new CacheException("Failed to find SQL table for type: " + type);
final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc,
- idx.backupFilter(null));
+ idx.backupFilter(null, null, null));
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
String sql = qry.getSql();
Object[] args = qry.getArgs();
- GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null));
+ GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null, null, null));
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
ctx.event().record(new CacheQueryExecutedEvent<>(
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 2d53944..74b4392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -54,7 +54,11 @@ public class GridQueryRequest implements Message {
/** */
@GridDirectCollection(String.class)
- private Collection<String> extraSpaces;
+ private List<String> extraSpaces;
+
+ /** */
+ @GridDirectCollection(int[].class)
+ private List<int[]> parts;
/**
* Default constructor.
@@ -70,6 +74,7 @@ public class GridQueryRequest implements Message {
* @param qrys Queries.
* @param topVer Topology version.
* @param extraSpaces All space names participating in query other than {@code space}.
+ * @param parts Optional partitions for unstable topology.
*/
public GridQueryRequest(
long reqId,
@@ -77,7 +82,8 @@ public class GridQueryRequest implements Message {
String space,
Collection<GridCacheSqlQuery> qrys,
AffinityTopologyVersion topVer,
- List<String> extraSpaces) {
+ List<String> extraSpaces,
+ List<int[]> parts) {
this.reqId = reqId;
this.pageSize = pageSize;
this.space = space;
@@ -85,12 +91,20 @@ public class GridQueryRequest implements Message {
this.qrys = qrys;
this.topVer = topVer;
this.extraSpaces = extraSpaces;
+ this.parts = parts;
+ }
+
+ /**
+ * @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
+ */
+ public List<int[]> partitions() {
+ return parts;
}
/**
* @return All extra space names participating in query other than {@link #space()}.
*/
- public Collection<String> extraSpaces() {
+ public List<String> extraSpaces() {
return extraSpaces;
}
@@ -181,6 +195,12 @@ public class GridQueryRequest implements Message {
return false;
writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("partitions", parts, MessageCollectionItemType.INT_ARR))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -241,6 +261,14 @@ public class GridQueryRequest implements Message {
return false;
reader.incrementState();
+
+ case 6:
+ parts = reader.readCollection("partitions", MessageCollectionItemType.INT_ARR);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return true;
@@ -253,6 +281,6 @@ public class GridQueryRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1cfc314..0ee9876 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1354,21 +1354,56 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer) {
+ @Override public IndexingQueryFilter backupFilter(
+ @Nullable final List<String> caches,
+ @Nullable final AffinityTopologyVersion topVer,
+ @Nullable final List<int[]> parts
+ ) {
final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE;
return new IndexingQueryFilter() {
@Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) {
final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName);
- if (cache.context().isReplicated() || cache.configuration().getBackups() == 0)
+ if (cache.context().isReplicated() || (cache.configuration().getBackups() == 0 && parts == null))
return null;
+ final GridCacheAffinityManager aff = cache.context().affinity();
+
+ if (parts != null) {
+ int idx = caches.indexOf(spaceName);
+
+ final int[] parts0 = parts.get(idx);
+
+ if (parts0.length < 64) {
+ return new IgniteBiPredicate<K,V>() {
+ @Override public boolean apply(K k, V v) {
+ int p = aff.partition(k);
+
+ for (int p0 : parts0) {
+ if (p0 == p)
+ return true;
+ }
+
+ return false;
+ }
+ };
+ }
+
+ return new IgniteBiPredicate<K,V>() {
+ @Override public boolean apply(K k, V v) {
+ int p = aff.partition(k);
+
+ return Arrays.binarySearch(parts0, p) >= 0;
+ }
+ };
+ }
+
final ClusterNode locNode = ctx.discovery().localNode();
return new IgniteBiPredicate<K, V>() {
@Override public boolean apply(K k, V v) {
- return cache.context().affinity().primary(locNode, k, topVer0);
+ return aff.primary(locNode, k, topVer0);
}
};
}
@@ -1392,7 +1427,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Current topology version.
*/
public AffinityTopologyVersion topologyVersion() {
- return ctx.discovery().topologyVersionEx();
+ return ctx.cache().context().exchange().readyAffinityVersion();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 112949f..06bad76 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -217,16 +217,22 @@ public class GridMapQueryExecutor {
/**
* @param cacheNames Cache names.
* @param topVer Topology version.
+ * @param parts Explicit partitions.
* @param reserved Reserved list.
* @return {@code true} If all the needed partitions successfully reserved.
* @throws IgniteCheckedException If failed.
*/
- private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer,
+ private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, List<int[]> parts,
List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
+ assert parts == null || parts.size() == cacheNames.size();
+
+ int i = 0;
+
for (String cacheName : cacheNames) {
GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
- Set<Integer> partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+ Collection<Integer> partIds = parts != null ? wrap(parts.get(i++)) :
+ cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
for (int partId : partIds) {
GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
@@ -250,6 +256,37 @@ public class GridMapQueryExecutor {
}
/**
+ * @param ints Integers.
+ * @return Collection wrapper.
+ */
+ private static Collection<Integer> wrap(final int[] ints) {
+ return new AbstractCollection<Integer>() {
+ @Override public Iterator<Integer> iterator() {
+ return new Iterator<Integer>() {
+ /** */
+ private int i = 0;
+
+ @Override public boolean hasNext() {
+ return i < ints.length;
+ }
+
+ @Override public Integer next() {
+ return ints[i++];
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override public int size() {
+ return ints.length;
+ }
+ };
+ }
+
+ /**
* Executing queries locally.
*
* @param node Node.
@@ -280,6 +317,8 @@ public class GridMapQueryExecutor {
throw new CacheException(e);
}
+ List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces());
+
// Topology version can be null in rolling restart with previous version!
final AffinityTopologyVersion topVer = req.topologyVersion();
@@ -288,7 +327,7 @@ public class GridMapQueryExecutor {
h2.awaitForCacheAffinity(topVer);
// Reserve primary partitions.
- if (!reservePartitions(F.concat(true, req.space(), req.extraSpaces()), topVer, reserved)) {
+ if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
sendRetry(node, req.requestId());
return;
@@ -303,7 +342,7 @@ public class GridMapQueryExecutor {
if (nodeRess.put(req.requestId(), qr) != null)
throw new IllegalStateException();
- h2.setFilters(h2.backupFilter(topVer));
+ h2.setFilters(h2.backupFilter(caches, topVer, req.partitions()));
// TODO Prepare snapshots for all the needed tables before the run.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index eb6db88..0836a75 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -279,6 +279,21 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param set Set.
+ * @return Array.
+ */
+ private static int[] unbox(Set<Integer> set) {
+ int[] arr = new int[set.size()];
+
+ int i = 0;
+
+ for (int x : set)
+ arr[i++] = x;
+
+ return arr;
+ }
+
+ /**
* @param cctx Cache context.
* @param qry Query.
* @return Cursor.
@@ -304,12 +319,27 @@ public class GridReduceQueryExecutor {
if (F.isEmpty(nodes))
throw new CacheException("No data nodes found for cache: " + space);
+ List<String> extraSpaces = extraSpaces(space, qry.spaces());
+
+ List<int[]> parts = null;
+
if (cctx.isReplicated() || qry.explain()) {
assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
// Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
nodes = Collections.singleton(F.rand(nodes));
}
+ else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ??
+ parts = new ArrayList<>(extraSpaces == null ? 1 : extraSpaces.size() + 1);
+
+ parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+
+ if (extraSpaces != null) {
+ for (String extraSpace : extraSpaces)
+ parts.add(unbox(ctx.cache().internalCache(extraSpace).context()
+ .affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+ }
+ }
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeTable tbl;
@@ -355,8 +385,7 @@ public class GridReduceQueryExecutor {
boolean ok = false;
try {
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer,
- extraSpaces(space, qry.spaces())));
+ send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts));
ok = true;
}
[5/6] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1
Posted by se...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1
# Conflicts:
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ebde2802
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ebde2802
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ebde2802
Branch: refs/heads/ignite-484-1
Commit: ebde2802e3af1e9c281cb586ad205f7cfe42a9c2
Parents: d827cbb c75caba
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue May 26 17:25:24 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue May 26 17:25:24 2015 +0300
----------------------------------------------------------------------
dev-tools/src/main/groovy/jiraslurp.groovy | 36 ++++++++------
.../processors/cache/IgniteCacheProxy.java | 3 +-
.../GridDistributedTxRemoteAdapter.java | 10 ++--
.../processors/query/GridQueryIndexing.java | 16 ++++++
.../processors/query/GridQueryProcessor.java | 52 +++++++-------------
.../query/h2/GridH2IndexingGeoSelfTest.java | 20 +++++++-
.../processors/query/h2/IgniteH2Indexing.java | 17 +++++++
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 18 +++++--
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 4 +-
.../query/h2/sql/GridSqlOperationType.java | 2 +-
.../IgniteCacheQueryMultiThreadedSelfTest.java | 2 +-
.../local/IgniteCacheLocalQuerySelfTest.java | 6 +++
12 files changed, 123 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebde2802/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebde2802/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebde2802/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
[2/6] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484
Posted by se...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a12aadfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a12aadfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a12aadfb
Branch: refs/heads/ignite-484-1
Commit: a12aadfbd7af84104e45ff77492317420d8e11d5
Parents: ae4c4fc 050f429
Author: S.Vladykin <sv...@gridgain.com>
Authored: Sun May 24 23:20:14 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Sun May 24 23:20:14 2015 +0300
----------------------------------------------------------------------
.gitignore | 3 +-
DEVNOTES.txt | 32 +-
bin/ignite-schema-import.bat | 2 +-
bin/ignite-schema-import.sh | 2 +-
bin/ignite.bat | 2 +-
bin/ignite.sh | 2 +-
bin/ignitevisorcmd.bat | 2 +-
bin/ignitevisorcmd.sh | 2 +-
bin/include/functions.sh | 2 +-
dev-tools/.gitignore | 2 +
dev-tools/build.gradle | 54 +
dev-tools/gradle/wrapper/gradle-wrapper.jar | Bin 0 -> 51017 bytes
.../gradle/wrapper/gradle-wrapper.properties | 6 +
dev-tools/gradlew | 164 +
dev-tools/slurp.sh | 74 +
dev-tools/src/main/groovy/jiraslurp.groovy | 431 ++
examples/pom.xml | 2 +-
.../streaming/StreamTransformerExample.java | 4 +-
.../streaming/StreamVisitorExample.java | 4 +-
.../ignite/examples/streaming/package-info.java | 1 -
.../streaming/wordcount/CacheConfig.java | 2 +-
.../streaming/wordcount/QueryWords.java | 12 +-
.../streaming/wordcount/StreamWords.java | 12 +-
.../streaming/wordcount/package-info.java | 1 -
.../socket/WordsSocketStreamerClient.java | 82 +
.../socket/WordsSocketStreamerServer.java | 124 +
.../wordcount/socket/package-info.java | 21 +
modules/aop/pom.xml | 2 +-
modules/aws/pom.xml | 2 +-
modules/clients/pom.xml | 2 +-
.../ClientAbstractConnectivitySelfTest.java | 14 +
modules/cloud/pom.xml | 4 +-
.../TcpDiscoveryCloudIpFinderSelfTest.java | 2 -
modules/codegen/pom.xml | 2 +-
modules/core/pom.xml | 2 +-
.../internal/GridEventConsumeHandler.java | 26 +
.../apache/ignite/internal/IgniteKernal.java | 26 +-
.../org/apache/ignite/internal/IgnitionEx.java | 136 +-
.../internal/direct/DirectByteBufferStream.java | 4 +-
.../interop/InteropAwareEventFilter.java | 37 +
.../internal/interop/InteropBootstrap.java | 34 +
.../interop/InteropBootstrapFactory.java | 39 +
.../internal/interop/InteropException.java | 71 +
.../internal/interop/InteropIgnition.java | 166 +
.../interop/InteropLocalEventListener.java | 28 +
.../interop/InteropNoCallbackException.java | 50 +
.../internal/interop/InteropProcessor.java | 36 +
.../managers/communication/GridIoManager.java | 6 +-
.../GridLifecycleAwareMessageFilter.java | 5 +-
.../eventstorage/GridEventStorageManager.java | 29 +-
.../processors/cache/GridCacheAdapter.java | 169 +-
.../processors/cache/GridCacheContext.java | 7 +
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 320 +-
.../processors/cache/GridCacheMapEntry.java | 105 +-
.../processors/cache/GridCacheMessage.java | 8 +-
.../processors/cache/GridCacheMvccManager.java | 4 +-
.../GridCachePartitionExchangeManager.java | 4 +-
.../processors/cache/GridCacheProxyImpl.java | 24 -
.../processors/cache/GridCacheSwapManager.java | 215 +-
.../processors/cache/IgniteInternalCache.java | 27 -
.../GridDistributedCacheAdapter.java | 210 +-
.../distributed/GridDistributedLockRequest.java | 111 +-
.../GridDistributedTxFinishRequest.java | 70 +-
.../distributed/GridDistributedTxMapping.java | 5 +-
.../GridDistributedTxPrepareRequest.java | 112 +-
.../GridDistributedTxRemoteAdapter.java | 20 +-
.../distributed/dht/GridDhtCacheAdapter.java | 22 +-
.../distributed/dht/GridDhtLockFuture.java | 2 -
.../distributed/dht/GridDhtLockRequest.java | 45 +-
.../dht/GridDhtOffHeapCacheEntry.java | 63 +
.../dht/GridDhtTransactionalCacheAdapter.java | 15 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 3 -
.../distributed/dht/GridDhtTxFinishRequest.java | 43 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 38 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 95 +-
.../cache/distributed/dht/GridDhtTxMapping.java | 2 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 100 +-
.../dht/GridDhtTxPrepareRequest.java | 60 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 8 +-
.../distributed/dht/GridNoStorageCacheMap.java | 4 +-
.../dht/atomic/GridDhtAtomicCache.java | 7 +-
.../atomic/GridDhtAtomicOffHeapCacheEntry.java | 63 +
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 8 +
.../dht/atomic/GridNearAtomicUpdateFuture.java | 44 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 36 +-
.../atomic/GridNearAtomicUpdateResponse.java | 18 +-
.../dht/colocated/GridDhtColocatedCache.java | 5 +-
.../colocated/GridDhtColocatedLockFuture.java | 37 +-
.../GridDhtColocatedOffHeapCacheEntry.java | 63 +
.../colocated/GridDhtDetachedCacheEntry.java | 4 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 6 +
.../dht/preloader/GridDhtForceKeysResponse.java | 54 +-
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../distributed/near/GridNearCacheAdapter.java | 13 +-
.../distributed/near/GridNearCacheEntry.java | 6 +-
.../distributed/near/GridNearGetResponse.java | 8 +-
.../distributed/near/GridNearLockFuture.java | 11 -
.../distributed/near/GridNearLockRequest.java | 61 +-
.../near/GridNearOffHeapCacheEntry.java | 60 +
.../near/GridNearOptimisticTxPrepareFuture.java | 768 ++
.../GridNearPessimisticTxPrepareFuture.java | 347 +
.../near/GridNearTransactionalCache.java | 4 -
.../near/GridNearTxFinishRequest.java | 28 +-
.../cache/distributed/near/GridNearTxLocal.java | 109 +-
.../near/GridNearTxPrepareFuture.java | 1050 ---
.../near/GridNearTxPrepareFutureAdapter.java | 231 +
.../near/GridNearTxPrepareRequest.java | 52 +-
.../near/GridNearTxPrepareResponse.java | 28 +-
.../distributed/near/GridNearTxRemote.java | 24 +-
.../processors/cache/local/GridLocalCache.java | 8 +-
.../cache/local/GridLocalCacheEntry.java | 18 +
.../local/atomic/GridLocalAtomicCache.java | 27 +-
.../cache/query/GridCacheQueryManager.java | 21 +-
.../cache/transactions/IgniteInternalTx.java | 14 +-
.../transactions/IgniteTransactionsImpl.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 74 +-
.../cache/transactions/IgniteTxEntry.java | 48 +-
.../cache/transactions/IgniteTxHandler.java | 74 +-
.../transactions/IgniteTxLocalAdapter.java | 183 +-
.../cache/transactions/IgniteTxLocalEx.java | 21 +-
.../cache/transactions/IgniteTxManager.java | 98 +-
.../processors/igfs/IgfsDeleteWorker.java | 4 +
.../offheap/GridOffHeapProcessor.java | 17 +
.../processors/resource/GridResourceField.java | 16 +-
.../processors/resource/GridResourceIoc.java | 389 +-
.../processors/resource/GridResourceMethod.java | 13 +
.../resource/GridResourceProcessor.java | 20 +-
.../ignite/internal/util/IgniteUtils.java | 28 +-
.../util/lang/GridComputeJobWrapper.java | 96 -
.../util/lang/GridFilteredIterator.java | 2 +-
.../ignite/internal/util/lang/GridFunc.java | 7218 +++++-------------
.../internal/util/nio/GridBufferedParser.java | 4 -
.../internal/util/nio/GridDelimitedParser.java | 91 +
.../util/nio/GridNioDelimitedBuffer.java | 106 +
.../util/offheap/GridOffHeapPartitionedMap.java | 9 +
.../unsafe/GridUnsafePartitionedMap.java | 155 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 70 +-
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 12 +-
.../org/apache/ignite/stream/StreamAdapter.java | 111 +
.../ignite/stream/StreamTupleExtractor.java | 33 +
.../stream/socket/SocketMessageConverter.java | 31 +
.../ignite/stream/socket/SocketStreamer.java | 218 +
.../ignite/stream/socket/package-info.java | 21 +
.../resources/META-INF/classnames.properties | 1 -
.../core/src/main/resources/ignite.properties | 2 +-
.../cache/CacheOffheapMapEntrySelfTest.java | 168 +
.../cache/CacheRemoveAllSelfTest.java | 81 +
.../GridCacheAbstractFailoverSelfTest.java | 8 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 227 +-
.../cache/GridCacheAbstractSelfTest.java | 4 +-
.../cache/GridCacheSwapReloadSelfTest.java | 20 +-
.../cache/IgniteCacheNearLockValueSelfTest.java | 145 +
.../IgniteCacheP2pUnmarshallingErrorTest.java | 189 +
...gniteCacheP2pUnmarshallingNearErrorTest.java | 56 +
...CacheP2pUnmarshallingRebalanceErrorTest.java | 80 +
.../IgniteCacheP2pUnmarshallingTxErrorTest.java | 109 +
.../cache/IgniteCachePeekModesAbstractTest.java | 15 +-
.../cache/OffHeapTieredTransactionSelfTest.java | 127 +
...CacheLoadingConcurrentGridStartSelfTest.java | 49 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 11 +-
.../distributed/GridCacheLockAbstractTest.java | 2 -
.../distributed/IgniteTxGetAfterStopTest.java | 131 +
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 43 +
...achePartitionedNearDisabledLockSelfTest.java | 47 +
...ionedNearDisabledOffHeapFullApiSelfTest.java | 8 +-
...DisabledOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...abledOffHeapTieredAtomicFullApiSelfTest.java | 56 +
...earDisabledOffHeapTieredFullApiSelfTest.java | 33 +
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +-
...idCacheAtomicReplicatedFailoverSelfTest.java | 6 +
...CacheAtomicOffHeapTieredFullApiSelfTest.java | 32 +
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
...yWriteOrderOffHeapTieredFullApiSelfTest.java | 33 +
...erOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
...achePartitionedMultiNodeFullApiSelfTest.java | 15 +-
...dCachePartitionedOffHeapFullApiSelfTest.java | 8 +-
...titionedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...PartitionedOffHeapTieredFullApiSelfTest.java | 32 +
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 72 +
.../GridCachePartitionedTxSalvageSelfTest.java | 25 +-
.../near/IgniteCacheNearOnlyTxTest.java | 190 +
.../near/NoneRebalanceModeSelfTest.java | 67 +
.../GridCacheReplicatedFailoverSelfTest.java | 6 +
.../GridCacheReplicatedLockSelfTest.java | 5 +
.../GridCacheReplicatedNodeRestartSelfTest.java | 80 +
...idCacheReplicatedOffHeapFullApiSelfTest.java | 8 +-
...plicatedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...eReplicatedOffHeapTieredFullApiSelfTest.java | 33 +
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
.../RandomEvictionPolicyCacheSizeSelfTest.java | 72 +
...LocalAtomicOffHeapTieredFullApiSelfTest.java | 32 +
.../GridCacheLocalIsolatedNodesSelfTest.java | 18 +-
.../GridCacheLocalOffHeapFullApiSelfTest.java | 6 +-
...dCacheLocalOffHeapTieredFullApiSelfTest.java | 32 +
.../util/nio/GridNioDelimitedBufferTest.java | 112 +
...idFileSwapSpaceSpiMultithreadedLoadTest.java | 4 +-
.../discovery/tcp/TcpDiscoveryRestartTest.java | 199 +
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 65 +-
.../stream/socket/SocketStreamerSelfTest.java | 315 +
.../ignite/stream/socket/package-info.java | 21 +
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
.../IgniteCacheEvictionSelfTestSuite.java | 1 +
.../IgniteCacheFailoverTestSuite.java | 10 +-
.../IgniteCacheFullApiSelfTestSuite.java | 18 +
...gniteCacheP2pUnmarshallingErrorTestSuit.java | 41 +
.../testsuites/IgniteCacheRestartTestSuite.java | 8 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 293 +-
.../testsuites/IgniteCacheTestSuite2.java | 144 +
.../testsuites/IgniteCacheTestSuite3.java | 142 +
.../testsuites/IgniteCacheTestSuite4.java | 135 +
.../testsuites/IgniteStreamTestSuite.java | 39 +
.../testsuites/IgniteUtilSelfTestSuite.java | 1 +
modules/extdata/p2p/pom.xml | 2 +-
modules/extdata/uri/pom.xml | 2 +-
modules/gce/pom.xml | 4 +-
modules/geospatial/pom.xml | 2 +-
modules/hadoop/pom.xml | 2 +-
.../processors/hadoop/v2/HadoopV2Context.java | 10 +-
.../testsuites/IgniteHadoopTestSuite.java | 2 +-
modules/hibernate/pom.xml | 2 +-
modules/indexing/pom.xml | 2 +-
.../cache/GridCacheOffHeapAndSwapSelfTest.java | 11 +-
.../cache/GridCacheOffHeapSelfTest.java | 11 +-
.../cache/GridCacheOffheapIndexGetSelfTest.java | 111 +
...niteCacheP2pUnmarshallingQueryErrorTest.java | 56 +
...eQueryMultiThreadedOffHeapTiredSelfTest.java | 37 +
.../IgniteCacheQueryMultiThreadedSelfTest.java | 29 +-
.../IgniteCacheQuerySelfTestSuite.java | 4 +
.../IgniteCacheWithIndexingTestSuite.java | 2 +
modules/jcl/pom.xml | 2 +-
modules/jta/pom.xml | 2 +-
.../processors/cache/jta/CacheJtaManager.java | 4 +-
modules/log4j/pom.xml | 2 +-
modules/rest-http/pom.xml | 2 +-
modules/scalar/pom.xml | 2 +-
.../ignite/scalar/ScalarConversions.scala | 8 -
.../scalar/tests/ScalarCacheQueriesSpec.scala | 154 +-
.../ignite/scalar/tests/ScalarCacheSpec.scala | 23 +-
.../scalar/tests/ScalarConversionsSpec.scala | 43 +-
.../scalar/tests/ScalarProjectionSpec.scala | 128 +-
.../scalar/tests/ScalarReturnableSpec.scala | 41 +-
modules/schedule/pom.xml | 2 +-
modules/schema-import/pom.xml | 2 +-
.../ignite/schema/generator/CodeGenerator.java | 6 +-
modules/slf4j/pom.xml | 2 +-
modules/spring/pom.xml | 2 +-
modules/ssh/pom.xml | 2 +-
modules/tools/pom.xml | 2 +-
modules/urideploy/pom.xml | 2 +-
modules/visor-console/pom.xml | 4 +-
.../ignite/visor/VisorRuntimeBaseSpec.scala | 2 +-
.../visor/commands/VisorArgListSpec.scala | 60 +-
.../commands/VisorFileNameCompleterSpec.scala | 34 +-
.../commands/ack/VisorAckCommandSpec.scala | 20 +-
.../commands/alert/VisorAlertCommandSpec.scala | 68 +-
.../cache/VisorCacheClearCommandSpec.scala | 48 +-
.../commands/cache/VisorCacheCommandSpec.scala | 66 +-
.../config/VisorConfigurationCommandSpec.scala | 8 +-
.../cswap/VisorCacheSwapCommandSpec.scala | 24 +-
.../deploy/VisorDeployCommandSpec.scala | 10 +-
.../disco/VisorDiscoveryCommandSpec.scala | 46 +-
.../events/VisorEventsCommandSpec.scala | 28 +-
.../visor/commands/gc/VisorGcCommandSpec.scala | 30 +-
.../commands/help/VisorHelpCommandSpec.scala | 57 +-
.../commands/kill/VisorKillCommandSpec.scala | 58 +-
.../commands/log/VisorLogCommandSpec.scala | 10 +-
.../commands/mem/VisorMemoryCommandSpec.scala | 77 +-
.../commands/node/VisorNodeCommandSpec.scala | 22 +-
.../commands/open/VisorOpenCommandSpec.scala | 16 +-
.../commands/ping/VisorPingCommandSpec.scala | 16 +-
.../commands/start/VisorStartCommandSpec.scala | 126 +-
.../commands/tasks/VisorTasksCommandSpec.scala | 112 +-
.../commands/top/VisorTopologyCommandSpec.scala | 52 +-
.../commands/vvm/VisorVvmCommandSpec.scala | 30 +-
modules/visor-plugins/pom.xml | 2 +-
modules/web/pom.xml | 2 +-
.../config/benchmark-client-mode.properties | 89 +
modules/yardstick/pom.xml | 2 +-
parent/pom.xml | 4 +
pom.xml | 39 +-
282 files changed, 12047 insertions(+), 10011 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a12aadfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a12aadfb/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
[6/6] incubator-ignite git commit: ignite-484-1 - per partition
mapping on unstable topology
Posted by se...@apache.org.
ignite-484-1 - per partition mapping on unstable topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/357b4c06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/357b4c06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/357b4c06
Branch: refs/heads/ignite-484-1
Commit: 357b4c06a9ff7e3557eb765b5266b46fd9742bba
Parents: ebde280
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue May 26 20:26:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue May 26 20:26:40 2015 +0300
----------------------------------------------------------------------
.../h2/twostep/messages/GridQueryRequest.java | 20 ++
.../processors/query/h2/IgniteH2Indexing.java | 9 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 5 +-
.../h2/twostep/GridReduceQueryExecutor.java | 183 +++++++++++++------
4 files changed, 159 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 74b4392..99ef094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -95,6 +95,19 @@ public class GridQueryRequest implements Message {
}
/**
+ * @param cp Copy from.
+ */
+ public GridQueryRequest(GridQueryRequest cp) {
+ this.reqId = cp.reqId;
+ this.pageSize = cp.pageSize;
+ this.space = cp.space;
+ this.qrys = cp.qrys;
+ this.topVer = cp.topVer;
+ this.extraSpaces = cp.extraSpaces;
+ this.parts = cp.parts;
+ }
+
+ /**
* @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
*/
public List<int[]> partitions() {
@@ -102,6 +115,13 @@ public class GridQueryRequest implements Message {
}
/**
+ * @param parts All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
+ */
+ public void partitions(List<int[]> parts) {
+ this.parts = parts;
+ }
+
+ /**
* @return All extra space names participating in query other than {@link #space()}.
*/
public List<String> extraSpaces() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 67b4874..ffedfb3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1375,7 +1375,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final int[] parts0 = parts.get(idx);
- if (parts0.length < 64) {
+ if (parts0.length < 64) { // Fast scan for small arrays.
return new IgniteBiPredicate<K,V>() {
@Override public boolean apply(K k, V v) {
int p = aff.partition(k);
@@ -1383,6 +1383,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
for (int p0 : parts0) {
if (p0 == p)
return true;
+
+ if (p0 > p) // Array is sorted.
+ return false;
}
return false;
@@ -1424,9 +1427,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @return Current topology version.
+ * @return Ready topology version.
*/
- public AffinityTopologyVersion topologyVersion() {
+ public AffinityTopologyVersion readyTopologyVersion() {
return ctx.cache().context().exchange().readyAffinityVersion();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 06bad76..9d9060a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -260,6 +260,9 @@ public class GridMapQueryExecutor {
* @return Collection wrapper.
*/
private static Collection<Integer> wrap(final int[] ints) {
+ if (F.isEmpty(ints))
+ return Collections.emptySet();
+
return new AbstractCollection<Integer>() {
@Override public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
@@ -503,7 +506,7 @@ public class GridMapQueryExecutor {
loc ? null : Collections.<Message>emptyList(),
loc ? Collections.<Value[]>emptyList() : null);
- msg.retry(h2.topologyVersion());
+ msg.retry(h2.readyTopologyVersion());
ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 0836a75..c445844 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -146,7 +146,7 @@ public class GridReduceQueryExecutor {
for (GridMergeTable tbl : r.tbls) {
if (tbl.getScanIndex(null).hasSource(nodeId)) {
// Will attempt to retry. If reduce query was started it will fail on next page fetching.
- retry(r, h2.topologyVersion(), nodeId);
+ retry(r, h2.readyTopologyVersion(), nodeId);
break;
}
@@ -283,6 +283,9 @@ public class GridReduceQueryExecutor {
* @return Array.
*/
private static int[] unbox(Set<Integer> set) {
+ if (set == null)
+ return null;
+
int[] arr = new int[set.size()];
int i = 0;
@@ -294,6 +297,20 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param readyTop Latest ready topology.
+ * @return {@code true} If preloading is active.
+ */
+ private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
+ AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
+
+ int res = readyTop.compareTo(freshTop);
+
+ assert res <= 0 : readyTop + " " + freshTop;
+
+ return res < 0;
+ }
+
+ /**
* @param cctx Cache context.
* @param qry Query.
* @return Cursor.
@@ -312,7 +329,7 @@ public class GridReduceQueryExecutor {
r.conn = (JdbcConnection)h2.connectionForSpace(space);
- AffinityTopologyVersion topVer = h2.topologyVersion();
+ AffinityTopologyVersion topVer = h2.readyTopologyVersion();
Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer);
@@ -321,7 +338,8 @@ public class GridReduceQueryExecutor {
List<String> extraSpaces = extraSpaces(space, qry.spaces());
- List<int[]> parts = null;
+ // Explicit partition mapping for unstable topology: {nodeId -> {cacheName -> {parts}}}
+ Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap = null;
if (cctx.isReplicated() || qry.explain()) {
assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
@@ -329,16 +347,17 @@ public class GridReduceQueryExecutor {
// Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
nodes = Collections.singleton(F.rand(nodes));
}
- else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ??
- parts = new ArrayList<>(extraSpaces == null ? 1 : extraSpaces.size() + 1);
+ else if (isPreloadingActive(topVer)) {
+ gridPartsMap = new HashMap<>(nodes.size(), 1f);
- parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+ collectPartitionOwners(gridPartsMap, cctx);
if (extraSpaces != null) {
for (String extraSpace : extraSpaces)
- parts.add(unbox(ctx.cache().internalCache(extraSpace).context()
- .affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+ collectPartitionOwners(gridPartsMap, ctx.cache().internalCache(extraSpace).context());
}
+
+ nodes = gridPartsMap.keySet();
}
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
@@ -382,34 +401,23 @@ public class GridReduceQueryExecutor {
mapQry.marshallParams(m);
}
- boolean ok = false;
-
- try {
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts));
+ send(nodes,
+ new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null),
+ gridPartsMap);
- ok = true;
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send query request to nodes: " + nodes);
- }
+ U.await(r.latch);
AffinityTopologyVersion retry = null;
- if (ok) { // Sent successfully.
- U.await(r.latch);
-
- Object state = r.state.get();
+ Object state = r.state.get();
- if (state != null) {
- if (state instanceof CacheException)
- throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+ if (state != null) {
+ if (state instanceof CacheException)
+ throw new CacheException("Failed to run map query remotely.", (CacheException)state);
- if (state instanceof AffinityTopologyVersion)
- retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
- }
+ if (state instanceof AffinityTopologyVersion)
+ retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
}
- else // Send failed -> retry.
- retry = h2.topologyVersion();
ResultSet res = null;
@@ -423,14 +431,8 @@ public class GridReduceQueryExecutor {
}
for (GridMergeTable tbl : r.tbls) {
- if (!tbl.getScanIndex(null).fetchedAll()) { // We have to explicitly cancel queries on remote nodes.
- try {
- send(nodes, new GridQueryCancelRequest(qryReqId));
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send cancel request to nodes: " + nodes);
- }
- }
+ if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+ send(nodes, new GridQueryCancelRequest(qryReqId), null);
// dropTable(r.conn, tbl.getName()); TODO
}
@@ -461,6 +463,48 @@ public class GridReduceQueryExecutor {
}
/**
+ * Collects actual partition owners for the cache context int the given map.
+ *
+ * @param gridPartsMap Target map.
+ * @param cctx Cache context.
+ */
+ private void collectPartitionOwners(
+ Map<ClusterNode,Map<String,Set<Integer>>> gridPartsMap,
+ GridCacheContext<?,?> cctx
+ ) {
+ int partsCnt = cctx.affinity().partitions();
+
+ for (int p = 0; p < partsCnt; p++) {
+ // We don't care about exact topology version here, we just need to get all the needed partition
+ // owners in actual state.
+ List<ClusterNode> owners = cctx.topology().owners(p);
+
+ if (F.isEmpty(owners))
+ continue; // All primary and backup nodes are dead now for this partition. We sorrow.
+
+ ClusterNode owner = F.rand(owners);
+
+ Map<String, Set<Integer>> nodePartsMap = gridPartsMap.get(owner);
+
+ if (nodePartsMap == null) {
+ nodePartsMap = new HashMap<>();
+
+ gridPartsMap.put(owner, nodePartsMap);
+ }
+
+ Set<Integer> parts = nodePartsMap.get(cctx.name());
+
+ if (parts == null) {
+ parts = new TreeSet<>(); // We need them sorted.
+
+ nodePartsMap.put(cctx.name(), parts);
+ }
+
+ parts.add(p);
+ }
+ }
+
+ /**
* @param mainSpace Main space.
* @param allSpaces All spaces.
* @return List of all extra spaces or {@code null} if none.
@@ -531,33 +575,64 @@ public class GridReduceQueryExecutor {
/**
* @param nodes Nodes.
* @param msg Message.
- * @throws IgniteCheckedException If failed.
+ * @param gridPartsMap Partitions.
*/
- private void send(Collection<ClusterNode> nodes, Message msg) throws IgniteCheckedException {
+ private void send(
+ Collection<ClusterNode> nodes,
+ Message msg,
+ Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap
+ ) {
+ boolean locNodeFound = false;
+
for (ClusterNode node : nodes) {
if (node.isLocal()) {
- if (nodes.size() > 1) {
- ArrayList<ClusterNode> remotes = new ArrayList<>(nodes.size() - 1);
+ locNodeFound = true;
- for (ClusterNode node0 : nodes) {
- if (!node0.isLocal())
- remotes.add(node0);
- }
+ continue;
+ }
- assert remotes.size() == nodes.size() - 1;
+ try {
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message to node: " + node, e);
+ }
+ }
- ctx.io().send(remotes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
- }
+ if (locNodeFound) // Local node goes the last to allow parallel execution.
+ h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap));
+ }
+
+ /**
+ * @param msg Message to copy.
+ * @param node Node.
+ * @param gridPartsMap Partitions map.
+ * @return Copy of message with partitions set.
+ */
+ private Message copy(Message msg, ClusterNode node, Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap) {
+ if (gridPartsMap == null)
+ return msg;
- // Local node goes the last to allow parallel execution.
- h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg);
+ Map<String,Set<Integer>> nodeParts = gridPartsMap.get(node);
- return;
- }
+ assert nodeParts != null;
+
+ GridQueryRequest req = (GridQueryRequest)msg;
+
+ List<int[]> parts = new ArrayList<>(nodeParts.size());
+
+ parts.add(unbox(nodeParts.get(req.space())));
+
+ if (req.extraSpaces() != null) {
+ for (String extraSpace : req.extraSpaces())
+ parts.add(unbox(nodeParts.get(extraSpace)));
}
- // All the given nodes are remotes.
- ctx.io().send(nodes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+ GridQueryRequest res = new GridQueryRequest(req);
+
+ res.partitions(parts);
+
+ return res;
}
/**