You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 12:31:22 UTC
[06/50] incubator-ignite git commit: ignite-389 Change fallback nodes order: local first
ignite-389 Change fallback nodes order: local first
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d151244e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d151244e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d151244e
Branch: refs/heads/ignite-929
Commit: d151244ee95ad2bba986136561e2326a434c3b5b
Parents: 5a7dd02
Author: agura <ag...@gridgain.com>
Authored: Wed May 27 22:16:28 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed May 27 22:16:28 2015 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheQueryAdapter.java | 132 ++++++++++---------
...CacheScanPartitionQueryFallbackSelfTest.java | 2 +-
2 files changed, 72 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d151244e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 1f7b736..6574f0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -41,6 +41,13 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
* Query adapter.
*/
public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
+ /** Is local node predicate. */
+ private static final IgnitePredicate<ClusterNode> IS_LOC_NODE = new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode n) {
+ return n.isLocal();
+ }
+ };
+
/** */
private final GridCacheContext<?, ?> cctx;
@@ -449,24 +456,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
if (type == SQL_FIELDS || type == SPI)
return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
qryMgr.queryFieldsDistributed(bean, nodes));
- else {
- final CacheQueryFuture<R> fut =
- (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
-
- if (type == SCAN && part != null) {
- assert nodes.size() == 1;
-
- final Queue<ClusterNode> backups = new LinkedList<>(
- cctx.affinity().backups(part, cctx.affinity().affinityTopologyVersion()));
-
- if (F.isEmpty(backups))
- return fut;
-
- return new CacheQueryFallbackFuture<>(backups, bean, qryMgr, fut);
- }
-
- return fut;
- }
+ else if (type == SCAN && part != null && nodes.size() > 1)
+ return new CacheQueryFallbackFuture(nodes, bean, qryMgr);
+ else
+ return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
}
/**
@@ -484,7 +477,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return Collections.singletonList(cctx.localNode());
case REPLICATED:
- if (prj != null)
+ if (prj != null || partition() != null)
return nodes(cctx, prj, partition());
return cctx.affinityNode() ?
@@ -508,12 +501,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Nullable final ClusterGroup prj, @Nullable final Integer part) {
assert cctx != null;
- final List<ClusterNode> owners = part == null ? null :
- cctx.topology().owners(part, cctx.affinity().affinityTopologyVersion());
+ final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
+ final Set<ClusterNode> owners =
+ part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer));
return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
- AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
(prj == null || prj.node(n.id()) != null) &&
@@ -532,10 +526,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
*/
private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> {
/** Target. */
- private GridCacheQueryFutureAdapter<?, ?, R> target;
+ private GridCacheQueryFutureAdapter<?, ?, R> fut;
/** Backups. */
- private final Queue<ClusterNode> backups;
+ private final Queue<ClusterNode> nodes;
/** Bean. */
private final GridCacheQueryBean bean;
@@ -544,41 +538,57 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
private final GridCacheQueryManager qryMgr;
/**
- * @param backups Backups.
+ * @param nodes Nodes to query; the local node, if present, is tried first.
* @param bean Bean.
* @param qryMgr Query manager.
- * @param fut Future.
*/
- public CacheQueryFallbackFuture(Queue<ClusterNode> backups, GridCacheQueryBean bean,
- GridCacheQueryManager qryMgr, CacheQueryFuture<R> fut) {
- this.backups = backups;
+ public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, GridCacheQueryBean bean,
+ GridCacheQueryManager qryMgr) {
+ this.nodes = fallbacks(nodes);
this.bean = bean;
this.qryMgr = qryMgr;
- this.target = (GridCacheQueryFutureAdapter<?, ?, R>)fut;
init();
}
/**
+ * @param nodes Nodes.
+ */
+ private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
+ Queue<ClusterNode> fallbacks = new LinkedList<>();
+
+ ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
+
+ if (node != null) {
+ fallbacks.add(node);
+
+ fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE)));
+ }
+ else
+ fallbacks.addAll(nodes);
+
+ return fallbacks;
+ }
+
+ /**
*
*/
private void init() {
- target.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+ ClusterNode node = nodes.poll();
+
+ fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) :
+ qryMgr.queryDistributed(bean, Collections.singleton(node)));
+
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
@Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
try {
onDone(fut.get());
}
catch (IgniteCheckedException e) {
- if (F.isEmpty(backups))
+ if (F.isEmpty(nodes))
onDone(e);
- else {
- Set<ClusterNode> backup = Collections.singleton(backups.poll());
-
- target =
- (GridCacheQueryFutureAdapter<?, ?, R>)qryMgr.queryDistributed(bean, backup);
-
+ else
init();
- }
}
}
});
@@ -586,113 +596,113 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** {@inheritDoc} */
@Override protected boolean onPage(UUID nodeId, boolean last) {
- return target.onPage(nodeId, last);
+ return fut.onPage(nodeId, last);
}
/** {@inheritDoc} */
@Override protected void loadPage() {
- target.loadPage();
+ fut.loadPage();
}
/** {@inheritDoc} */
@Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
- target.loadAllPages();
+ fut.loadAllPages();
}
/** {@inheritDoc} */
@Override protected void cancelQuery() throws IgniteCheckedException {
- target.cancelQuery();
+ fut.cancelQuery();
}
/** {@inheritDoc} */
@Override public int available() {
- return target.available();
+ return fut.available();
}
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
- return target.cancel();
+ return fut.cancel();
}
/** {@inheritDoc} */
@Override void clear() {
- target.clear();
+ fut.clear();
}
/** {@inheritDoc} */
@Override public long endTime() {
- return target.endTime();
+ return fut.endTime();
}
/** {@inheritDoc} */
@Override protected void enqueue(Collection<?> col) {
- target.enqueue(col);
+ fut.enqueue(col);
}
/** {@inheritDoc} */
@Override boolean fields() {
- return target.fields();
+ return fut.fields();
}
/** {@inheritDoc} */
@Override public Collection<R> get() throws IgniteCheckedException {
- return target.get();
+ return fut.get();
}
/** {@inheritDoc} */
@Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
- return target.get(timeout, unit);
+ return fut.get(timeout, unit);
}
/** {@inheritDoc} */
@Override public R next() {
- return target.next();
+ return fut.next();
}
/** {@inheritDoc} */
@Override public Collection<R> nextPage() throws IgniteCheckedException {
- return target.nextPage();
+ return fut.nextPage();
}
/** {@inheritDoc} */
@Override public boolean onDone(Collection<R> res, Throwable err) {
- return target.onDone(res, err);
+ return fut.onDone(res, err);
}
/** {@inheritDoc} */
@Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException {
- return target.nextPage(timeout);
+ return fut.nextPage(timeout);
}
/** {@inheritDoc} */
@Override protected void onNodeLeft(UUID evtNodeId) {
- target.onNodeLeft(evtNodeId);
+ fut.onNodeLeft(evtNodeId);
}
/** {@inheritDoc} */
@Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data,
@Nullable Throwable err, boolean finished) {
- target.onPage(nodeId, data, err, finished);
+ fut.onPage(nodeId, data, err, finished);
}
/** {@inheritDoc} */
@Override public void onTimeout() {
- target.onTimeout();
+ fut.onTimeout();
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
- target.printMemoryStats();
+ fut.printMemoryStats();
}
/** {@inheritDoc} */
@Override public GridCacheQueryBean query() {
- return target.query();
+ return fut.query();
}
/** {@inheritDoc} */
@Override public IgniteUuid timeoutId() {
- return target.timeoutId();
+ return fut.timeoutId();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d151244e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 3b1b842..31336e6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -41,7 +41,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
private static final int GRID_CNT = 5;
/** Kys count. */
- private static final int KEYS_CNT = 1;
+ private static final int KEYS_CNT = 5000;
/** Backups. */
private int backups;