You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/12 14:09:12 UTC
[06/22] ignite git commit: ignite-2587 Fixed continuous query
notifications in offheap mode and BinaryObjectOffheapImpl usage.
ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c05fc02
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c05fc02
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c05fc02
Branch: refs/heads/ignite-2407
Commit: 4c05fc0254f446ef040f5d22a066a0d4916a589e
Parents: 0b47d5c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 10 14:07:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 10 14:07:40 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheLazyEntry.java | 3 +
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 118 +++-
.../binary/CacheObjectBinaryProcessorImpl.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 79 ++-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 85 ++-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 38 +-
.../cache/query/GridCacheQueryManager.java | 30 +-
.../continuous/CacheContinuousQueryHandler.java | 3 +-
.../CacheContinuousQueryListener.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 120 +++-
.../continuous/GridContinuousProcessor.java | 16 +-
.../IgniteCacheEntryListenerAbstractTest.java | 454 ++++++++----
...cheEntryListenerAtomicOffheapTieredTest.java | 32 +
...cheEntryListenerAtomicOffheapValuesTest.java | 32 +
...teCacheEntryListenerTxOffheapTieredTest.java | 32 +
...teCacheEntryListenerTxOffheapValuesTest.java | 32 +
.../cache/IgniteCacheEntryListenerTxTest.java | 1 +
...ContinuousQueryFailoverAbstractSelfTest.java | 10 +
...tomicPrimaryWriteOrderOffheapTieredTest.java | 33 +
...tinuousQueryFailoverTxOffheapTieredTest.java | 32 +
...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
...ridCacheContinuousQueryAbstractSelfTest.java | 19 +-
...eContinuousQueryAtomicOffheapTieredTest.java | 32 +
...eContinuousQueryAtomicOffheapValuesTest.java | 32 +
...CacheContinuousQueryTxOffheapTieredTest.java | 32 +
...CacheContinuousQueryTxOffheapValuesTest.java | 32 +
.../junits/common/GridCommonAbstractTest.java | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 8 +
.../IgniteCacheQuerySelfTestSuite.java | 14 +
30 files changed, 1743 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index 05a6fef..30933e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -50,6 +50,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
* @param cctx Cache context.
* @param keyObj Key cache object.
* @param valObj Cache object value.
+ * @param keepBinary Keep binary flag.
*/
public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, CacheObject valObj, boolean keepBinary) {
this.cctx = cctx;
@@ -61,6 +62,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
/**
* @param keyObj Key cache object.
* @param val Value.
+ * @param keepBinary Keep binary flag.
* @param cctx Cache context.
*/
public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, V val, boolean keepBinary) {
@@ -75,6 +77,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
* @param keyObj Key cache object.
* @param key Key value.
* @param valObj Cache object
+ * @param keepBinary Keep binary flag.
* @param val Cache value.
*/
public CacheLazyEntry(GridCacheContext<K, V> ctx,
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index e875df0..5729959 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1729,10 +1729,10 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Heap-based object.
*/
@Nullable public <T> T unwrapTemporary(@Nullable Object obj) {
- if (!offheapTiered())
+ if (!useOffheapEntry())
return (T)obj;
- return (T) cacheObjects().unwrapTemporary(this, obj);
+ return (T)cacheObjects().unwrapTemporary(this, obj);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ae40295..9336e0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -1122,7 +1124,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert newVer != null : "Failed to get write version for tx: " + tx;
- old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val;
+ boolean internal = isInternal() || !context().userCache();
+
+ Map<UUID, CacheContinuousQueryListener> lsnrCol =
+ notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+ old = (retval || intercept || lsnrCol != null) ?
+ rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val;
if (intercept) {
val0 = CU.value(val, cctx, false);
@@ -1206,10 +1214,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
keepBinary);
}
- if (cctx.isLocal() || cctx.isReplicated() ||
- (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
- cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
- partition(), tx.local(), false, updateCntr0, topVer);
+ if (lsnrCol != null) {
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrCol,
+ key,
+ val,
+ old,
+ internal,
+ partition(),
+ tx.local(),
+ false,
+ updateCntr0,
+ topVer);
+ }
cctx.dataStructures().onEntryUpdated(key, false, keepBinary);
}
@@ -1304,7 +1321,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
- old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ boolean internal = isInternal() || !context().userCache();
+
+ Map<UUID, CacheContinuousQueryListener> lsnrCol =
+ notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+ old = (retval || intercept || lsnrCol != null) ?
+ rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
if (intercept) {
entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
@@ -1388,10 +1411,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
keepBinary);
}
- if (cctx.isLocal() || cctx.isReplicated() ||
- (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
- cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal()
- || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer);
+ if (lsnrCol != null) {
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrCol,
+ key,
+ null,
+ old,
+ internal,
+ partition(),
+ tx.local(),
+ false,
+ updateCntr0,
+ topVer);
+ }
cctx.dataStructures().onEntryUpdated(key, true, keepBinary);
@@ -1440,6 +1472,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return new GridCacheUpdateTxResult(false, null);
}
+ /**
+ * @param tx Transaction.
+ * @return {@code True} if should notify continuous query manager.
+ */
+ private boolean notifyContinuousQueries(@Nullable IgniteInternalTx tx) {
+ return cctx.isLocal() ||
+ cctx.isReplicated() ||
+ (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()));
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
@@ -1470,7 +1512,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
EntryProcessorResult<Object> invokeRes = null;
synchronized (this) {
- boolean needVal = retval || intercept || op == GridCacheOperation.TRANSFORM || !F.isEmpty(filter);
+ boolean internal = isInternal() || !context().userCache();
+
+ Map<UUID, CacheContinuousQueryListener> lsnrCol =
+ cctx.continuousQueries().updateListeners(internal, false);
+
+ boolean needVal = retval ||
+ intercept ||
+ op == GridCacheOperation.TRANSFORM ||
+ !F.isEmpty(filter) ||
+ lsnrCol != null;
checkObsolete();
@@ -1479,7 +1530,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
unswap(retval);
// Possibly get old value form store.
- old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ old = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
boolean readFromStore = false;
@@ -1731,11 +1782,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
- if (!isNear()) {
+ if (lsnrCol != null) {
long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
- cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
- partition(), true, false, updateCntr, AffinityTopologyVersion.NONE);
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrCol,
+ key,
+ val,
+ old,
+ internal,
+ partition(),
+ true,
+ false,
+ updateCntr,
+ AffinityTopologyVersion.NONE);
}
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
@@ -1997,8 +2057,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (updateCntr != null)
updateCntr0 = updateCntr;
- cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal()
- || !context().userCache(), partition(), primary, false, updateCntr0, topVer);
+ cctx.continuousQueries().onEntryUpdated(
+ key,
+ evtVal,
+ prevVal,
+ isInternal() || !context().userCache(),
+ partition(),
+ primary,
+ false,
+ updateCntr0,
+ topVer);
}
return new GridCacheUpdateAtomicResult(false,
@@ -2019,7 +2087,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
// Prepare old value and value bytes.
- oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
// Possibly read value from store.
boolean readFromStore = false;
@@ -2937,7 +3005,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @return {@code True} if values should be stored off-heap.
*/
- protected boolean isOffHeapValuesOnly() {
+ protected final boolean isOffHeapValuesOnly() {
return cctx.config().getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES;
}
@@ -3236,8 +3304,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
drReplicate(drType, val, ver);
if (!skipQryNtf) {
- cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal()
- || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer);
+ cctx.continuousQueries().onEntryUpdated(
+ key,
+ val,
+ null,
+ this.isInternal() || !this.context().userCache(),
+ this.partition(),
+ true,
+ preload,
+ updateCntr,
+ topVer);
cctx.dataStructures().onEntryUpdated(key, false, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 0fef6f8..f091fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -822,10 +822,10 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
Object val = unmarshal(valPtr, !tmp);
- if (val instanceof BinaryObjectOffheapImpl)
- return (BinaryObjectOffheapImpl)val;
+ if (val instanceof CacheObject)
+ return (CacheObject)val;
- return new CacheObjectImpl(val, null);
+ return toCacheObject(ctx.cacheObjectContext(), val, false);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6c7bac5..fec61df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -1992,6 +1993,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ boolean initLsnrs = false;
+ Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+ boolean internal = false;
+
// Avoid iterator creation.
for (int i = 0; i < keys.size(); i++) {
KeyCacheObject k = keys.get(i);
@@ -2006,6 +2011,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (entry == null)
continue;
+ if (!initLsnrs) {
+ internal = entry.isInternal() || !context().userCache();
+
+ lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+ initLsnrs = true;
+ }
+
GridCacheVersion newConflictVer = req.conflictVersion(i);
long newConflictTtl = req.conflictTtl(i);
long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2034,7 +2047,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.invokeArguments(),
primary && writeThrough() && !req.skipStore(),
!req.skipStore(),
- sndPrevVal || req.returnValue(),
+ lsnrs != null || sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
true,
@@ -2061,6 +2074,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
+ dhtFut.listeners(lsnrs);
+
if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2097,10 +2112,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
"[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
}
}
- else if (!entry.isNear() && updRes.success()) {
- ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(),
- entry.isInternal() || !context().userCache(), entry.partition(), primary, false,
- updRes.updateCounter(), topVer);
+ else if (lsnrs != null && updRes.success()) {
+ ctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ entry.key(),
+ updRes.newValue(),
+ updRes.oldValue(),
+ internal,
+ entry.partition(),
+ primary,
+ false,
+ updRes.updateCounter(),
+ topVer);
}
if (hasNear) {
@@ -2275,6 +2298,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ boolean initLsnrs = false;
+ Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry entry = entries.get(i);
@@ -2308,6 +2334,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
}
+ if (!initLsnrs) {
+ lsnrs = ctx.continuousQueries().updateListeners(
+ entry.isInternal() || !context().userCache(),
+ false);
+
+ initLsnrs = true;
+ }
+
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
node.id(),
@@ -2317,7 +2351,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/sndPrevVal,
+ /*retval*/sndPrevVal || lsnrs != null,
req.keepBinary(),
expiry,
/*event*/true,
@@ -2366,6 +2400,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
+ dhtFut.listeners(lsnrs);
+
EntryProcessor<Object, Object, Object> entryProcessor =
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
@@ -2763,6 +2799,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ boolean initLsnrs = false;
+ Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+ boolean internal = false;
+
for (int i = 0; i < req.size(); i++) {
KeyCacheObject key = req.key(i);
@@ -2785,6 +2825,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
long ttl = req.ttl(i);
long expireTime = req.conflictExpireTime(i);
+ if (!initLsnrs) {
+ internal = entry.isInternal() || !context().userCache();
+
+ lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+ initLsnrs = true;
+ }
+
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
nodeId,
@@ -2794,7 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/false,
+ /*retval*/lsnrs != null,
req.keepBinary(),
/*expiry policy*/null,
/*event*/true,
@@ -2817,10 +2865,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
- if (updRes.success() && !entry.isNear())
- ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(),
- updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(),
- false, false, updRes.updateCounter(), req.topologyVersion());
+ if (lsnrs != null && updRes.success()) {
+ ctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ entry.key(),
+ updRes.newValue(),
+ updRes.oldValue(),
+ internal,
+ entry.partition(),
+ false,
+ false,
+ updRes.updateCounter(),
+ req.topologyVersion());
+ }
entry.onUnlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 06c8441..58d704d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -102,6 +103,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Response count. */
private volatile int resCnt;
+ /** */
+ private Map<UUID, CacheContinuousQueryListener> lsnrs;
+
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -136,6 +140,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
waitForExchange = !topLocked;
}
+ /**
+ * @param lsnrs Continuous query listeners.
+ */
+ void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
+ this.lsnrs = lsnrs;
+ }
+
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futVer.asGridUuid();
@@ -215,6 +226,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param addPrevVal If {@code true} sends previous value to backups.
+ * @param prevVal Previous value.
* @param updateCntr Partition update counter.
*/
public void addWriteEntry(GridDhtCacheEntry entry,
@@ -270,13 +283,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
addPrevVal,
entry.partition(),
prevVal,
- updateCntr);
+ updateCntr,
+ lsnrs != null);
}
- else if (dhtNodes.size() == 1) {
+ else if (lsnrs != null && dhtNodes.size() == 1) {
try {
- cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal,
- entry.key().internal() || !cctx.userCache(), entry.partition(), true, false,
- updateCntr, updateReq.topologyVersion());
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ entry.key(),
+ val,
+ prevVal,
+ entry.key().internal() || !cctx.userCache(),
+ entry.partition(),
+ true,
+ false,
+ updateCntr,
+ updateReq.topologyVersion());
}
catch (IgniteCheckedException e) {
U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
@@ -352,7 +374,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
cctx.mvcc().removeAtomicFuture(version());
if (err != null) {
- if (!mappings.isEmpty()) {
+ if (!mappings.isEmpty() && lsnrs != null) {
Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
@@ -362,7 +384,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
if (!hndKeys.contains(key)) {
updateRes.addFailedKey(key, err);
- cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
+ cctx.continuousQueries().skipUpdateEvent(
+ lsnrs,
+ key,
+ req.partitionId(i),
+ req.updateCounter(i),
updateReq.topologyVersion());
hndKeys.add(key);
@@ -378,27 +404,38 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
updateRes.addFailedKey(key, err);
}
else {
- Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+ if (lsnrs != null) {
+ Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
- exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject key = req.key(i);
+ exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ for (int i = 0; i < req.size(); i++) {
+ KeyCacheObject key = req.key(i);
- if (!hndKeys.contains(key)) {
- try {
- cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
- key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
- req.updateCounter(i), updateReq.topologyVersion());
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
- + req.value(i) + ", err=" + e + "]");
- }
+ if (!hndKeys.contains(key)) {
+ try {
+ cctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ key,
+ req.value(i),
+ req.localPreviousValue(i),
+ key.internal() || !cctx.userCache(),
+ req.partitionId(i),
+ true,
+ false,
+ req.updateCounter(i),
+ updateReq.topologyVersion());
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to send continuous query message. [key=" + key +
+ ", newVal=" + req.value(i) +
+ ", err=" + e + "]");
+ }
- hndKeys.add(key);
+ hndKeys.add(key);
- if (hndKeys.size() == keys.size())
- break exit;
+ if (hndKeys.size() == keys.size())
+ break exit;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7cc276f..e417cdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -49,6 +50,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Lite dht cache backup update request.
*/
+@IgniteCodeGeneratingFail // Need add 'cleanup' call in 'writeTo' method.
public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
@@ -215,7 +217,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
keys = new ArrayList<>();
partIds = new ArrayList<>();
- locPrevVals = new ArrayList<>();
if (forceTransformBackups) {
entryProcessors = new ArrayList<>();
@@ -240,7 +241,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
* @param addPrevVal If {@code true} adds previous value.
+ * @param partId Partition.
* @param prevVal Previous value.
+ * @param updateCntr Update counter.
+ * @param storeLocPrevVal If {@code true} stores previous value.
*/
public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
@@ -251,12 +255,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
boolean addPrevVal,
int partId,
@Nullable CacheObject prevVal,
- @Nullable Long updateIdx) {
+ @Nullable Long updateCntr,
+ boolean storeLocPrevVal) {
keys.add(key);
partIds.add(partId);
- locPrevVals.add(prevVal);
+ if (storeLocPrevVal) {
+ if (locPrevVals == null)
+ locPrevVals = new ArrayList<>();
+
+ locPrevVals.add(prevVal);
+ }
if (forceTransformBackups) {
assert entryProcessor != null;
@@ -273,11 +283,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
prevVals.add(prevVal);
}
- if (updateIdx != null) {
+ if (updateCntr != null) {
if (updateCntrs == null)
updateCntrs = new GridLongList();
- updateCntrs.add(updateIdx);
+ updateCntrs.add(updateCntr);
}
// In case there is no conflict, do not create the list.
@@ -521,6 +531,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @return Value.
*/
@Nullable public CacheObject localPreviousValue(int idx) {
+ assert locPrevVals != null;
+
return locPrevVals.get(idx);
}
@@ -849,6 +861,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
+ cleanup();
+
return true;
}
@@ -1048,6 +1062,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
}
+ /**
+ * Cleanup values not needed after message was sent.
+ */
+ private void cleanup() {
+ nearVals = null;
+ prevVals = null;
+
+ // Do not keep values if they are not needed for continuous query notification.
+ if (locPrevVals == null) {
+ vals = null;
+ locPrevVals = null;
+ }
+ }
+
/** {@inheritDoc} */
@Override public byte directType() {
return 38;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 8f0cab7..0d8f795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1107,7 +1107,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
next = null;
while (it.hasNext()) {
- final LazySwapEntry e = new LazySwapEntry(it.next(), keepBinary);
+ final LazySwapEntry e = new LazySwapEntry(it.next());
if (filter != null) {
K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary);
@@ -2524,15 +2524,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/** */
private final Map.Entry<byte[], byte[]> e;
- /** */
- private boolean keepBinary;
-
/**
* @param e Entry with
*/
- LazySwapEntry(Map.Entry<byte[], byte[]> e, boolean keepBinary) {
+ LazySwapEntry(Map.Entry<byte[], byte[]> e) {
this.e = e;
- this.keepBinary = keepBinary;
}
/** {@inheritDoc} */
@@ -2545,9 +2541,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
@Override protected V unmarshalValue() throws IgniteCheckedException {
IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
- CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
-
- return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(obj, keepBinary);
+ return (V)cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
}
/** {@inheritDoc} */
@@ -2597,13 +2591,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
@Override protected V unmarshalValue() throws IgniteCheckedException {
long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
- CacheObject obj = cctx.fromOffheap(ptr, false);
-
- V val = CU.value(obj, cctx, false);
-
- assert val != null;
-
- return val;
+ return (V)cctx.fromOffheap(ptr, false);
}
/** {@inheritDoc} */
@@ -2661,7 +2649,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (!filter.apply(key, val))
return null;
- return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
+ if (key instanceof CacheObject)
+ ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
+
+ val = (V)cctx.unwrapTemporary(e.value());
+
+ if (val instanceof CacheObject)
+ ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext());
+
+ return new IgniteBiTuple<>(e.key(), val);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7e66ad3..cf9b439 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -882,8 +882,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @return Continuous query entry.
*/
private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
- if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
-
+ if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
e.markFiltered();
return e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 86abbef..dce04de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
* Continuous query listener.
*/
-interface CacheContinuousQueryListener<K, V> {
+public interface CacheContinuousQueryListener<K, V> {
/**
* Query execution callback.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0e4cb40..cc59989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static javax.cache.event.EventType.CREATED;
@@ -155,37 +156,102 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param lsnrs Listeners to notify.
+ * @param key Entry key.
* @param partId Partition id.
* @param updCntr Updated counter.
* @param topVer Topology version.
*/
- public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
- if (lsnrCnt.get() > 0) {
- for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
- CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
- cctx.cacheId(),
- UPDATED,
- key,
- null,
- null,
- lsnr.keepBinary(),
- partId,
- updCntr,
- topVer);
+ public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+ KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+ assert lsnrs != null;
- CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
- cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+ for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ UPDATED,
+ key,
+ null,
+ null,
+ lsnr.keepBinary(),
+ partId,
+ updCntr,
+ topVer);
- lsnr.skipUpdateEvent(evt, topVer);
- }
+ CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+ cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+ lsnr.skipUpdateEvent(evt, topVer);
}
}
/**
+ * @param internal Internal entry flag (internal key or not user cache).
+ * @param preload Whether update happened during preloading.
+ * @return Registered listeners.
+ */
+ @Nullable public Map<UUID, CacheContinuousQueryListener> updateListeners(
+ boolean internal,
+ boolean preload) {
+ if (preload && !internal)
+ return null;
+
+ ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
+
+ if (internal)
+ lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
+ else
+ lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
+
+ return F.isEmpty(lsnrCol) ? null : lsnrCol;
+ }
+
+ /**
+ * @param key Key.
+ * @param newVal New value.
+ * @param oldVal Old value.
+ * @param internal Internal entry (internal key or not user cache).
+ * @param partId Partition.
+ * @param primary {@code True} if called on primary node.
+ * @param preload Whether update happened during preloading.
+ * @param updateCntr Update counter.
+ * @param topVer Topology version.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void onEntryUpdated(
+ KeyCacheObject key,
+ CacheObject newVal,
+ CacheObject oldVal,
+ boolean internal,
+ int partId,
+ boolean primary,
+ boolean preload,
+ long updateCntr,
+ AffinityTopologyVersion topVer) throws IgniteCheckedException {
+ Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
+
+ if (lsnrCol != null) {
+ onEntryUpdated(
+ lsnrCol,
+ key,
+ newVal,
+ oldVal,
+ internal,
+ partId,
+ primary,
+ preload,
+ updateCntr,
+ topVer);
+ }
+ }
+
+ /**
+ * @param lsnrCol Listeners to notify.
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
* @param internal Internal entry (internal key or not user cache),
+ * @param partId Partition.
* @param primary {@code True} if called on primary node.
* @param preload Whether update happened during preloading.
* @param updateCntr Update counter.
@@ -193,6 +259,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @throws IgniteCheckedException In case of error.
*/
public void onEntryUpdated(
+ Map<UUID, CacheContinuousQueryListener> lsnrCol,
KeyCacheObject key,
CacheObject newVal,
CacheObject oldVal,
@@ -205,25 +272,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
throws IgniteCheckedException
{
assert key != null;
-
- if (preload && !internal)
- return;
-
- ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
-
- if (internal)
- lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
- else
- lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
-
- if (F.isEmpty(lsnrCol))
- return;
+ assert lsnrCol != null;
boolean hasNewVal = newVal != null;
boolean hasOldVal = oldVal != null;
- if (!hasNewVal && !hasOldVal)
+ if (!hasNewVal && !hasOldVal) {
+ skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer);
+
return;
+ }
EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7c7e3e3..0218897 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -893,11 +894,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Load partition counters.
- if (hnd.isQuery() && ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
- Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName())
- .context().topology().updateCounters();
+ if (hnd.isQuery()) {
+ GridCacheProcessor proc = ctx.cache();
- req.addUpdateCounters(cntrs);
+ if (proc != null) {
+ GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+
+ if (cache != null && !cache.isLocal()) {
+ Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
+
+ req.addUpdateCounters(cntrs);
+ }
+ }
}
if (err != null)