You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/05/27 18:40:24 UTC
[1/4] incubator-ignite git commit: # ignite-456: format jira comment
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-389 9bb71bafe -> 5a7dd02f5
# ignite-456: format jira comment
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/26b1d0d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/26b1d0d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/26b1d0d3
Branch: refs/heads/ignite-389
Commit: 26b1d0d346bf42abf82ecb48c8c11ebdbe61f659
Parents: c75caba
Author: artem.shutak <as...@gridgain.com>
Authored: Tue May 26 21:10:24 2015 +0300
Committer: artem.shutak <as...@gridgain.com>
Committed: Tue May 26 21:10:24 2015 +0300
----------------------------------------------------------------------
dev-tools/src/main/groovy/jiraslurp.groovy | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/26b1d0d3/dev-tools/src/main/groovy/jiraslurp.groovy
----------------------------------------------------------------------
diff --git a/dev-tools/src/main/groovy/jiraslurp.groovy b/dev-tools/src/main/groovy/jiraslurp.groovy
index 93349ef..610060f 100644
--- a/dev-tools/src/main/groovy/jiraslurp.groovy
+++ b/dev-tools/src/main/groovy/jiraslurp.groovy
@@ -407,10 +407,17 @@ def runAllTestBuilds = {builds, jiraNum ->
}
}
+ // Format comment for jira.
def triggeredBuildsComment = "There was triggered next test builds for last attached patch-file:\\n"
+ def n = 1;
+
triggeredBuilds.each { name, url ->
- triggeredBuildsComment += "${name as String} - ${url as String}\\n"
+ def prefix = n < 10 ? "0" : ""
+
+ triggeredBuildsComment += "${prefix}${n}. ${url as String} - ${name as String}\\n"
+
+ n++
}
addJiraComment(jiraNum, triggeredBuildsComment)
[2/4] incubator-ignite git commit: # ignite-456: slurp.sh
Posted by ag...@apache.org.
# ignite-456: slurp.sh
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/982235b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/982235b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/982235b7
Branch: refs/heads/ignite-389
Commit: 982235b75f980eff4c20f5fc7560db55af32bd7d
Parents: 26b1d0d
Author: artem.shutak <as...@gridgain.com>
Authored: Tue May 26 21:14:52 2015 +0300
Committer: artem.shutak <as...@gridgain.com>
Committed: Tue May 26 21:14:52 2015 +0300
----------------------------------------------------------------------
dev-tools/slurp.sh | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/982235b7/dev-tools/slurp.sh
----------------------------------------------------------------------
diff --git a/dev-tools/slurp.sh b/dev-tools/slurp.sh
index 1636f21..7eb6fdb 100755
--- a/dev-tools/slurp.sh
+++ b/dev-tools/slurp.sh
@@ -51,6 +51,16 @@ TASK_RUNNER_USER='task_runner'
#
TASK_RUNNER_PWD=''
+echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
+echo "<"$(date +"%H:%M:%S")"> Starting task triggering"
+echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
+
+# Useful settings
+#cd /home/teamcity/jobs/incubator-ignite/
+#
+#export JAVA_HOME=<java_home>
+#export PATH=$PATH:<gradle_path>
+
git fetch
git checkout ${DEFAULT_BRANCH}
[3/4] incubator-ignite git commit: ignite-921 Create scan query able
to iterate over single partition
Posted by ag...@apache.org.
ignite-921 Create scan query able to iterate over single partition
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b58bb122
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b58bb122
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b58bb122
Branch: refs/heads/ignite-389
Commit: b58bb122da1d4c196b8125a28a4c1df33a9fc82f
Parents: 982235b
Author: agura <ag...@gridgain.com>
Authored: Mon May 25 20:26:04 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue May 26 21:18:49 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 15 +-
.../processors/cache/GridCacheSwapManager.java | 55 ++++-
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../distributed/dht/GridDhtLocalPartition.java | 7 +
.../processors/cache/query/CacheQuery.java | 2 +-
.../query/GridCacheDistributedQueryManager.java | 3 +
.../cache/query/GridCacheQueryAdapter.java | 228 ++++++++++++++++++-
.../cache/query/GridCacheQueryManager.java | 200 +++++++++-------
.../cache/query/GridCacheQueryRequest.java | 31 ++-
.../datastructures/GridCacheSetImpl.java | 4 +-
...achePartitionedPreloadLifecycleSelfTest.java | 2 +-
...CacheReplicatedPreloadLifecycleSelfTest.java | 6 +-
.../GridCacheSwapScanQueryAbstractSelfTest.java | 112 ++++++---
...CacheScanPartitionQueryFallbackSelfTest.java | 213 +++++++++++++++++
.../cache/IgniteCacheAbstractQuerySelfTest.java | 53 ++++-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
16 files changed, 795 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d390037..d7cec9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1317,8 +1317,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
- IgniteInternalFuture<Object> readFut =
- readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() {
+ IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
+ subjId, taskName, new CI2<KeyCacheObject, Object>() {
/** Version for all loaded entries. */
private GridCacheVersion nextVer = ctx.versions().next();
@@ -1948,7 +1948,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Optional filter.
* @return Put operation future.
*/
- public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) {
+ public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val,
+ @Nullable final CacheEntryPredicate... filter) {
A.notNull(key, "key", val, "val");
if (keyCheck)
@@ -3117,7 +3118,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException {
+ @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout)
+ throws IgniteCheckedException {
if (F.isEmpty(keys))
return true;
@@ -3689,7 +3691,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
return localIteratorHonorExpirePolicy(opCtx);
- CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable())
+ CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable())
.keepAll(false)
.execute();
@@ -3918,7 +3920,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return t;
}
- catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) {
+ catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
+ IgniteTxRollbackCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..e4b1cbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1211,7 +1211,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
checkIteratorQueue();
if (offHeapEnabled() && !swapEnabled())
- return rawOffHeapIterator(true, true);
+ return rawOffHeapIterator(null, true, true);
if (swapEnabled() && !offHeapEnabled())
return rawSwapIterator(true, true);
@@ -1227,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
private Map.Entry<byte[], byte[]> cur;
{
- it = rawOffHeapIterator(true, true);
+ it = rawOffHeapIterator(null, true, true);
advance();
}
@@ -1554,11 +1554,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param c Key/value closure.
+ * @param part Partition.
* @param primary Include primaries.
* @param backup Include backups.
* @return Off-heap iterator.
*/
public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+ Integer part,
boolean primary,
boolean backup)
{
@@ -1574,24 +1576,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
- Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ Set<Integer> parts;
+
+ if (part == null)
+ parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ else
+ parts = Collections.singleton(part);
return new CloseablePartitionsIterator<T, T>(parts) {
@Override protected GridCloseableIterator<T> partitionIterator(int part)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
return offheap.iterator(spaceName, c, part);
}
};
}
/**
+ *
+ * @param part Partition.
* @param primary Include primaries.
* @param backup Include backups.
* @return Raw off-heap iterator.
*/
- public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary,
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part,
+ final boolean primary,
final boolean backup)
{
if (!offheapEnabled || (!primary && !backup))
@@ -1626,8 +1635,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
- Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ Set<Integer> parts;
+
+ if (part == null)
+ parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ else
+ parts = Collections.singleton(part);
return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) {
private Map.Entry<byte[], byte[]> cur;
@@ -1701,6 +1715,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * @param part Partition.
+ * @return Raw off-heap iterator.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part)
+ throws IgniteCheckedException
+ {
+ if (!swapEnabled)
+ return new GridEmptyCloseableIterator<>();
+
+ checkIteratorQueue();
+
+ return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(
+ Collections.singleton(part)) {
+ @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
+ throws IgniteCheckedException
+ {
+ return swapMgr.rawIterator(spaceName, part);
+ }
+ };
+ }
+
+ /**
* @param primary If {@code true} includes primary entries.
* @param backup If {@code true} includes backup entries.
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f840015..0009127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -353,7 +353,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (filter instanceof ScanQuery) {
IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
- qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable);
+ qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable);
if (grp != null)
qry.projection(grp);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 0749f66..8ac3809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
}
/**
+ * @return Keys belonging to partition.
+ */
+ public Set<KeyCacheObject> keySet() {
+ return map.keySet();
+ }
+
+ /**
* @return Entries belonging to partition.
*/
public Collection<GridDhtCacheEntry> entries() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 0658828..2d2db1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -76,7 +76,7 @@ import org.jetbrains.annotations.*;
* </li>
* <li>
* Joins will work correctly only if joined objects are stored in
- * collocated mode or at least one side of the join is stored in
+ * colocated mode or at least one side of the join is stored in
* {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to
* {@link AffinityKey} javadoc for more information about colocation.
* </li>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index a579aab..2b93144 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
false,
null,
req.keyValueFilter(),
+ req.partition(),
req.className(),
req.clause(),
req.includeMetaData(),
@@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().clause(),
clsName,
qry.query().scanFilter(),
+ qry.query().partition(),
qry.reducer(),
qry.transform(),
qry.query().pageSize(),
@@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().clause(),
null,
null,
+ null,
qry.reducer(),
qry.transform(),
qry.query().pageSize(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 4b1fc87..05198a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -21,7 +21,9 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -31,6 +33,7 @@ import org.apache.ignite.plugin.security.*;
import org.jetbrains.annotations.*;
import java.util.*;
+import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
@@ -56,6 +59,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** */
private final IgniteBiPredicate<Object, Object> filter;
+ /** Partition. */
+ private Integer part;
+
/** */
private final boolean incMeta;
@@ -95,6 +101,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param clsName Class name.
* @param clause Clause.
* @param filter Scan filter.
+ * @param part Partition.
* @param incMeta Include metadata flag.
* @param keepPortable Keep portable flag.
*/
@@ -103,6 +110,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Nullable String clsName,
@Nullable String clause,
@Nullable IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
boolean incMeta,
boolean keepPortable) {
assert cctx != null;
@@ -113,6 +121,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.clsName = clsName;
this.clause = clause;
this.filter = filter;
+ this.part = part;
this.incMeta = incMeta;
this.keepPortable = keepPortable;
@@ -132,6 +141,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param dedup Enable dedup flag.
* @param prj Grid projection.
* @param filter Key-value filter.
+ * @param part Partition.
* @param clsName Class name.
* @param clause Clause.
* @param incMeta Include metadata flag.
@@ -149,6 +159,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
boolean dedup,
ClusterGroup prj,
IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
@Nullable String clsName,
String clause,
boolean incMeta,
@@ -165,6 +176,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.dedup = dedup;
this.prj = prj;
this.filter = filter;
+ this.part = part;
this.clsName = clsName;
this.clause = clause;
this.incMeta = incMeta;
@@ -334,6 +346,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/**
+ * @return Partition.
+ */
+ @Nullable public Integer partition() {
+ return part;
+ }
+
+ /**
* @throws IgniteCheckedException If query is invalid.
*/
public void validate() throws IgniteCheckedException {
@@ -376,10 +395,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return execute(null, rmtTransform, args);
}
+ /** {@inheritDoc} */
@Override public QueryMetrics metrics() {
return metrics.copy();
}
+ /** {@inheritDoc} */
@Override public void resetMetrics() {
metrics = new GridCacheQueryMetricsAdapter();
}
@@ -418,18 +439,34 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
taskHash = cctx.kernalContext().job().currentTaskNameHash();
- GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
+ final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
(IgniteClosure<Object, Object>)rmtTransform, args);
- GridCacheQueryManager qryMgr = cctx.queries();
+ final GridCacheQueryManager qryMgr = cctx.queries();
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
if (type == SQL_FIELDS || type == SPI)
return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
qryMgr.queryFieldsDistributed(bean, nodes));
- else
- return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
+ else {
+ final CacheQueryFuture<R> fut =
+ (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
+
+ if (type == SCAN && part != null) {
+ assert nodes.size() == 1;
+
+ final Queue<ClusterNode> backups = new LinkedList<>(
+ cctx.affinity().backups(part, cctx.affinity().affinityTopologyVersion()));
+
+ if (F.isEmpty(backups))
+ return fut;
+
+ return new CacheQueryFallbackFuture<>(backups, bean, qryMgr, fut);
+ }
+
+ return fut;
+ }
}
/**
@@ -448,14 +485,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
case REPLICATED:
if (prj != null)
- return nodes(cctx, prj);
+ return nodes(cctx, prj, partition());
return cctx.affinityNode() ?
Collections.singletonList(cctx.localNode()) :
- Collections.singletonList(F.rand(nodes(cctx, null)));
+ Collections.singletonList(F.rand(nodes(cctx, null, partition())));
case PARTITIONED:
- return nodes(cctx, prj);
+ return nodes(cctx, prj, partition());
default:
throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
@@ -467,13 +504,17 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param prj Projection (optional).
* @return Collection of data nodes in provided projection (if any).
*/
- private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
+ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
+ @Nullable final ClusterGroup prj, @Nullable final Integer part) {
assert cctx != null;
return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
+ AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
- (prj == null || prj.node(n.id()) != null);
+ (prj == null || prj.node(n.id()) != null) &&
+ (part == null || cctx.affinity().primary(n, part.intValue(), topVer));
}
});
}
@@ -482,4 +523,173 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Override public String toString() {
return S.toString(GridCacheQueryAdapter.class, this);
}
+
+ /**
+ * Wrapper for queries with fallback.
+ */
+ private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> {
+ /** Target. */
+ private GridCacheQueryFutureAdapter<?, ?, R> target;
+
+ /** Backups. */
+ private final Queue<ClusterNode> backups;
+
+ /** Bean. */
+ private final GridCacheQueryBean bean;
+
+ /** Query manager. */
+ private final GridCacheQueryManager qryMgr;
+
+ /**
+ * @param backups Backups.
+ * @param bean Bean.
+ * @param qryMgr Query manager.
+ * @param fut Future.
+ */
+ public CacheQueryFallbackFuture(Queue<ClusterNode> backups, GridCacheQueryBean bean,
+ GridCacheQueryManager qryMgr, CacheQueryFuture<R> fut) {
+ this.backups = backups;
+ this.bean = bean;
+ this.qryMgr = qryMgr;
+ this.target = (GridCacheQueryFutureAdapter<?, ?, R>)fut;
+
+ init();
+ }
+
+ /**
+ *
+ */
+ private void init() {
+ target.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+ @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ if (F.isEmpty(backups))
+ onDone(e);
+ else {
+ Set<ClusterNode> backup = Collections.singleton(backups.poll());
+
+ target =
+ (GridCacheQueryFutureAdapter<?, ?, R>)qryMgr.queryDistributed(bean, backup);
+
+ init();
+ }
+ }
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onPage(UUID nodeId, boolean last) {
+ return target.onPage(nodeId, last);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void loadPage() {
+ target.loadPage();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
+ target.loadAllPages();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void cancelQuery() throws IgniteCheckedException {
+ target.cancelQuery();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int available() {
+ return target.available();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() throws IgniteCheckedException {
+ return target.cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override void clear() {
+ target.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return target.endTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void enqueue(Collection<?> col) {
+ target.enqueue(col);
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean fields() {
+ return target.fields();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<R> get() throws IgniteCheckedException {
+ return target.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+ return target.get(timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public R next() {
+ return target.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<R> nextPage() throws IgniteCheckedException {
+ return target.nextPage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(Collection<R> res, Throwable err) {
+ return target.onDone(res, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException {
+ return target.nextPage(timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onNodeLeft(UUID evtNodeId) {
+ target.onNodeLeft(evtNodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data,
+ @Nullable Throwable err, boolean finished) {
+ target.onPage(nodeId, data, err, finished);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ target.onTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ target.printMemoryStats();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheQueryBean query() {
+ return target.query();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return target.timeoutId();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 16a8028..fac3d8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -52,6 +52,8 @@ import java.util.concurrent.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
/**
@@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final Object recipient = recipient(nodeId, entry.getKey());
entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
- @Override
- public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
+ @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
throws IgniteCheckedException {
f.get().closeIfNotShared(recipient);
}
@@ -768,98 +769,139 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final boolean backups = qry.includeBackups() || cctx.isReplicated();
- final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- private IgniteBiTuple<K, V> next;
+ final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+ new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+ private IgniteBiTuple<K, V> next;
- private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
+ private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
- private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+ private Iterator<K> iter;
- {
- advance();
- }
+ private GridDhtLocalPartition locPart;
- @Override public boolean onHasNext() {
- return next != null;
- }
+ {
+ Integer part = qry.partition();
- @Override public IgniteBiTuple<K, V> onNext() {
- if (next == null)
- throw new NoSuchElementException();
+ if (part == null || dht == null)
+ iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+ else if (part < 0 || part >= cctx.affinity().partitions())
+ iter = F.emptyIterator();
+ else {
+ AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
- IgniteBiTuple<K, V> next0 = next;
+ locPart = dht.topology().localPartition(part, topVer, false);
- advance();
+ if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) ||
+ !locPart.reserve())
+ throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
- return next0;
- }
- private void advance() {
- IgniteBiTuple<K, V> next0 = null;
+ iter = new Iterator<K>() {
+ private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
- while (iter.hasNext()) {
- next0 = null;
+ @Override public boolean hasNext() {
+ return iter0.hasNext();
+ }
- K key = iter.next();
+ @Override public K next() {
+ KeyCacheObject key = iter0.next();
- V val;
+ return key.value(cctx.cacheObjectContext(), false);
+ }
- try {
- val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+ @Override public void remove() {
+ iter0.remove();
+ }
+ };
}
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
- val = null;
- }
+ advance();
+ }
- if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ @Override public boolean onHasNext() {
+ return next != null;
+ }
- expiryPlc = cctx.cache().expiryPolicy(plc);
- }
+ @Override public IgniteBiTuple<K, V> onNext() {
+ if (next == null)
+ throw new NoSuchElementException();
- if (val != null) {
- next0 = F.t(key, val);
+ IgniteBiTuple<K, V> next0 = next;
- if (checkPredicate(next0))
- break;
- else
- next0 = null;
- }
+ advance();
+
+ return next0;
}
- next = next0 != null ?
- new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
- null;
+ private void advance() {
+ IgniteBiTuple<K, V> next0 = null;
- if (next == null)
- sendTtlUpdate();
- }
+ while (iter.hasNext()) {
+ next0 = null;
- @Override protected void onClose() {
- sendTtlUpdate();
- }
+ K key = iter.next();
+
+ V val;
+
+ try {
+ val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
+
+ val = null;
+ }
+
+ if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+ }
+
+ if (val != null) {
+ next0 = F.t(key, val);
+
+ if (checkPredicate(next0))
+ break;
+ else
+ next0 = null;
+ }
+ }
- private void sendTtlUpdate() {
- if (dht != null && expiryPlc != null) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ next = next0 != null ?
+ new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+ null;
- expiryPlc = null;
+ if (next == null)
+ sendTtlUpdate();
}
- }
- private boolean checkPredicate(Map.Entry<K, V> e) {
- if (keyValFilter != null) {
- Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+ @Override protected void onClose() {
+ sendTtlUpdate();
- return keyValFilter.apply(e0.getKey(), e0.getValue());
+ if (locPart != null)
+ locPart.release();
}
- return true;
- }
- };
+ private void sendTtlUpdate() {
+ if (dht != null && expiryPlc != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+ }
+
+ private boolean checkPredicate(Map.Entry<K, V> e) {
+ if (keyValFilter != null) {
+ Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+
+ return keyValFilter.apply(e0.getKey(), e0.getValue());
+ }
+
+ return true;
+ }
+ };
final GridIterator<IgniteBiTuple<K, V>> it;
@@ -914,7 +956,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throws IgniteCheckedException {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups);
+ Integer part = qry.partition();
+
+ Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
+ cctx.swap().rawSwapIterator(part);
return scanIterator(it, filter, qry.keepPortable());
}
@@ -930,10 +975,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (cctx.offheapTiered() && filter != null) {
OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable());
- return cctx.swap().rawOffHeapIterator(c, true, backups);
+ return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups);
}
else {
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups);
+ Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups);
return scanIterator(it, filter, qry.keepPortable());
}
@@ -1222,7 +1267,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+ IgniteClosure<Map.Entry<K, V>, Object> trans =
+ (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+
IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
injectResources(trans);
@@ -1529,11 +1576,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false,
qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
}
- catch (Error e) {
- fut.onDone(e);
-
- throw e;
- }
catch (Throwable e) {
fut.onDone(e);
@@ -1843,7 +1885,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return new IgniteBiPredicate<K, V>() {
@Override public boolean apply(K k, V v) {
- return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE);
+ return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE);
}
};
}
@@ -2920,6 +2962,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
null,
null,
+ null,
false,
keepPortable);
}
@@ -2928,17 +2971,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* Creates user's predicate based scan query.
*
* @param filter Scan filter.
+ * @param part Partition.
* @param keepPortable Keep portable flag.
* @return Created query.
*/
- @SuppressWarnings("unchecked")
public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
- boolean keepPortable) {
+ @Nullable Integer part, boolean keepPortable) {
+
return new GridCacheQueryAdapter<>(cctx,
SCAN,
null,
null,
(IgniteBiPredicate<Object, Object>)filter,
+ part,
false,
keepPortable);
}
@@ -2962,6 +3007,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
clsName,
search,
null,
+ null,
false,
keepPortable);
}
@@ -2982,6 +3028,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
qry,
null,
+ null,
false,
keepPortable);
}
@@ -3002,6 +3049,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
qry,
null,
+ null,
incMeta,
keepPortable);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 845077f..7577954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -26,6 +26,8 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** */
private int taskHash;
+ /** Partition. */
+ private Integer part;
+
/**
* Required by {@link Externalizable}
*/
@@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
* @param clause Query clause.
* @param clsName Query class name.
* @param keyValFilter Key-value filter.
+ * @param part Partition.
* @param rdc Reducer.
* @param trans Transformer.
* @param pageSize Page size.
@@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
String clause,
String clsName,
IgniteBiPredicate<Object, Object> keyValFilter,
+ @Nullable Integer part,
IgniteReducer<Object, Object> rdc,
IgniteClosure<Object, Object> trans,
int pageSize,
@@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
this.clause = clause;
this.clsName = clsName;
this.keyValFilter = keyValFilter;
+ this.part = part;
this.rdc = rdc;
this.trans = trans;
this.pageSize = pageSize;
@@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
return taskHash;
}
+ /**
+ * @return partition.
+ */
+ @Nullable public Integer partition() {
+ return part;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -537,6 +552,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
writer.incrementState();
+ case 21:
+ if (!writer.writeInt("part", part != null ? part : -1))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -701,6 +721,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
+ case 21:
+ int part0 = reader.readInt("part");
+
+ part = part0 == -1 ? null : part0;
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return true;
@@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 21;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f516968..c0e763f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
}
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), false, false);
+ new GridSetQueryPredicate<>(id, collocated), -1, false, false);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
@@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
private GridCloseableIterator<T> iterator0() {
try {
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), false, false);
+ new GridSetQueryPredicate<>(id, collocated), null, false, false);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index 9d41074..4601586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
for (int j = 0; j < G.allGrids().size(); j++) {
GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
- CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+ CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
@IgniteInstanceResource
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 62bf3f7..cc8217d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
for (int j = 0; j < G.allGrids().size(); j++) {
GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
- CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+ CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
final int i0 = j;
final int j0 = i;
@@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
Object v1 = e.getValue();
Object v2 = ((IgniteKernal)grid).getCache("one").get(key);
- assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 +
- ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
+ assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" +
+ key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
assertEquals(v1, v2);
}
catch (IgniteCheckedException e1) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
index 068a46c..6ccfbc2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
@@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
* @throws Exception If failed.
*/
public void testQuery() throws Exception {
- checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+ checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false);
- checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+ checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false);
+
+ checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true);
+
+ checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true);
}
/**
* @param cache Cache.
+ * @param scanPartitions Scan partitions.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void checkQuery(GridCacheAdapter cache) throws Exception {
+ private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception {
final int ENTRY_CNT = 500;
- for (int i = 0; i < ENTRY_CNT; i++)
- cache.getAndPut(new Key(i), new Person("p-" + i, i));
+ Map<Integer, Map<Key, Person>> entries = new HashMap<>();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Key key = new Key(i);
+ Person val = new Person("p-" + i, i);
+
+ int part = cache.context().affinity().partition(key);
+
+ cache.getAndPut(key, val);
+
+ Map<Key, Person> partEntries = entries.get(part);
+
+ if (partEntries == null)
+ entries.put(part, partEntries = new HashMap<>());
+
+ partEntries.put(key, val);
+ }
try {
- CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
- new IgniteBiPredicate<Key, Person>() {
- @Override public boolean apply(Key key, Person p) {
- assertEquals(key.id, (Integer)p.salary);
+ int partitions = scanPartitions ? cache.context().affinity().partitions() : 1;
- return key.id % 2 == 0;
- }
- }, false);
+ for (int i = 0; i < partitions; i++) {
+ CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
+ new IgniteBiPredicate<Key, Person>() {
+ @Override public boolean apply(Key key, Person p) {
+ assertEquals(key.id, (Integer)p.salary);
- Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+ return key.id % 2 == 0;
+ }
+ }, (scanPartitions ? i : null), false);
- assertEquals(ENTRY_CNT / 2, res.size());
+ Collection<Map.Entry<Key, Person>> res = qry.execute().get();
- for (Map.Entry<Key, Person> e : res) {
- Key k = e.getKey();
- Person p = e.getValue();
+ if (!scanPartitions)
+ assertEquals(ENTRY_CNT / 2, res.size());
- assertEquals(k.id, (Integer)p.salary);
- assertEquals(0, k.id % 2);
- }
+ for (Map.Entry<Key, Person> e : res) {
+ Key k = e.getKey();
+ Person p = e.getValue();
- qry = cache.context().queries().createScanQuery(null, false);
+ assertEquals(k.id, (Integer)p.salary);
+ assertEquals(0, k.id % 2);
- res = qry.execute().get();
+ if (scanPartitions) {
+ Map<Key, Person> partEntries = entries.get(i);
- assertEquals(ENTRY_CNT, res.size());
+ assertEquals(p, partEntries.get(k));
+ }
+ }
+
+ qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false);
+
+ res = qry.execute().get();
+
+ if (!scanPartitions)
+ assertEquals(ENTRY_CNT, res.size());
+ }
testMultithreaded(cache, ENTRY_CNT / 2);
}
@@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return key.id % 2 == 0;
}
- }, false);
+ }, null, false);
for (int i = 0; i < 250; i++) {
Collection<Map.Entry<Key, Person>> res = qry.execute().get();
@@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return val % 2 == 0;
}
- }, false);
+ }, null, false);
Collection<Map.Entry<String, Long>> res = qry.execute().get();
@@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
assertEquals(0, val % 2);
}
- qry = cache.context().queries().createScanQuery(null, false);
+ qry = cache.context().queries().createScanQuery(null, null, false);
res = qry.execute().get();
@@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return key % 2 == 0;
}
- }, false);
+ }, null, false);
Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get();
@@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
assertEquals(0, key % 2);
}
- qry = cache.context().queries().createScanQuery(null, false);
+ qry = cache.context().queries().createScanQuery(null, null, false);
res = qry.execute().get();
@@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
this.name = name;
this.salary = salary;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Person person = (Person)o;
+
+ if (salary != person.salary)
+ return false;
+
+ return !(name != null ? !name.equals(person.name) : person.name != null);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+
+ return 31 * result + salary;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
new file mode 100644
index 0000000..3b1b842
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ * Tests partition scan query fallback.
+ */
+public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 5;
+
+ /** Kys count. */
+ private static final int KEYS_CNT = 1;
+
+ /** Backups. */
+ private int backups;
+
+ /** Cache mode. */
+ private CacheMode cacheMode;
+
+ /** Fallback. */
+ private boolean fallback;
+
+ /** Primary node id. */
+ private static volatile UUID expNodeId;
+
+ /** Fail node id. */
+ private static volatile UUID failNodeId;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+ ccfg.setBackups(backups);
+ ccfg.setNearConfiguration(null);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimary() throws Exception {
+ cacheMode = CacheMode.PARTITIONED;
+ backups = 0;
+ failNodeId = null;
+ fallback = false;
+
+ doTestScanPartition();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFallbackToBackup() throws Exception {
+ cacheMode = CacheMode.PARTITIONED;
+ backups = 1;
+ failNodeId = null;
+ fallback = true;
+
+ doTestScanPartition();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ protected void doTestScanPartition() throws Exception {
+ try {
+ Ignite ignite = startGrids(GRID_CNT);
+
+ IgniteCacheProxy<Integer, Integer> cache =
+ (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null);
+
+ Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ cache.put(i, i);
+
+ int part = cache.context().affinity().partition(i);
+
+ Map<Integer, Integer> partEntries = entries.get(part);
+
+ if (partEntries == null)
+ entries.put(part, partEntries = new HashMap<>());
+
+ partEntries.put(i, i);
+ }
+
+ IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context(), true);
+
+ int part = tup.get1();
+
+ if (fallback)
+ failNodeId = tup.get2();
+ else
+ expNodeId = tup.get2();
+
+ if (fallback)
+ expNodeId = remoteBackup(part, cache.context());
+
+ CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+
+ CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+ Collection<Map.Entry<Integer, Integer>> expEntries = fut.get();
+
+ for (Map.Entry<Integer, Integer> e : expEntries) {
+ Map<Integer, Integer> map = entries.get(part);
+
+ if(map == null)
+ assertTrue(expEntries.isEmpty());
+ else
+ assertEquals(map.get(e.getKey()), e.getValue());
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param cctx Cctx.
+ * @param primary Primary.
+ */
+ private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx, boolean primary) {
+ ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes());
+
+ GridCacheAffinityManager affMgr = cctx.affinity();
+
+ AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
+
+ Set<Integer> parts = primary ?
+ affMgr.primaryPartitions(node.id(), topVer) : affMgr.backupPartitions(node.id(), topVer);
+
+ return new IgniteBiTuple<>(F.first(parts), node.id());
+ }
+
+ /**
+ * @param part Partition.
+ * @param cctx Cctx.
+ */
+ private UUID remoteBackup(int part, final GridCacheContext cctx) {
+ final UUID locUuid = cctx.localNodeId();
+
+ GridCacheAffinityManager affMgr = cctx.affinity();
+
+ AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
+
+ return F.first(F.view(affMgr.backups(part, topVer), new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return !node.id().equals(locUuid);
+ }
+ })).id();
+ }
+
+ /**
+ *
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg)
+ throws IgniteSpiException {
+ Object origMsg = ((GridIoMessage)msg).message();
+
+ if (origMsg instanceof GridCacheQueryRequest) {
+ if (node.id().equals(failNodeId))
+ throw new IgniteSpiException("");
+ else
+ assertEquals(expNodeId, node.id());
+ }
+
+ super.sendMessage(node, msg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 1a60bbd..eb5027c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -61,6 +61,9 @@ import static org.junit.Assert.*;
* Various tests for cache queries.
*/
public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstractTest {
+ /** Key count. */
+ private static final int KEY_CNT = 5000;
+
/** Cache store. */
private static TestStore store = new TestStore();
@@ -643,6 +646,47 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
}
/**
+ * @throws Exception In case of error.
+ */
+ public void testScanPartitionQuery() throws Exception {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ GridCacheContext cctx = ((IgniteCacheProxy)cache).context();
+
+ Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+
+ for (int i = 0; i < KEY_CNT; i++) {
+ cache.put(i, i);
+
+ int part = cctx.affinity().partition(i);
+
+ Map<Integer, Integer> partEntries = entries.get(part);
+
+ if (partEntries == null)
+ entries.put(part, partEntries = new HashMap<>());
+
+ partEntries.put(i, i);
+ }
+
+ for (int i = 0; i < cctx.affinity().partitions(); i++) {
+ CacheQuery<Map.Entry<Integer, Integer>> qry =
+ ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false);
+
+ CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+ Map<Integer, Integer> exp = entries.get(i);
+
+ Collection<Map.Entry<Integer, Integer>> actual = fut.get();
+
+ if (exp == null)
+ assertTrue(actual.isEmpty());
+ else
+ for (Map.Entry<Integer, Integer> entry : actual)
+ assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
+ }
+ }
+
+ /**
* JUnit.
*
* @throws Exception In case of error.
@@ -1048,11 +1092,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
for (int i = 0; i < 20; i++)
cache.put(i, i);
- QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(new IgniteBiPredicate<Integer,Integer>() {
+ IgniteBiPredicate<Integer, Integer> filter = new IgniteBiPredicate<Integer, Integer>() {
@Override public boolean apply(Integer k, Integer v) {
return k >= 10;
}
- }));
+ };
+
+ QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(filter));
q.getAll();
@@ -1187,7 +1233,8 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
* @return {@code true} if index has a table for given class.
* @throws IgniteCheckedException If failed.
*/
- private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) throws IgniteCheckedException {
+ private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr)
+ throws IgniteCheckedException {
return qryMgr.size(cls) != -1;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index f42963a..713cf84 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -70,6 +70,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);
+ // Scan queries.
+ suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);
// Fields queries.
suite.addTestSuite(IgniteCacheLocalFieldsQuerySelfTest.class);
[4/4] incubator-ignite git commit: Merge branch 'ignite-921' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-389
Posted by ag...@apache.org.
Merge branch 'ignite-921' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-389
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5a7dd02f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5a7dd02f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5a7dd02f
Branch: refs/heads/ignite-389
Commit: 5a7dd02f540c98312bc120c2566ba0db94f7a570
Parents: 9bb71ba b58bb12
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed May 27 09:40:17 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed May 27 09:40:17 2015 -0700
----------------------------------------------------------------------
dev-tools/slurp.sh | 10 +
dev-tools/src/main/groovy/jiraslurp.groovy | 9 +-
.../cache/query/GridCacheQueryAdapter.java | 200 ++++++++++++++++-
...CacheScanPartitionQueryFallbackSelfTest.java | 213 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
5 files changed, 429 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a7dd02f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 9ab8c4f,05198a4..1f7b736
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@@ -487,14 -508,13 +508,16 @@@ public class GridCacheQueryAdapter<T> i
@Nullable final ClusterGroup prj, @Nullable final Integer part) {
assert cctx != null;
+ final List<ClusterNode> owners = part == null ? null :
+ cctx.topology().owners(part, cctx.affinity().affinityTopologyVersion());
+
return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
+ AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
(prj == null || prj.node(n.id()) != null) &&
- (part == null || cctx.affinity().primary(n, part.intValue(), topVer));
+ (part == null || owners.contains(n));
}
});
}