You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/11 09:18:22 UTC
[07/50] incubator-ignite git commit: ignite-389 Avoid backups
filtering in case of partition scan query
ignite-389 Avoid backups filtering in case of partition scan query
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5d6bb532
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5d6bb532
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5d6bb532
Branch: refs/heads/ignite-484-1
Commit: 5d6bb532c7de35cfea7674b5fc1446e72a5fa985
Parents: f00a9e9
Author: agura <ag...@gridgain.com>
Authored: Thu May 28 18:30:08 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Thu May 28 18:30:08 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/query/ScanQuery.java | 12 +-
.../cache/query/GridCacheQueryAdapter.java | 122 +++----------------
.../cache/query/GridCacheQueryManager.java | 9 +-
3 files changed, 28 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
index f56b0c7..e6b69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
@@ -46,6 +46,11 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
this(null, null);
}
+ /**
+ * Creates partition scan query returning all entries for given partition.
+ *
+ * @param part Partition.
+ */
public ScanQuery(int part) {
this(part, null);
}
@@ -62,9 +67,10 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
/**
* Create scan query with filter.
*
+ * @param part Partition.
* @param filter Filter. If {@code null} then all entries will be returned.
*/
- public ScanQuery(Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
+ public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate<K, V> filter) {
setPartition(part);
setFilter(filter);
}
@@ -96,7 +102,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
*
* @return Partition number or {@code null}.
*/
- public Integer getPartition() {
+ @Nullable public Integer getPartition() {
return part;
}
@@ -106,7 +112,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
*
* @param part Partition number over which this query should iterate.
*/
- public void setPartition(Integer part) {
+ public void setPartition(@Nullable Integer part) {
this.part = part;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 6574f0a..2f32faa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -26,14 +26,15 @@ import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.query.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.security.*;
+
import org.jetbrains.annotations.*;
import java.util.*;
-import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
@@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
qryMgr.queryFieldsDistributed(bean, nodes));
else if (type == SCAN && part != null && nodes.size() > 1)
- return new CacheQueryFallbackFuture(nodes, bean, qryMgr);
+ return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr);
else
return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
}
@@ -524,9 +525,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* Wrapper for queries with fallback.
*/
- private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> {
- /** Target. */
- private GridCacheQueryFutureAdapter<?, ?, R> fut;
+ private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>>
+ implements CacheQueryFuture<R> {
+ /** Query future. */
+ private volatile GridCacheQueryFutureAdapter<?, ?, R> fut;
/** Backups. */
private final Queue<ClusterNode> nodes;
@@ -559,13 +561,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
- if (node != null) {
+ if (node != null)
fallbacks.add(node);
- fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE)));
- }
- else
- fallbacks.addAll(nodes);
+ fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes);
return fallbacks;
}
@@ -576,10 +575,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
private void init() {
ClusterNode node = nodes.poll();
- fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
- qryMgr.queryDistributed(bean, Collections.singleton(node)));
+ GridCacheQueryFutureAdapter<?, ?, R> fut0 =
+ (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
+ qryMgr.queryDistributed(bean, Collections.singleton(node)));
- fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+ fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
@Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
try {
onDone(fut.get());
@@ -592,26 +592,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
}
});
- }
-
- /** {@inheritDoc} */
- @Override protected boolean onPage(UUID nodeId, boolean last) {
- return fut.onPage(nodeId, last);
- }
- /** {@inheritDoc} */
- @Override protected void loadPage() {
- fut.loadPage();
- }
-
- /** {@inheritDoc} */
- @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
- fut.loadAllPages();
- }
-
- /** {@inheritDoc} */
- @Override protected void cancelQuery() throws IgniteCheckedException {
- fut.cancelQuery();
+ fut = fut0;
}
/** {@inheritDoc} */
@@ -625,84 +607,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/** {@inheritDoc} */
- @Override void clear() {
- fut.clear();
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- return fut.endTime();
- }
-
- /** {@inheritDoc} */
- @Override protected void enqueue(Collection<?> col) {
- fut.enqueue(col);
- }
-
- /** {@inheritDoc} */
- @Override boolean fields() {
- return fut.fields();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<R> get() throws IgniteCheckedException {
- return fut.get();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
- return fut.get(timeout, unit);
- }
-
- /** {@inheritDoc} */
@Override public R next() {
return fut.next();
}
-
- /** {@inheritDoc} */
- @Override public Collection<R> nextPage() throws IgniteCheckedException {
- return fut.nextPage();
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(Collection<R> res, Throwable err) {
- return fut.onDone(res, err);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException {
- return fut.nextPage(timeout);
- }
-
- /** {@inheritDoc} */
- @Override protected void onNodeLeft(UUID evtNodeId) {
- fut.onNodeLeft(evtNodeId);
- }
-
- /** {@inheritDoc} */
- @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data,
- @Nullable Throwable err, boolean finished) {
- fut.onPage(nodeId, data, err, finished);
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- fut.onTimeout();
- }
-
- /** {@inheritDoc} */
- @Override public void printMemoryStats() {
- fut.printMemoryStats();
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheQueryBean query() {
- return fut.query();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return fut.timeoutId();
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index fac3d8f..652d62e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -795,7 +795,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
!locPart.reserve())
throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
-
iter = new Iterator<K>() {
private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
@@ -1329,9 +1328,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
K key = row.getKey();
- // Filter backups for SCAN queries. Other types are filtered in indexing manager.
- if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN &&
- !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
+ // Filter backups for SCAN queries, if it isn't partition scan.
+ // Other types are filtered in indexing manager.
+ if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null &&
+ cctx.config().getCacheMode() != LOCAL && !incBackups &&
+ !cctx.affinity().primary(cctx.localNode(), key, topVer)) {
if (log.isDebugEnabled())
log.debug("Ignoring backup element [row=" + row +
", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +