You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/05/27 09:09:09 UTC
[40/50] incubator-ignite git commit: ignite-921 Create scan query able to iterate over single partition
ignite-921 Create scan query able to iterate over single partition
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f4cc4b6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f4cc4b6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f4cc4b6c
Branch: refs/heads/ignite-389
Commit: f4cc4b6cecf0b521edcd8f07a10de8202832932d
Parents: edf6ffc
Author: agura <ag...@gridgain.com>
Authored: Mon May 25 20:26:04 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Mon May 25 22:09:37 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 15 +-
.../processors/cache/GridCacheSwapManager.java | 55 ++++-
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../distributed/dht/GridDhtLocalPartition.java | 7 +
.../processors/cache/query/CacheQuery.java | 2 +-
.../query/GridCacheDistributedQueryManager.java | 3 +
.../cache/query/GridCacheQueryAdapter.java | 28 ++-
.../cache/query/GridCacheQueryManager.java | 200 ++++++++++++-------
.../cache/query/GridCacheQueryRequest.java | 31 ++-
.../datastructures/GridCacheSetImpl.java | 4 +-
...achePartitionedPreloadLifecycleSelfTest.java | 2 +-
...CacheReplicatedPreloadLifecycleSelfTest.java | 6 +-
.../GridCacheSwapScanQueryAbstractSelfTest.java | 112 ++++++++---
.../cache/IgniteCacheAbstractQuerySelfTest.java | 53 ++++-
14 files changed, 384 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d390037..d7cec9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1317,8 +1317,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
- IgniteInternalFuture<Object> readFut =
- readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() {
+ IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
+ subjId, taskName, new CI2<KeyCacheObject, Object>() {
/** Version for all loaded entries. */
private GridCacheVersion nextVer = ctx.versions().next();
@@ -1948,7 +1948,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Optional filter.
* @return Put operation future.
*/
- public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) {
+ public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val,
+ @Nullable final CacheEntryPredicate... filter) {
A.notNull(key, "key", val, "val");
if (keyCheck)
@@ -3117,7 +3118,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException {
+ @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout)
+ throws IgniteCheckedException {
if (F.isEmpty(keys))
return true;
@@ -3689,7 +3691,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
return localIteratorHonorExpirePolicy(opCtx);
- CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable())
+ CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable())
.keepAll(false)
.execute();
@@ -3918,7 +3920,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return t;
}
- catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) {
+ catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
+ IgniteTxRollbackCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..e4b1cbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1211,7 +1211,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
checkIteratorQueue();
if (offHeapEnabled() && !swapEnabled())
- return rawOffHeapIterator(true, true);
+ return rawOffHeapIterator(null, true, true);
if (swapEnabled() && !offHeapEnabled())
return rawSwapIterator(true, true);
@@ -1227,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
private Map.Entry<byte[], byte[]> cur;
{
- it = rawOffHeapIterator(true, true);
+ it = rawOffHeapIterator(null, true, true);
advance();
}
@@ -1554,11 +1554,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param c Key/value closure.
+ * @param part Partition.
* @param primary Include primaries.
* @param backup Include backups.
* @return Off-heap iterator.
*/
public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+ Integer part,
boolean primary,
boolean backup)
{
@@ -1574,24 +1576,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
- Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ Set<Integer> parts;
+
+ if (part == null)
+ parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ else
+ parts = Collections.singleton(part);
return new CloseablePartitionsIterator<T, T>(parts) {
@Override protected GridCloseableIterator<T> partitionIterator(int part)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
return offheap.iterator(spaceName, c, part);
}
};
}
/**
+ *
+ * @param part Partition.
* @param primary Include primaries.
* @param backup Include backups.
* @return Raw off-heap iterator.
*/
- public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary,
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part,
+ final boolean primary,
final boolean backup)
{
if (!offheapEnabled || (!primary && !backup))
@@ -1626,8 +1635,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
- Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ Set<Integer> parts;
+
+ if (part == null)
+ parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+ else
+ parts = Collections.singleton(part);
return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) {
private Map.Entry<byte[], byte[]> cur;
@@ -1701,6 +1715,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * @param part Partition.
+ * @return Raw swap iterator.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part)
+ throws IgniteCheckedException
+ {
+ if (!swapEnabled)
+ return new GridEmptyCloseableIterator<>();
+
+ checkIteratorQueue();
+
+ return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(
+ Collections.singleton(part)) {
+ @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
+ throws IgniteCheckedException
+ {
+ return swapMgr.rawIterator(spaceName, part);
+ }
+ };
+ }
+
+ /**
* @param primary If {@code true} includes primary entries.
* @param backup If {@code true} includes backup entries.
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 2de5bf0..8833232 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -353,7 +353,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (filter instanceof ScanQuery) {
IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
- qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable);
+ qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable);
if (grp != null)
qry.projection(grp);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 0749f66..8ac3809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
}
/**
+ * @return Keys belonging to partition.
+ */
+ public Set<KeyCacheObject> keySet() {
+ return map.keySet();
+ }
+
+ /**
* @return Entries belonging to partition.
*/
public Collection<GridDhtCacheEntry> entries() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 0658828..2d2db1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -76,7 +76,7 @@ import org.jetbrains.annotations.*;
* </li>
* <li>
* Joins will work correctly only if joined objects are stored in
- * collocated mode or at least one side of the join is stored in
+ * colocated mode or at least one side of the join is stored in
* {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to
* {@link AffinityKey} javadoc for more information about colocation.
* </li>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index a579aab..2b93144 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
false,
null,
req.keyValueFilter(),
+ req.partition(),
req.className(),
req.clause(),
req.includeMetaData(),
@@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().clause(),
clsName,
qry.query().scanFilter(),
+ qry.query().partition(),
qry.reducer(),
qry.transform(),
qry.query().pageSize(),
@@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().clause(),
null,
null,
+ null,
qry.reducer(),
qry.transform(),
qry.query().pageSize(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 4b1fc87..d976d2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -56,6 +56,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** */
private final IgniteBiPredicate<Object, Object> filter;
+ /** Partition. */
+ private Integer part;
+
/** */
private final boolean incMeta;
@@ -95,6 +98,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param clsName Class name.
* @param clause Clause.
* @param filter Scan filter.
+ * @param part Partition.
* @param incMeta Include metadata flag.
* @param keepPortable Keep portable flag.
*/
@@ -103,6 +107,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Nullable String clsName,
@Nullable String clause,
@Nullable IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
boolean incMeta,
boolean keepPortable) {
assert cctx != null;
@@ -113,6 +118,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.clsName = clsName;
this.clause = clause;
this.filter = filter;
+ this.part = part;
this.incMeta = incMeta;
this.keepPortable = keepPortable;
@@ -132,6 +138,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param dedup Enable dedup flag.
* @param prj Grid projection.
* @param filter Key-value filter.
+ * @param part Partition.
* @param clsName Class name.
* @param clause Clause.
* @param incMeta Include metadata flag.
@@ -149,6 +156,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
boolean dedup,
ClusterGroup prj,
IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
@Nullable String clsName,
String clause,
boolean incMeta,
@@ -165,6 +173,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.dedup = dedup;
this.prj = prj;
this.filter = filter;
+ this.part = part;
this.clsName = clsName;
this.clause = clause;
this.incMeta = incMeta;
@@ -334,6 +343,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/**
+ * @return Partition.
+ */
+ @Nullable public Integer partition() {
+ return part;
+ }
+
+ /**
* @throws IgniteCheckedException If query is invalid.
*/
public void validate() throws IgniteCheckedException {
@@ -448,14 +464,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
case REPLICATED:
if (prj != null)
- return nodes(cctx, prj);
+ return nodes(cctx, prj, partition());
return cctx.affinityNode() ?
Collections.singletonList(cctx.localNode()) :
- Collections.singletonList(F.rand(nodes(cctx, null)));
+ Collections.singletonList(F.rand(nodes(cctx, null, partition())));
case PARTITIONED:
- return nodes(cctx, prj);
+ return nodes(cctx, prj, partition());
default:
throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
@@ -467,13 +483,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param prj Projection (optional).
* @return Collection of data nodes in provided projection (if any).
*/
- private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
+ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
+ @Nullable final ClusterGroup prj, @Nullable final Integer part) {
assert cctx != null;
return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
- (prj == null || prj.node(n.id()) != null);
+ (prj == null || prj.node(n.id()) != null) &&
+ (part == null || cctx.affinity().primary(n , part, cctx.affinity().affinityTopologyVersion()));
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 16a8028..fac3d8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -52,6 +52,8 @@ import java.util.concurrent.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
/**
@@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final Object recipient = recipient(nodeId, entry.getKey());
entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
- @Override
- public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
+ @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
throws IgniteCheckedException {
f.get().closeIfNotShared(recipient);
}
@@ -768,98 +769,139 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final boolean backups = qry.includeBackups() || cctx.isReplicated();
- final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- private IgniteBiTuple<K, V> next;
+ final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+ new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+ private IgniteBiTuple<K, V> next;
- private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
+ private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
- private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+ private Iterator<K> iter;
- {
- advance();
- }
+ private GridDhtLocalPartition locPart;
- @Override public boolean onHasNext() {
- return next != null;
- }
+ {
+ Integer part = qry.partition();
- @Override public IgniteBiTuple<K, V> onNext() {
- if (next == null)
- throw new NoSuchElementException();
+ if (part == null || dht == null)
+ iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+ else if (part < 0 || part >= cctx.affinity().partitions())
+ iter = F.emptyIterator();
+ else {
+ AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
- IgniteBiTuple<K, V> next0 = next;
+ locPart = dht.topology().localPartition(part, topVer, false);
- advance();
+ if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) ||
+ !locPart.reserve())
+ throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
- return next0;
- }
- private void advance() {
- IgniteBiTuple<K, V> next0 = null;
+ iter = new Iterator<K>() {
+ private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
- while (iter.hasNext()) {
- next0 = null;
+ @Override public boolean hasNext() {
+ return iter0.hasNext();
+ }
- K key = iter.next();
+ @Override public K next() {
+ KeyCacheObject key = iter0.next();
- V val;
+ return key.value(cctx.cacheObjectContext(), false);
+ }
- try {
- val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+ @Override public void remove() {
+ iter0.remove();
+ }
+ };
}
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
- val = null;
- }
+ advance();
+ }
- if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ @Override public boolean onHasNext() {
+ return next != null;
+ }
- expiryPlc = cctx.cache().expiryPolicy(plc);
- }
+ @Override public IgniteBiTuple<K, V> onNext() {
+ if (next == null)
+ throw new NoSuchElementException();
- if (val != null) {
- next0 = F.t(key, val);
+ IgniteBiTuple<K, V> next0 = next;
- if (checkPredicate(next0))
- break;
- else
- next0 = null;
- }
+ advance();
+
+ return next0;
}
- next = next0 != null ?
- new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
- null;
+ private void advance() {
+ IgniteBiTuple<K, V> next0 = null;
- if (next == null)
- sendTtlUpdate();
- }
+ while (iter.hasNext()) {
+ next0 = null;
- @Override protected void onClose() {
- sendTtlUpdate();
- }
+ K key = iter.next();
+
+ V val;
+
+ try {
+ val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
+
+ val = null;
+ }
+
+ if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+ }
+
+ if (val != null) {
+ next0 = F.t(key, val);
+
+ if (checkPredicate(next0))
+ break;
+ else
+ next0 = null;
+ }
+ }
- private void sendTtlUpdate() {
- if (dht != null && expiryPlc != null) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ next = next0 != null ?
+ new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+ null;
- expiryPlc = null;
+ if (next == null)
+ sendTtlUpdate();
}
- }
- private boolean checkPredicate(Map.Entry<K, V> e) {
- if (keyValFilter != null) {
- Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+ @Override protected void onClose() {
+ sendTtlUpdate();
- return keyValFilter.apply(e0.getKey(), e0.getValue());
+ if (locPart != null)
+ locPart.release();
}
- return true;
- }
- };
+ private void sendTtlUpdate() {
+ if (dht != null && expiryPlc != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+ }
+
+ private boolean checkPredicate(Map.Entry<K, V> e) {
+ if (keyValFilter != null) {
+ Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+
+ return keyValFilter.apply(e0.getKey(), e0.getValue());
+ }
+
+ return true;
+ }
+ };
final GridIterator<IgniteBiTuple<K, V>> it;
@@ -914,7 +956,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throws IgniteCheckedException {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups);
+ Integer part = qry.partition();
+
+ Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
+ cctx.swap().rawSwapIterator(part);
return scanIterator(it, filter, qry.keepPortable());
}
@@ -930,10 +975,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (cctx.offheapTiered() && filter != null) {
OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable());
- return cctx.swap().rawOffHeapIterator(c, true, backups);
+ return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups);
}
else {
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups);
+ Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups);
return scanIterator(it, filter, qry.keepPortable());
}
@@ -1222,7 +1267,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+ IgniteClosure<Map.Entry<K, V>, Object> trans =
+ (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+
IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
injectResources(trans);
@@ -1529,11 +1576,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false,
qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
}
- catch (Error e) {
- fut.onDone(e);
-
- throw e;
- }
catch (Throwable e) {
fut.onDone(e);
@@ -1843,7 +1885,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return new IgniteBiPredicate<K, V>() {
@Override public boolean apply(K k, V v) {
- return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE);
+ return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE);
}
};
}
@@ -2920,6 +2962,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
null,
null,
+ null,
false,
keepPortable);
}
@@ -2928,17 +2971,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* Creates user's predicate based scan query.
*
* @param filter Scan filter.
+ * @param part Partition.
* @param keepPortable Keep portable flag.
* @return Created query.
*/
- @SuppressWarnings("unchecked")
public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
- boolean keepPortable) {
+ @Nullable Integer part, boolean keepPortable) {
+
return new GridCacheQueryAdapter<>(cctx,
SCAN,
null,
null,
(IgniteBiPredicate<Object, Object>)filter,
+ part,
false,
keepPortable);
}
@@ -2962,6 +3007,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
clsName,
search,
null,
+ null,
false,
keepPortable);
}
@@ -2982,6 +3028,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
qry,
null,
+ null,
false,
keepPortable);
}
@@ -3002,6 +3049,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
qry,
null,
+ null,
incMeta,
keepPortable);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 845077f..7577954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -26,6 +26,8 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** */
private int taskHash;
+ /** Partition. */
+ private Integer part;
+
/**
* Required by {@link Externalizable}
*/
@@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
* @param clause Query clause.
* @param clsName Query class name.
* @param keyValFilter Key-value filter.
+ * @param part Partition.
* @param rdc Reducer.
* @param trans Transformer.
* @param pageSize Page size.
@@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
String clause,
String clsName,
IgniteBiPredicate<Object, Object> keyValFilter,
+ @Nullable Integer part,
IgniteReducer<Object, Object> rdc,
IgniteClosure<Object, Object> trans,
int pageSize,
@@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
this.clause = clause;
this.clsName = clsName;
this.keyValFilter = keyValFilter;
+ this.part = part;
this.rdc = rdc;
this.trans = trans;
this.pageSize = pageSize;
@@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
return taskHash;
}
+ /**
+ * @return Partition.
+ */
+ @Nullable public Integer partition() {
+ return part;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -537,6 +552,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
writer.incrementState();
+ case 21:
+ if (!writer.writeInt("part", part != null ? part : -1))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -701,6 +721,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
+ case 21:
+ int part0 = reader.readInt("part");
+
+ part = part0 == -1 ? null : part0;
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return true;
@@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 21;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f516968..c0e763f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
}
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), false, false);
+ new GridSetQueryPredicate<>(id, collocated), null, false, false);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
@@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
private GridCloseableIterator<T> iterator0() {
try {
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), false, false);
+ new GridSetQueryPredicate<>(id, collocated), null, false, false);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index 9d41074..4601586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
for (int j = 0; j < G.allGrids().size(); j++) {
GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
- CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+ CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
@IgniteInstanceResource
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 62bf3f7..cc8217d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
for (int j = 0; j < G.allGrids().size(); j++) {
GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
- CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+ CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
final int i0 = j;
final int j0 = i;
@@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
Object v1 = e.getValue();
Object v2 = ((IgniteKernal)grid).getCache("one").get(key);
- assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 +
- ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
+ assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" +
+ key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
assertEquals(v1, v2);
}
catch (IgniteCheckedException e1) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
index 068a46c..6ccfbc2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
@@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
* @throws Exception If failed.
*/
public void testQuery() throws Exception {
- checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+ checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false);
- checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+ checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false);
+
+ checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true);
+
+ checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true);
}
/**
* @param cache Cache.
+ * @param scanPartitions Whether to run a separate scan query per partition instead of one unrestricted query.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void checkQuery(GridCacheAdapter cache) throws Exception {
+ private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception {
final int ENTRY_CNT = 500;
- for (int i = 0; i < ENTRY_CNT; i++)
- cache.getAndPut(new Key(i), new Person("p-" + i, i));
+ Map<Integer, Map<Key, Person>> entries = new HashMap<>();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Key key = new Key(i);
+ Person val = new Person("p-" + i, i);
+
+ int part = cache.context().affinity().partition(key);
+
+ cache.getAndPut(key, val);
+
+ Map<Key, Person> partEntries = entries.get(part);
+
+ if (partEntries == null)
+ entries.put(part, partEntries = new HashMap<>());
+
+ partEntries.put(key, val);
+ }
try {
- CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
- new IgniteBiPredicate<Key, Person>() {
- @Override public boolean apply(Key key, Person p) {
- assertEquals(key.id, (Integer)p.salary);
+ int partitions = scanPartitions ? cache.context().affinity().partitions() : 1;
- return key.id % 2 == 0;
- }
- }, false);
+ for (int i = 0; i < partitions; i++) {
+ CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
+ new IgniteBiPredicate<Key, Person>() {
+ @Override public boolean apply(Key key, Person p) {
+ assertEquals(key.id, (Integer)p.salary);
- Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+ return key.id % 2 == 0;
+ }
+ }, (scanPartitions ? i : null), false);
- assertEquals(ENTRY_CNT / 2, res.size());
+ Collection<Map.Entry<Key, Person>> res = qry.execute().get();
- for (Map.Entry<Key, Person> e : res) {
- Key k = e.getKey();
- Person p = e.getValue();
+ if (!scanPartitions)
+ assertEquals(ENTRY_CNT / 2, res.size());
- assertEquals(k.id, (Integer)p.salary);
- assertEquals(0, k.id % 2);
- }
+ for (Map.Entry<Key, Person> e : res) {
+ Key k = e.getKey();
+ Person p = e.getValue();
- qry = cache.context().queries().createScanQuery(null, false);
+ assertEquals(k.id, (Integer)p.salary);
+ assertEquals(0, k.id % 2);
- res = qry.execute().get();
+ if (scanPartitions) {
+ Map<Key, Person> partEntries = entries.get(i);
- assertEquals(ENTRY_CNT, res.size());
+ assertEquals(p, partEntries.get(k));
+ }
+ }
+
+ qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false);
+
+ res = qry.execute().get();
+
+ if (!scanPartitions)
+ assertEquals(ENTRY_CNT, res.size());
+ }
testMultithreaded(cache, ENTRY_CNT / 2);
}
@@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return key.id % 2 == 0;
}
- }, false);
+ }, null, false);
for (int i = 0; i < 250; i++) {
Collection<Map.Entry<Key, Person>> res = qry.execute().get();
@@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return val % 2 == 0;
}
- }, false);
+ }, null, false);
Collection<Map.Entry<String, Long>> res = qry.execute().get();
@@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
assertEquals(0, val % 2);
}
- qry = cache.context().queries().createScanQuery(null, false);
+ qry = cache.context().queries().createScanQuery(null, null, false);
res = qry.execute().get();
@@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
return key % 2 == 0;
}
- }, false);
+ }, null, false);
Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get();
@@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
assertEquals(0, key % 2);
}
- qry = cache.context().queries().createScanQuery(null, false);
+ qry = cache.context().queries().createScanQuery(null, null, false);
res = qry.execute().get();
@@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
this.name = name;
this.salary = salary;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Person person = (Person)o;
+
+ if (salary != person.salary)
+ return false;
+
+ return !(name != null ? !name.equals(person.name) : person.name != null);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+
+ return 31 * result + salary;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4cc4b6c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 1a60bbd..eb5027c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -61,6 +61,9 @@ import static org.junit.Assert.*;
* Various tests for cache queries.
*/
public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstractTest {
+ /** Key count. */
+ private static final int KEY_CNT = 5000;
+
/** Cache store. */
private static TestStore store = new TestStore();
@@ -643,6 +646,47 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
}
/**
+ * @throws Exception In case of error.
+ */
+ public void testScanPartitionQuery() throws Exception {
+ IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ GridCacheContext cctx = ((IgniteCacheProxy)cache).context();
+
+ Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+
+ for (int i = 0; i < KEY_CNT; i++) {
+ cache.put(i, i);
+
+ int part = cctx.affinity().partition(i);
+
+ Map<Integer, Integer> partEntries = entries.get(part);
+
+ if (partEntries == null)
+ entries.put(part, partEntries = new HashMap<>());
+
+ partEntries.put(i, i);
+ }
+
+ for (int i = 0; i < cctx.affinity().partitions(); i++) {
+ CacheQuery<Map.Entry<Integer, Integer>> qry =
+ ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false);
+
+ CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+ Map<Integer, Integer> exp = entries.get(i);
+
+ Collection<Map.Entry<Integer, Integer>> actual = fut.get();
+
+ if (exp == null)
+ assertTrue(actual.isEmpty());
+ else
+ for (Map.Entry<Integer, Integer> entry : actual)
+ assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
+ }
+ }
+
+ /**
* JUnit.
*
* @throws Exception In case of error.
@@ -1048,11 +1092,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
for (int i = 0; i < 20; i++)
cache.put(i, i);
- QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(new IgniteBiPredicate<Integer,Integer>() {
+ IgniteBiPredicate<Integer, Integer> filter = new IgniteBiPredicate<Integer, Integer>() {
@Override public boolean apply(Integer k, Integer v) {
return k >= 10;
}
- }));
+ };
+
+ QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(filter));
q.getAll();
@@ -1187,7 +1233,8 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
* @return {@code true} if index has a table for given class.
* @throws IgniteCheckedException If failed.
*/
- private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) throws IgniteCheckedException {
+ private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr)
+ throws IgniteCheckedException {
return qryMgr.size(cls) != -1;
}