You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/04 15:10:47 UTC
[04/36] ignite git commit: IGNITE-426 WIP
IGNITE-426 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/76b9ed66
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/76b9ed66
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/76b9ed66
Branch: refs/heads/ignite-462-2
Commit: 76b9ed666a0f3c5a23dd8da2b69c14ef33943038
Parents: 3a57d71
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Sep 11 18:57:59 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Nov 4 17:02:31 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 21 +-
.../internal/GridMessageListenHandler.java | 17 +
.../communication/GridIoMessageFactory.java | 18 +
.../processors/cache/GridCacheEntryEx.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 142 ++++++--
.../GridCachePartitionExchangeManager.java | 4 +-
.../cache/GridCacheUpdateAtomicResult.java | 15 +-
.../dht/atomic/GridDhtAtomicCache.java | 83 +++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 22 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 124 +++++--
.../GridDhtPartitionsExchangeFuture.java | 35 +-
.../preloader/GridDhtPartitionsFullMessage.java | 57 ++-
.../GridDhtPartitionsSingleMessage.java | 49 ++-
.../distributed/near/GridNearAtomicCache.java | 14 +-
.../continuous/CacheContinuousQueryEntry.java | 7 +
.../CacheContinuousQueryFilteredEntry.java | 228 ++++++++++++
.../continuous/CacheContinuousQueryHandler.java | 294 ++++++++++++---
.../CacheContinuousQueryListener.java | 8 +
.../CacheContinuousQueryLostPartition.java | 156 ++++++++
.../continuous/CacheContinuousQueryManager.java | 22 +-
.../continuous/GridContinuousBatch.java | 39 +-
.../continuous/GridContinuousBatchAdapter.java | 43 ++-
.../continuous/GridContinuousHandler.java | 22 ++
.../continuous/GridContinuousProcessor.java | 5 +
.../processors/cache/GridCacheTestEntryEx.java | 4 +-
...acheContinuousQueryFailoverAbstractTest.java | 357 ++++++++++++++++++-
.../testframework/junits/GridAbstractTest.java | 2 +-
27 files changed, 1612 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index b4ce4ab..ade7597 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -38,6 +38,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.typedef.F;
@@ -213,8 +215,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
}
- ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
- false);
+ ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+ false, false);
}
catch (ClusterTopologyCheckedException ignored) {
// No-op.
@@ -377,6 +379,21 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public GridContinuousBatch createBatch() {
+ return new GridContinuousBatchAdapter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void partitionLost(String cacheName, int partId) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index ff38949..e038794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -26,6 +26,8 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -167,6 +169,21 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public GridContinuousBatch createBatch() {
+ return new GridContinuousBatchAdapter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void partitionLost(String cacheName, int partId) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 079015c..6eb9e17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -90,7 +90,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilteredEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxEntryValueHolder;
@@ -684,6 +687,21 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 114:
+ msg = new CacheContinuousQueryBatchAck();
+
+ break;
+
+ case 115:
+ msg = new CacheContinuousQueryFilteredEntry();
+
+ break;
+
+ case 116:
+ msg = new CacheContinuousQueryLostPartition();
+
+ break;
+
// [-3..112] - this
// [120..123] - DR
// [-4..-22] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 50b01c8..eb40d20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -480,7 +480,9 @@ public interface GridCacheEntryEx {
boolean conflictResolve,
boolean intercept,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateIdx
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2111594..d8fa93c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1076,6 +1077,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object key0 = null;
Object val0 = null;
+ long updateIdx0;
+
synchronized (this) {
checkObsolete();
@@ -1153,6 +1156,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
deletedUnlocked(false);
}
+ updateIdx0 = nextPartIndex(topVer);
+
update(val, expireTime, ttl, newVer);
drReplicate(drType, val, newVer);
@@ -1178,8 +1183,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
subjId, null, taskName);
}
- if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+ if (!isNear())
+ cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer);
cctx.dataStructures().onEntryUpdated(key, false);
}
@@ -1244,6 +1249,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Cache.Entry entry0 = null;
+ Long updateIdx0;
+
synchronized (this) {
checkObsolete();
@@ -1256,7 +1263,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
- "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
+ "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
boolean startVer = isStartVersion();
@@ -1313,6 +1320,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
+ updateIdx0 = nextPartIndex(topVer);
+
+// if (updateIdx != null)
+// updateIdx0 = updateIdx;
+
drReplicate(drType, null, newVer);
if (metrics && cctx.cache().configuration().isStatisticsEnabled())
@@ -1345,8 +1357,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
taskName);
}
- if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
+ if (!isNear())
+ cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer);
cctx.dataStructures().onEntryUpdated(key, true);
}
@@ -1688,7 +1700,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
- cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+ if (!isNear()) {
+ long updateIdx = nextPartIndex(AffinityTopologyVersion.NONE);
+
+ cctx.continuousQueries().onEntryUpdated(this, key, val, old, true, false, updateIdx,
+ AffinityTopologyVersion.NONE);
+ }
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
@@ -1731,7 +1748,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean conflictResolve,
boolean intercept,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateIdx
) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
assert cctx.atomic();
@@ -1740,7 +1759,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject oldVal;
CacheObject updated;
- GridCacheVersion enqueueVer = null;
+ GridCacheVersion rmvVer = null;
GridCacheVersionConflictContext<?, ?> conflictCtx = null;
@@ -1757,6 +1776,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object key0 = null;
Object updated0 = null;
+ Long updateIdx0 = null;
+
synchronized (this) {
boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -1864,7 +1885,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateIdx0);
}
// Will update something.
else {
@@ -1913,6 +1935,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
"[entry=" + this + ", newVer=" + newVer + ']');
}
+ if (!cctx.isNear()) {
+ CacheObject evtVal;
+
+ if (op == GridCacheOperation.TRANSFORM) {
+ EntryProcessor<Object, Object, ?> entryProcessor =
+ (EntryProcessor<Object, Object, ?>)writeObj;
+
+ CacheInvokeEntry<Object, Object> entry =
+ new CacheInvokeEntry<>(cctx, key, prevVal, version());
+
+ try {
+ entryProcessor.process(entry, invokeArgs);
+
+ evtVal = entry.modified() ?
+ cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+ }
+ catch (Exception e) {
+ evtVal = prevVal;
+ }
+ }
+ else
+ evtVal = (CacheObject)writeObj;
+
+ updateIdx0 = nextPartIndex(topVer);
+
+ if (updateIdx != null)
+ updateIdx0 = updateIdx;
+
+ cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false,
+ updateIdx0, topVer);
+ }
+
return new GridCacheUpdateAtomicResult(false,
retval ? rawGetOrUnmarshalUnlocked(false) : null,
null,
@@ -1921,7 +1975,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateIdx0);
}
}
else
@@ -1997,7 +2052,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateIdx0);
}
}
@@ -2044,7 +2100,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateIdx0);
}
}
else
@@ -2144,7 +2201,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateIdx0);
else if (interceptorVal != updated0) {
updated0 = cctx.unwrapTemporary(interceptorVal);
@@ -2181,6 +2239,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(updated, newExpireTime, newTtl, newVer);
+ updateIdx0 = nextPartIndex(topVer);
+
+ if (updateIdx != null)
+ updateIdx0 = updateIdx;
+
drReplicate(drType, updated, newVer);
recordNodeId(affNodeId, topVer);
@@ -2220,7 +2283,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateIdx0);
}
if (writeThrough)
@@ -2252,7 +2316,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
- enqueueVer = newVer;
+ rmvVer = newVer;
boolean hasValPtr = hasOffHeapPointer();
@@ -2272,6 +2336,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
recordNodeId(affNodeId, topVer);
+ updateIdx0 = nextPartIndex(topVer);
+
+ if (updateIdx != null)
+ updateIdx0 = updateIdx;
+
drReplicate(drType, null, newVer);
if (evt) {
@@ -2301,8 +2370,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
- if (cctx.isReplicated() || primary)
- cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false);
+ if (!isNear())
+ cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, updateIdx0, topVer);
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
@@ -2326,9 +2395,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
invokeRes,
newSysTtl,
newSysExpireTime,
- enqueueVer,
+ rmvVer,
conflictCtx,
- true);
+ true,
+ updateIdx0);
}
/**
@@ -3149,9 +3219,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
drReplicate(drType, val, ver);
+ long updateIdx = -1;
+
+ if (!preload)
+ updateIdx = nextPartIndex(topVer);
+
if (!skipQryNtf) {
- if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
- cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload);
+ cctx.continuousQueries().onEntryUpdated(this, key, val, null, true, preload, updateIdx, topVer);
cctx.dataStructures().onEntryUpdated(key, false);
}
@@ -3168,6 +3242,28 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
+ /**
+ * @param topVer Topology version.
+ * @return Update index.
+ */
+ private long nextPartIndex(AffinityTopologyVersion topVer) {
+ long updateIdx;
+
+ //U.dumpStack();
+
+ if (!cctx.isLocal() && !isNear()) {
+ GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false);
+
+ assert locPart != null;
+
+ updateIdx = locPart.nextContinuousQueryUpdateIndex();
+ }
+ else
+ updateIdx = 0;
+
+ return updateIdx;
+ }
+
/** {@inheritDoc} */
@Override public synchronized boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped) throws
IgniteCheckedException,
@@ -4053,7 +4149,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
*/
protected void deletedUnlocked(boolean deleted) {
assert Thread.holdsLock(this);
- assert cctx.deferredDelete();
+
+ if (!cctx.deferredDelete())
+ return;
if (deleted) {
assert !deletedUnlocked() : this;
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index adc2174..0065403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -892,7 +892,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue()) != null;
+ updated |= top.update(null, entry.getValue(), null) != null;
}
if (!cctx.kernalContext().clientNode() && updated)
@@ -935,7 +935,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue()) != null;
+ updated |= top.update(null, entry.getValue(), null) != null;
}
if (updated)
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 3674284..092d990 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult {
/** Whether update should be propagated to DHT node. */
private final boolean sndToDht;
+ /** */
+ private final Long updateIdx;
+
/** Value computed by entry processor. */
private IgniteBiTuple<Object, Exception> res;
@@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult {
* @param rmvVer Version for deferred delete.
* @param conflictRes DR resolution result.
* @param sndToDht Whether update should be propagated to DHT node.
+ * @param updateIdx Partition update counter.
*/
public GridCacheUpdateAtomicResult(boolean success,
@Nullable CacheObject oldVal,
@@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult {
long conflictExpireTime,
@Nullable GridCacheVersion rmvVer,
@Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
- boolean sndToDht) {
+ boolean sndToDht,
+ long updateIdx) {
this.success = success;
this.oldVal = oldVal;
this.newVal = newVal;
@@ -91,6 +96,7 @@ public class GridCacheUpdateAtomicResult {
this.rmvVer = rmvVer;
this.conflictRes = conflictRes;
this.sndToDht = sndToDht;
+ this.updateIdx = updateIdx;
}
/**
@@ -129,6 +135,13 @@ public class GridCacheUpdateAtomicResult {
}
/**
+ * @return Partition update index.
+ */
+ public Long updateIdx() {
+ return updateIdx;
+ }
+
+ /**
* @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time
* value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4cd9e84..8eabae1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
@@ -1103,7 +1104,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
try {
- topology().readLock();
+ GridDhtPartitionTopology top = topology();
+
+ top.readLock();
try {
if (topology().stopping()) {
@@ -1120,7 +1123,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
- !needRemap(req.topologyVersion(), topology().topologyVersion())) {
+ !needRemap(req.topologyVersion(), top.topologyVersion())) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
@@ -1135,7 +1138,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (ver == null) {
// Assign next version for update inside entries lock.
- ver = ctx.versions().next(topology().topologyVersion());
+ ver = ctx.versions().next(top.topologyVersion());
if (hasNear)
res.nearVersion(ver);
@@ -1147,6 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
log.debug("Using cache version for update request on primary node [ver=" + ver +
", req=" + req + ']');
+ boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
dhtFut = createDhtFuture(ver, req, res, completionCb, false);
expiry = expiryPolicy(req.expiry());
@@ -1169,7 +1174,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb,
ctx.isDrEnabled(),
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
@@ -1188,7 +1194,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb,
ctx.isDrEnabled(),
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
retVal = updRes.returnValue();
deleted = updRes.deleted();
@@ -1208,7 +1215,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
remap = true;
}
finally {
- topology().readUnlock();
+ top.readUnlock();
}
}
catch (GridCacheEntryRemovedException e) {
@@ -1283,6 +1290,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param replicate Whether replication is enabled.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Deleted entries.
* @throws GridCacheEntryRemovedException Should not be thrown.
*/
@@ -1298,7 +1306,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) throws GridCacheEntryRemovedException {
assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1445,7 +1454,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
firstEntryIdx = i;
@@ -1493,7 +1503,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
firstEntryIdx = i;
@@ -1612,7 +1623,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
}
else
assert filtered.isEmpty();
@@ -1689,6 +1701,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param replicate Whether DR is enabled for that cache.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
@@ -1703,7 +1716,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) throws GridCacheEntryRemovedException {
GridCacheReturn retVal = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1760,7 +1774,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.invokeArguments(),
primary && writeThrough() && !req.skipStore(),
!req.skipStore(),
- req.returnValue(),
+ sndPrevVal || req.returnValue(),
expiry,
true,
true,
@@ -1775,7 +1789,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
intercept,
req.subjectId(),
- taskName);
+ taskName,
+ null,
+ null);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -1792,22 +1808,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else if (conflictCtx.isMerge())
newConflictVer = null; // Conflict version is discarded in case of merge.
- EntryProcessor<Object, Object, Object> entryProcessor = null;
-
if (!readersOnly) {
dhtFut.addWriteEntry(entry,
updRes.newValue(),
- entryProcessor,
+ op == TRANSFORM ? req.entryProcessor(i) : null,
updRes.newTtl(),
updRes.conflictExpireTime(),
- newConflictVer);
+ newConflictVer,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateIdx());
}
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(filteredReaders,
entry,
updRes.newValue(),
- entryProcessor,
updRes.newTtl(),
updRes.conflictExpireTime());
}
@@ -1907,6 +1923,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param batchRes Batch update result.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Deleted entries.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
@@ -1927,7 +1944,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean replicate,
UpdateBatchResult batchRes,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) {
assert putMap == null ^ rmvKeys == null;
@@ -2029,7 +2047,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/false,
+ /*retval*/sndPrevVal,
expiry,
/*event*/true,
/*metrics*/true,
@@ -2044,7 +2062,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*conflict resolve*/false,
/*intercept*/false,
req.subjectId(),
- taskName);
+ taskName,
+ null,
+ null);
assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
"success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2074,22 +2094,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
- EntryProcessor<Object, Object, Object> entryProcessor =
- entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
-
if (!batchRes.readersOnly())
dhtFut.addWriteEntry(entry,
writeVal,
- entryProcessor,
+ entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()),
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE,
- null);
+ null,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateIdx());
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(filteredReaders,
entry,
writeVal,
- entryProcessor,
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE);
}
@@ -2493,7 +2512,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entry = entryExx(key);
CacheObject val = req.value(i);
+ CacheObject prevVal = req.previousValue(i);
EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+ Long updateIdx = req.updateIdx(i);
GridCacheOperation op = entryProcessor != null ? TRANSFORM :
(val != null) ? UPDATE : DELETE;
@@ -2515,7 +2536,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*event*/true,
/*metrics*/true,
/*primary*/false,
- /*check version*/!req.forceTransformBackups(),
+ /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
req.topologyVersion(),
CU.empty0(),
replicate ? DR_BACKUP : DR_NONE,
@@ -2525,7 +2546,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
intercept,
req.subjectId(),
- taskName);
+ taskName,
+ prevVal,
+ updateIdx);
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
@@ -2567,7 +2590,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
catch (ClusterTopologyCheckedException ignored) {
U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
- req.nodeId());
+ nodeId);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4ace5c4..0d2f580 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -132,6 +133,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
waitForExchange = !topLocked;
+
+ // We can send entry processor instead of value to backup if updates are ordered.
+ forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM;
}
/** {@inheritDoc} */
@@ -198,16 +202,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param updateIdx Partition update index.
*/
public void addWriteEntry(GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateIdx) {
AffinityTopologyVersion topVer = updateReq.topologyVersion();
- Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+ int part = entry.partition();
+
+ Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer);
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -244,7 +254,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
entryProcessor,
ttl,
conflictExpireTime,
- conflictVer);
+ conflictVer,
+ addPrevVal,
+ prevVal,
+ updateIdx);
}
}
}
@@ -253,14 +266,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param readers Entry readers.
* @param entry Entry.
* @param val Value.
- * @param entryProcessor Entry processor..
* @param ttl TTL for near cache update (optional).
* @param expireTime Expire time for near cache update (optional).
*/
public void addNearWriteEntries(Iterable<UUID> readers,
GridDhtCacheEntry entry,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long expireTime) {
CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
@@ -302,7 +313,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
updateReq.addNearWriteValue(entry.key(),
val,
- entryProcessor,
ttl,
expireTime);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index e55cac9..4d27bfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -78,6 +78,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
+ /** Previous values. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> prevVals;
+
/** Conflict versions. */
@GridDirectCollection(GridCacheVersion.class)
private List<GridCacheVersion> conflictVers;
@@ -139,6 +144,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Task name hash. */
private int taskNameHash;
+ /** Partition. */
+ private GridLongList updateCntrs;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -212,13 +220,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param addPrevVal If {@code true} adds previous value.
+ * @param prevVal Previous value.
*/
public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateIdx) {
keys.add(key);
if (forceTransformBackups) {
@@ -229,6 +242,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
else
vals.add(val);
+ if (addPrevVal) {
+ if (prevVals == null)
+ prevVals = new ArrayList<>();
+
+ prevVals.add(prevVal);
+ }
+
+ if (updateIdx != null) {
+ if (updateCntrs == null)
+ updateCntrs = new GridLongList();
+
+ updateCntrs.add(updateIdx);
+ }
+
// In case there is no conflict, do not create the list.
if (conflictVer != null) {
if (conflictVers == null) {
@@ -271,36 +298,21 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/**
* @param key Key to add.
* @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
* @param ttl TTL.
* @param expireTime Expire time.
*/
public void addNearWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long expireTime)
{
if (nearKeys == null) {
nearKeys = new ArrayList<>();
-
- if (forceTransformBackups) {
- nearEntryProcessors = new ArrayList<>();
- nearEntryProcessorsBytes = new ArrayList<>();
- }
- else
- nearVals = new ArrayList<>();
+ nearVals = new ArrayList<>();
}
nearKeys.add(key);
-
- if (forceTransformBackups) {
- assert entryProcessor != null;
-
- nearEntryProcessors.add(entryProcessor);
- }
- else
- nearVals.add(val);
+ nearVals.add(val);
if (ttl >= 0) {
if (nearTtls == null) {
@@ -411,6 +423,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
/**
+ * @param idx Counter index.
+ * @return Update counter.
+ */
+ public Long updateIdx(int idx) {
+ if (idx < updateCntrs.size())
+ return updateCntrs.get(idx);
+
+ return null;
+ }
+
+ /**
* @param idx Near key index.
* @return Key.
*/
@@ -431,6 +454,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/**
* @param idx Key index.
+ * @return Value.
+ */
+ @Nullable public CacheObject previousValue(int idx) {
+ if (prevVals != null)
+ return prevVals.get(idx);
+
+ return null;
+ }
+
+ /**
+ * @param idx Key index.
* @return Entry processor.
*/
@Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -684,42 +718,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
writer.incrementState();
case 16:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeMessage("updateCntrs", updateCntrs))
return false;
writer.incrementState();
case 17:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 18:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 20:
- if (!writer.writeMessage("ttls", ttls))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 21:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 22:
+ if (!writer.writeMessage("ttls", ttls))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -846,7 +892,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 16:
- subjId = reader.readUuid("subjId");
+ updateCntrs = reader.readMessage("updateCntrs");
if (!reader.isLastRead())
return false;
@@ -854,6 +900,22 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 17:
+ prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -865,7 +927,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 18:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -873,7 +935,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 19:
+ case 21:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -881,7 +943,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 20:
+ case 22:
ttls = reader.readMessage("ttls");
if (!reader.isLastRead())
@@ -889,7 +951,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 21:
+ case 23:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -897,7 +959,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 22:
+ case 24:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -917,7 +979,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 23;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 77e47a7..cb4bb4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -613,7 +613,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (updateTop) {
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
if (top.cacheId() == cacheCtx.cacheId()) {
- cacheCtx.topology().update(exchId, top.partitionMap(true));
+ cacheCtx.topology().update(exchId,
+ top.partitionMap(true),
+ top.updateCounters());
break;
}
@@ -811,6 +813,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
+ boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT;
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -821,6 +825,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (drCacheCtx.isDrEnabled())
drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft());
+ if (topChanged)
+ cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+
// Partition release future is done so we can flush the write-behind store.
cacheCtx.store().forceFlush();
@@ -954,14 +961,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param id ID.
* @throws IgniteCheckedException If failed.
*/
- private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
+ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+ throws IgniteCheckedException {
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
clientOnlyExchange,
cctx.versions().last());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
+ if (!cacheCtx.isLocal()) {
m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap());
+
+ m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+ }
}
if (log.isDebugEnabled())
@@ -987,15 +998,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
- if (ready)
+ if (ready) {
m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+
+ m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+ }
}
}
// It is important that client topologies be added after contexts.
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+ for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
+ m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
+ }
+
if (log.isDebugEnabled())
log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
", exchId=" + exchId + ", msg=" + m + ']');
@@ -1332,15 +1349,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer cacheId = entry.getKey();
+ Map<Integer, Long> cntrMap = msg.partitionUpdateCounters(cacheId);
+
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
if (cacheCtx != null)
- cacheCtx.topology().update(exchId, entry.getValue());
+ cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
else {
ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
- cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+ cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
}
}
}
@@ -1358,7 +1377,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
cctx.exchange().clientTopology(cacheId, this);
- top.update(exchId, entry.getValue());
+ top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index c06d773..758818d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
@@ -48,6 +49,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** */
private byte[] partsBytes;
+ /** Partitions update counters. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>();
+
+ /** Serialized partitions counters. */
+ private byte[] partCntrsBytes;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -92,13 +101,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
parts.put(cacheId, fullMap);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /**
+ * @param cacheId Cache ID.
+ * @param cntrMap Partition update counters.
+ */
+ public void addPartitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+ if (!partCntrs.containsKey(cacheId))
+ partCntrs.put(cacheId, cntrMap);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return Partition update counters.
+ */
+ public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+ Map<Integer, Long> res = partCntrs.get(cacheId);
+
+ return res != null ? res : Collections.<Integer, Long>emptyMap();
+ }
+
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
if (parts != null && partsBytes == null)
partsBytes = ctx.marshaller().marshal(parts);
+
+ if (partCntrs != null)
+ partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
/**
@@ -121,6 +151,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsBytes != null && parts == null)
parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+ if (partCntrsBytes != null)
+ partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
}
/** {@inheritDoc} */
@@ -139,12 +172,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (writer.state()) {
case 5:
- if (!writer.writeByteArray("partsBytes", partsBytes))
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
case 6:
+ if (!writer.writeByteArray("partsBytes", partsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -167,7 +206,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (reader.state()) {
case 5:
- partsBytes = reader.readByteArray("partsBytes");
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
return false;
@@ -175,6 +214,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 6:
+ partsBytes = reader.readByteArray("partsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -194,7 +241,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 83fbb1a..547c0f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
@@ -46,6 +47,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Serialized partitions. */
private byte[] partsBytes;
+ /** Partitions update counters. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>();
+
+ /** Serialized partitions counters. */
+ private byte[] partCntrsBytes;
+
/** */
private boolean client;
@@ -90,6 +99,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
/**
+ * @param cacheId Cache ID.
+ * @param cntrMap Partition update counters.
+ */
+ public void partitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+ partCntrs.put(cacheId, cntrMap);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return Partition update counters.
+ */
+ public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+ Map<Integer, Long> res = partCntrs.get(cacheId);
+
+ return res != null ? res : Collections.<Integer, Long>emptyMap();
+ }
+
+ /**
* @return Local partitions.
*/
public Map<Integer, GridDhtPartitionMap> partitions() {
@@ -103,6 +130,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes == null && parts != null)
partsBytes = ctx.marshaller().marshal(parts);
+
+ if (partCntrs != null)
+ partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
/** {@inheritDoc} */
@@ -111,6 +141,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes != null && parts == null)
parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+ if (partCntrsBytes != null)
+ partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
}
/** {@inheritDoc} */
@@ -135,6 +168,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
case 6:
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -165,6 +204,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 6:
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -184,7 +231,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 1bf03a9..f8bb8fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -249,7 +249,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*write-through*/false,
/*read-through*/false,
/*retval*/false,
- /**expiry policy*/null,
+ /*expiry policy*/null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
@@ -263,7 +263,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
false,
false,
subjId,
- taskName);
+ taskName,
+ null,
+ null);
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
@@ -351,7 +353,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*event*/true,
/*metrics*/true,
/*primary*/false,
- /*check version*/!req.forceTransformBackups(),
+ /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
req.topologyVersion(),
CU.empty0(),
DR_NONE,
@@ -359,9 +361,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
expireTime,
null,
false,
- intercept,
+ /*intercept*/false,
req.subjectId(),
- taskName);
+ taskName,
+ null,
+ null);
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 9ea9b73..470aa09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -159,6 +159,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * @return Filtered entry.
+ */
+ boolean filtered() {
+ return false;
+ }
+
+ /**
* @param cctx Cache context.
* @throws IgniteCheckedException In case of error.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/76b9ed66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
new file mode 100644
index 0000000..14d8f51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import javax.cache.event.EventType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query entry.
+ */
+public class CacheContinuousQueryFilteredEntry extends CacheContinuousQueryEntry {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private EventType evtType;
+
+ /** Cache name. */
+ private int cacheId;
+
+ /** Partition. */
+ private int part;
+
+ /** Update index. */
+ private long updateIdx;
+
+ /** */
+ @GridToStringInclude
+ @GridDirectTransient
+ private AffinityTopologyVersion topVer;
+
+ /**
+ * Required by {@link Message}.
+ */
+ public CacheContinuousQueryFilteredEntry() {
+ // No-op.
+ }
+
+ /**
+ * @param e Cache continuous query entry.
+ */
+ CacheContinuousQueryFilteredEntry(CacheContinuousQueryEntry e) {
+ this.cacheId = e.cacheId();
+ this.evtType = e.eventType();
+ this.part = e.partition();
+ this.updateIdx = e.updateIndex();
+ this.topVer = e.topologyVersion();
+ }
+
+ /**
+ * @return Topology version if applicable.
+ */
+ @Nullable AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Cache ID.
+ */
+ int cacheId() {
+ return cacheId;
+ }
+
+ /**
+ * @return Event type.
+ */
+ EventType eventType() {
+ return evtType;
+ }
+
+ /**
+ * @return Partition.
+ */
+ int partition() {
+ return part;
+ }
+
+ /**
+ * @return Update index.
+ */
+ long updateIndex() {
+ return updateIdx;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean filtered() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 115;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt("cacheId", cacheId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeLong("updateIdx", updateIdx))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cacheId = reader.readInt("cacheId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ byte evtTypeOrd;
+
+ evtTypeOrd = reader.readByte("evtType");
+
+ if (!reader.isLastRead())
+ return false;
+
+ evtType = CacheContinuousQueryEntry.eventTypeFromOrdinal(evtTypeOrd);
+
+ reader.incrementState();
+
+ case 2:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ updateIdx = reader.readLong("updateIdx");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(CacheContinuousQueryFilteredEntry.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryFilteredEntry.class, this);
+ }
+}
\ No newline at end of file