You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/05 01:45:29 UTC
[05/30] ignite git commit: ignite-1366 Start cache processor before
query processor. Initialize topology version for GridCacheQueryRequest so that
messages are not missed before the message handler is registered.
ignite-1366 Start cache processor before query processor. Initialize topology version for GridCacheQueryRequest so that messages are not missed before the message handler is registered.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15f3edb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15f3edb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15f3edb5
Branch: refs/heads/ignite-264
Commit: 15f3edb546c9f08ed46e3baa51f41250d57b1d98
Parents: f1f6be8
Author: sboikov <sb...@gridgain.com>
Authored: Fri Sep 4 10:30:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Sep 4 10:30:18 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../dht/GridPartitionedGetFuture.java | 14 +-
.../distributed/near/GridNearGetFuture.java | 13 ++
.../query/GridCacheDistributedQueryFuture.java | 5 +-
.../query/GridCacheDistributedQueryManager.java | 9 +-
.../cache/query/GridCacheQueryManager.java | 177 ++++++-------------
.../cache/query/GridCacheQueryRequest.java | 59 ++++++-
.../IgniteCacheNodeJoinAbstractTest.java | 42 +++++
8 files changed, 185 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index ad4940a..7deede7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -876,8 +876,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(new GridAffinityProcessor(ctx));
startProcessor(createComponent(GridSegmentationProcessor.class, ctx));
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
- startProcessor(new GridQueryProcessor(ctx));
startProcessor(new GridCacheProcessor(ctx));
+ startProcessor(new GridQueryProcessor(ctx));
startProcessor(new GridTaskSessionProcessor(ctx));
startProcessor(new GridJobProcessor(ctx));
startProcessor(new GridTaskProcessor(ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 2f0de86..3ddf6d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -770,6 +770,18 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
if (log.isDebugEnabled())
log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
+ if (!canRemap) {
+ map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+ @Override public boolean apply(KeyCacheObject key) {
+ return invalidParts.contains(cctx.affinity().partition(key));
+ }
+ }), F.t(node, keys), topVer);
+
+ onDone(createResultMap(res.entries()));
+
+ return;
+ }
+
// Need to wait for next topology version to remap.
IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
@@ -779,7 +791,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
// This will append new futures to compound list.
- map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+ map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
@Override public boolean apply(KeyCacheObject key) {
return invalidParts.contains(cctx.affinity().partition(key));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9d2113e..a7875f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -904,6 +904,19 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (log.isDebugEnabled())
log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
+ if (!canRemap) {
+ map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+ @Override public boolean apply(KeyCacheObject key) {
+ return invalidParts.contains(cctx.affinity().partition(key));
+ }
+ }), F.t(node, keys), topVer);
+
+ // It is critical to call onDone after adding futures to compound list.
+ onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+
+ return;
+ }
+
// Need to wait for next topology version to remap.
IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 32a4599..1d547c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -98,7 +98,10 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
subgrid.clear();
}
- final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(), reqId, fields());
+ final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(),
+ reqId,
+ fields(),
+ qryMgr.queryTopologyVersion());
// Process cancel query directly (without sending) for local node,
cctx.closures().callLocalSafe(new Callable<Object>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index d1fdfcf..4422952 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -566,7 +566,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
false,
qry.query().keepPortable(),
qry.query().subjectId(),
- qry.query().taskHash());
+ qry.query().taskHash(),
+ queryTopologyVersion());
addQueryFuture(req.id(), fut);
@@ -610,7 +611,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
all,
qry.keepPortable(),
qry.subjectId(),
- qry.taskHash());
+ qry.taskHash(),
+ queryTopologyVersion());
sendRequest(fut, req, nodes);
}
@@ -675,7 +677,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().includeMetadata(),
qry.query().keepPortable(),
qry.query().subjectId(),
- qry.query().taskHash());
+ qry.query().taskHash(),
+ queryTopologyVersion());
addQueryFuture(req.id(), fut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index b3f8720..2041464 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -168,6 +168,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/** */
private boolean enabled;
+ /** */
+ private AffinityTopologyVersion qryTopVer;
+
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
qryProc = cctx.kernalContext().query();
@@ -182,12 +185,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (futs != null) {
for (Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> entry : futs.entrySet()) {
- final Object recipient = recipient(nodeId, entry.getKey());
+ final Object rcpt = recipient(nodeId, entry.getKey());
entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
@Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
throws IgniteCheckedException {
- f.get().closeIfNotShared(recipient);
+ f.get().closeIfNotShared(rcpt);
}
});
}
@@ -197,12 +200,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (fieldsFuts != null) {
for (Map.Entry<Long, GridFutureAdapter<FieldsResult>> entry : fieldsFuts.entrySet()) {
- final Object recipient = recipient(nodeId, entry.getKey());
+ final Object rcpt = recipient(nodeId, entry.getKey());
entry.getValue().listen(new CIX1<IgniteInternalFuture<FieldsResult>>() {
@Override public void applyx(IgniteInternalFuture<FieldsResult> f)
throws IgniteCheckedException {
- f.get().closeIfNotShared(recipient);
+ f.get().closeIfNotShared(rcpt);
}
});
}
@@ -213,6 +216,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
enabled = GridQueryProcessor.isEnabled(cctx.config());
+
+ qryTopVer = cctx.startTopologyVersion();
+
+ if (qryTopVer == null)
+ qryTopVer = new AffinityTopologyVersion(cctx.localNode().order(), 0);
}
/**
@@ -281,16 +289,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/**
* Rebuilds all search indexes of given value type.
*
- * @param valType Value type.
- * @return Future that will be completed when rebuilding of all indexes is finished.
- */
- public IgniteInternalFuture<?> rebuildIndexes(Class<?> valType) {
- return rebuildIndexes(valType.getName());
- }
-
- /**
- * Rebuilds all search indexes of given value type.
- *
* @param typeName Value type name.
* @return Future that will be completed when rebuilding of all indexes is finished.
*/
@@ -307,23 +305,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * Rebuilds all search indexes of all types.
- *
- * @return Future that will be completed when rebuilding of all indexes is finished.
- */
- public IgniteInternalFuture<?> rebuildAllIndexes() {
- if (!enterBusy())
- throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
-
- try {
- return qryProc.rebuildAllIndexes();
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
* Marks this request as canceled.
*
* @param reqId Request id.
@@ -531,12 +512,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param loc Local query or not.
* @param subjId Security subject ID.
* @param taskName Task name.
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
* @return Collection of found keys.
* @throws IgniteCheckedException In case of error.
*/
+ @SuppressWarnings("unchecked")
private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry,
- @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object recipient)
+ @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object rcpt)
throws IgniteCheckedException {
if (qry.type() == null) {
assert !loc;
@@ -555,16 +537,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
res = (QueryResult<K, V>)qryResCache.get(resKey);
- if (res != null && res.addRecipient(recipient))
+ if (res != null && res.addRecipient(rcpt))
return res;
- res = new QueryResult<>(qry.type(), recipient);
+ res = new QueryResult<>(qry.type(), rcpt);
if (qryResCache.putIfAbsent(resKey, res) != null)
resKey = null;
}
else
- res = new QueryResult<>(qry.type(), recipient);
+ res = new QueryResult<>(qry.type(), rcpt);
GridCloseableIterator<IgniteBiTuple<K, V>> iter;
@@ -667,12 +649,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param loc Local query or not.
* @param subjId Security subject ID.
* @param taskName Task name.
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
* @return Collection of found keys.
* @throws IgniteCheckedException In case of error.
*/
private FieldsResult executeFieldsQuery(GridCacheQueryAdapter<?> qry, @Nullable Object[] args,
- boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object recipient) throws IgniteCheckedException {
+ boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object rcpt) throws IgniteCheckedException {
assert qry != null;
FieldsResult res;
@@ -709,10 +691,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
res = (FieldsResult)qryResCache.get(resKey);
- if (res != null && res.addRecipient(recipient))
+ if (res != null && res.addRecipient(rcpt))
return res; // Cached result found.
- res = new FieldsResult(recipient);
+ res = new FieldsResult(rcpt);
if (qryResCache.putIfAbsent(resKey, res) != null)
resKey = null; // Failed to cache result.
@@ -736,7 +718,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
taskName));
}
- res = new FieldsResult(recipient);
+ res = new FieldsResult(rcpt);
}
try {
@@ -1191,7 +1173,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
// If metadata needs to be returned to user and cleaned from internal fields - copy it.
List<GridQueryFieldMetadata> meta = qryInfo.includeMetaData() ?
- (res.metaData() != null ? new ArrayList<GridQueryFieldMetadata>(res.metaData()) : null) :
+ (res.metaData() != null ? new ArrayList<>(res.metaData()) : null) :
res.metaData();
if (!qryInfo.includeMetaData())
@@ -1996,6 +1978,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
+ * @return Topology version for query requests.
+ */
+ public AffinityTopologyVersion queryTopologyVersion() {
+ return qryTopVer;
+ }
+
+ /**
* @param qry Query.
* @return Filter.
*/
@@ -2347,10 +2336,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/**
* @param type Query type.
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
*/
- private QueryResult(GridCacheQueryType type, Object recipient) {
- super(recipient);
+ private QueryResult(GridCacheQueryType type, Object rcpt) {
+ super(rcpt);
this.type = type;
}
@@ -2374,10 +2363,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private List<GridQueryFieldMetadata> meta;
/**
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
*/
- FieldsResult(Object recipient) {
- super(recipient);
+ FieldsResult(Object rcpt) {
+ super(rcpt);
}
/**
@@ -2674,39 +2663,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- *
- */
- private class GridCacheScanSwapEntry implements Cache.Entry<K, V> {
- /** */
- private final AbstractLazySwapEntry e;
-
- /**
- * @param e Entry.
- */
- private GridCacheScanSwapEntry(AbstractLazySwapEntry e) {
- this.e = e;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public V getValue() {
- return e.value();
- }
-
- /** {@inheritDoc} */
- @Override public K getKey() {
- return e.key();
- }
-
- /** {@inheritDoc} */
- @Override public <T> T unwrap(Class<T> clazz) {
- if (clazz.isAssignableFrom(getClass()))
- return clazz.cast(this);
-
- throw new IllegalArgumentException();
- }
- }
-
- /**
* Cached result.
*/
private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
@@ -2720,10 +2676,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
/**
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
*/
- protected CachedResult(Object recipient) {
- boolean res = addRecipient(recipient);
+ protected CachedResult(Object rcpt) {
+ boolean res = addRecipient(rcpt);
assert res;
}
@@ -2731,17 +2687,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/**
* Close if this result does not have any other recipients.
*
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
* @throws IgniteCheckedException If failed.
*/
- public void closeIfNotShared(Object recipient) throws IgniteCheckedException {
+ public void closeIfNotShared(Object rcpt) throws IgniteCheckedException {
assert isDone();
synchronized (recipients) {
if (recipients.isEmpty())
return;
- recipients.remove(recipient);
+ recipients.remove(rcpt);
if (recipients.isEmpty())
get().close();
@@ -2749,17 +2705,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
* @return {@code true} If the recipient successfully added.
*/
- public boolean addRecipient(Object recipient) {
+ public boolean addRecipient(Object rcpt) {
synchronized (recipients) {
if (isDone())
return false;
- assert !recipients.containsKey(recipient) : recipient + " -> " + recipients;
+ assert !recipients.containsKey(rcpt) : rcpt + " -> " + recipients;
- recipients.put(recipient, new QueueIterator(recipient));
+ recipients.put(rcpt, new QueueIterator(rcpt));
}
return true;
@@ -2798,18 +2754,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
* @throws IgniteCheckedException If failed.
*/
- public IgniteSpiCloseableIterator<R> iterator(Object recipient) throws IgniteCheckedException {
- assert recipient != null;
+ public IgniteSpiCloseableIterator<R> iterator(Object rcpt) throws IgniteCheckedException {
+ assert rcpt != null;
IgniteSpiCloseableIterator<R> it = get();
assert it != null;
synchronized (recipients) {
- return queue == null ? it : recipients.get(recipient);
+ return queue == null ? it : recipients.get(rcpt);
}
}
@@ -2825,7 +2781,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private static final int NEXT_SIZE = 64;
/** */
- private final Object recipient;
+ private final Object rcpt;
/** */
private int pos;
@@ -2834,10 +2790,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private Queue<R> next;
/**
- * @param recipient ID of the recipient.
+ * @param rcpt ID of the recipient.
*/
- private QueueIterator(Object recipient) {
- this.recipient = recipient;
+ private QueueIterator(Object rcpt) {
+ this.rcpt = rcpt;
}
/**
@@ -2850,7 +2806,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
- closeIfNotShared(recipient);
+ closeIfNotShared(rcpt);
}
/** {@inheritDoc} */
@@ -3101,25 +3057,4 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
false,
keepPortable);
}
-
- /**
- * Creates SQL fields query which will include results metadata if needed.
- *
- * @param qry SQL query.
- * @param incMeta Whether to include results metadata.
- * @param keepPortable Keep portable flag.
- * @return Created query.
- */
- public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean incMeta, boolean keepPortable) {
- assert qry != null;
-
- return new GridCacheQueryAdapter<>(cctx,
- SQL_FIELDS,
- null,
- qry,
- null,
- null,
- incMeta,
- keepPortable);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index c21ac66..f7ef76f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
@@ -121,6 +122,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** Partition. */
private int part;
+ /** */
+ private AffinityTopologyVersion topVer;
+
/**
* Required by {@link Externalizable}
*/
@@ -129,13 +133,21 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
}
/**
+ * Creates cancel query request.
+ *
+ * @param cacheId Cache ID.
* @param id Request to cancel.
* @param fields Fields query flag.
+ * @param topVer Topology version.
*/
- public GridCacheQueryRequest(int cacheId, long id, boolean fields) {
+ public GridCacheQueryRequest(int cacheId,
+ long id,
+ boolean fields,
+ AffinityTopologyVersion topVer) {
this.cacheId = cacheId;
this.id = id;
this.fields = fields;
+ this.topVer = topVer;
cancel = true;
}
@@ -151,6 +163,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
* @param fields Fields query flag.
* @param all Whether to load all pages.
* @param keepPortable Whether to keep portables.
+ * @param subjId Subject ID.
+ * @param taskHash Task name hash code.
+ * @param topVer Topology version.
*/
public GridCacheQueryRequest(
int cacheId,
@@ -162,7 +177,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
boolean all,
boolean keepPortable,
UUID subjId,
- int taskHash
+ int taskHash,
+ AffinityTopologyVersion topVer
) {
this.cacheId = cacheId;
this.id = id;
@@ -174,6 +190,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
this.keepPortable = keepPortable;
this.subjId = subjId;
this.taskHash = taskHash;
+ this.topVer = topVer;
}
/**
@@ -192,6 +209,10 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
* @param incBackups {@code true} if need to include backups.
* @param args Query arguments.
* @param incMeta Include meta data or not.
+ * @param keepPortable Keep portable flag.
+ * @param subjId Subject ID.
+ * @param taskHash Task name hash code.
+ * @param topVer Topology version.
*/
public GridCacheQueryRequest(
int cacheId,
@@ -211,7 +232,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
boolean incMeta,
boolean keepPortable,
UUID subjId,
- int taskHash
+ int taskHash,
+ AffinityTopologyVersion topVer
) {
assert type != null || fields;
assert clause != null || (type == SCAN || type == SET || type == SPI);
@@ -235,10 +257,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
this.keepPortable = keepPortable;
this.subjId = subjId;
this.taskHash = taskHash;
+ this.topVer = topVer;
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -554,12 +581,18 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
writer.incrementState();
case 20:
- if (!writer.writeByteArray("transBytes", transBytes))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 21:
+ if (!writer.writeByteArray("transBytes", transBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 22:
if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
return false;
@@ -718,7 +751,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
case 20:
- transBytes = reader.readByteArray("transBytes");
+ topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
return false;
@@ -726,6 +759,14 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
reader.incrementState();
case 21:
+ transBytes = reader.readByteArray("transBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 22:
byte typeOrd;
typeOrd = reader.readByte("type");
@@ -749,11 +790,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 22;
+ return 23;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheQueryRequest.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
index 58aa571..2e7f2ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
@@ -106,4 +107,45 @@ public abstract class IgniteCacheNodeJoinAbstractTest extends IgniteCacheAbstrac
stopGrid(1);
}
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testScanQuery() throws Exception {
+ final IgniteCache<Integer, Integer> cache = jcache(0);
+
+ for (int i = 0; i < 5; i++) {
+ log.info("Iteration: " + i);
+
+ final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(1);
+
+ return null;
+ }
+ });
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ScanQuery qry = new ScanQuery();
+
+ while (!stop.get() && !fut.isDone())
+ cache.query(qry).getAll();
+
+ return null;
+ }
+ }, 10, "test-qry");
+
+ try {
+ fut.get(60_000);
+ }
+ finally {
+ stop.set(true);
+ }
+
+ stopGrid(1);
+ }
+ }
}
\ No newline at end of file