You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/11/20 02:49:30 UTC
[20/22] ignite git commit: IGNITE-426 Implemented failover for
Continuous query.
IGNITE-426 Implemented failover for Continuous query.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce636372
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce636372
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce636372
Branch: refs/heads/ignite-direct-marsh-opt
Commit: ce6363729f3553c6854ed2e8cfc3b5c244678fcd
Parents: 8728a5b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Nov 19 19:50:58 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Nov 19 19:50:58 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 22 +-
.../internal/GridMessageListenHandler.java | 18 +
.../communication/GridIoMessageFactory.java | 8 +-
.../processors/cache/GridCacheEntryEx.java | 12 +-
.../processors/cache/GridCacheMapEntry.java | 147 +-
.../GridCachePartitionExchangeManager.java | 4 +-
.../cache/GridCacheUpdateAtomicResult.java | 15 +-
.../cache/GridCacheUpdateTxResult.java | 24 +-
.../GridDistributedTxRemoteAdapter.java | 22 +-
.../dht/GridClientPartitionTopology.java | 38 +-
.../distributed/dht/GridDhtLocalPartition.java | 35 +
.../dht/GridDhtPartitionTopology.java | 26 +-
.../dht/GridDhtPartitionTopologyImpl.java | 112 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 14 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 112 +-
.../dht/atomic/GridDhtAtomicCache.java | 79 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 75 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 140 +-
.../GridDhtPartitionsExchangeFuture.java | 35 +-
.../preloader/GridDhtPartitionsFullMessage.java | 64 +-
.../GridDhtPartitionsSingleMessage.java | 56 +-
.../distributed/near/GridNearAtomicCache.java | 10 +-
.../distributed/near/GridNearTxRemote.java | 7 +
.../CacheContinuousQueryBatchAck.java | 163 ++
.../continuous/CacheContinuousQueryEntry.java | 196 +-
.../continuous/CacheContinuousQueryEvent.java | 3 +-
.../continuous/CacheContinuousQueryHandler.java | 811 ++++++-
.../CacheContinuousQueryListener.java | 35 +
.../continuous/CacheContinuousQueryManager.java | 151 +-
.../cache/transactions/IgniteTxEntry.java | 20 +
.../cache/transactions/IgniteTxHandler.java | 3 +
.../transactions/IgniteTxLocalAdapter.java | 18 +-
.../cache/transactions/IgniteTxRemoteEx.java | 7 +-
.../continuous/GridContinuousBatch.java | 44 +
.../continuous/GridContinuousBatchAdapter.java | 46 +
.../continuous/GridContinuousHandler.java | 22 +
.../continuous/GridContinuousProcessor.java | 221 +-
.../StartRoutineAckDiscoveryMessage.java | 14 +-
.../StartRoutineDiscoveryMessage.java | 21 +-
.../processors/cache/GridCacheTestEntryEx.java | 10 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 2235 ++++++++++++++++++
...ryFailoverAtomicNearEnabledSelfSelfTest.java | 46 +
...FailoverAtomicPrimaryWriteOrderSelfTest.java | 44 +
...usQueryFailoverAtomicReplicatedSelfTest.java | 40 +
...inuousQueryFailoverTxReplicatedSelfTest.java | 32 +
.../CacheContinuousQueryFailoverTxSelfTest.java | 39 +
...ridCacheContinuousQueryAbstractSelfTest.java | 153 +-
.../GridCacheContinuousQueryTxSelfTest.java | 49 +
...CacheContinuousQueryClientReconnectTest.java | 187 ++
.../IgniteCacheContinuousQueryClientTest.java | 157 +-
...cheContinuousQueryClientTxReconnectTest.java | 32 +
.../p2p/GridP2PSameClassLoaderSelfTest.java | 16 +-
.../testframework/junits/GridAbstractTest.java | 2 +-
.../junits/common/GridCommonAbstractTest.java | 3 +
.../IgniteCacheQuerySelfTestSuite.java | 16 +-
.../yardstick/cache/CacheEntryEventProbe.java | 156 ++
56 files changed, 5753 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index b4ce4ab..3918976 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -38,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.typedef.F;
@@ -127,6 +130,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void updateCounters(Map<Integer, Long> cntrs) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
@@ -213,8 +221,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
}
- ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
- false);
+ ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+ false, false);
}
catch (ClusterTopologyCheckedException ignored) {
// No-op.
@@ -377,6 +385,16 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public GridContinuousBatch createBatch() {
+ return new GridContinuousBatchAdapter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index ff38949..aa837b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -22,10 +22,13 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -100,6 +103,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void updateCounters(Map<Integer, Long> cntrs) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException {
ctx.io().addUserMessageListener(topic, pred);
@@ -167,6 +175,16 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public GridContinuousBatch createBatch() {
+ return new GridContinuousBatchAdapter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3548aac..c671582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -708,7 +709,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..114] - this
+ case 118:
+ msg = new CacheContinuousQueryBatchAck();
+
+ break;
+
+ // [-3..118] - this
// [120..123] - DR
// [-4..-22] - SQL
default:
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index af62e39..8d50616 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -360,6 +360,7 @@ public interface GridCacheEntryEx {
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
* @param dhtVer Dht version for near cache entry.
+ * @param updateCntr Update counter.
* @return Tuple containing success flag and old value. If success is {@code false},
* then value is {@code null}.
* @throws IgniteCheckedException If storing value failed.
@@ -382,7 +383,8 @@ public interface GridCacheEntryEx {
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -417,7 +419,8 @@ public interface GridCacheEntryEx {
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -446,6 +449,7 @@ public interface GridCacheEntryEx {
* @param intercept If {@code true} then calls cache interceptor.
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
+ * @param updateCntr Update counter.
* @return Tuple where first value is flag showing whether operation succeeded,
* second value is old entry value if return value is requested, third is updated entry value,
* fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -478,7 +482,9 @@ public interface GridCacheEntryEx {
boolean conflictResolve,
boolean intercept,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0786a50..8d363ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1060,7 +1061,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject old;
@@ -1077,6 +1079,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object key0 = null;
Object val0 = null;
+ long updateCntr0;
+
synchronized (this) {
checkObsolete();
@@ -1155,6 +1159,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
deletedUnlocked(false);
}
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null && updateCntr != 0)
+ updateCntr0 = updateCntr;
+
update(val, expireTime, ttl, newVer);
drReplicate(drType, val, newVer);
@@ -1180,8 +1189,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
subjId, null, taskName);
}
- if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+ if (cctx.isLocal() || cctx.isReplicated() ||
+ (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
+ cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
+ partition(), tx.local(), false, updateCntr0, topVer);
cctx.dataStructures().onEntryUpdated(key, false);
}
@@ -1197,7 +1208,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (intercept)
cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0));
- return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) :
+ return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) :
new GridCacheUpdateTxResult(false, null);
}
@@ -1223,7 +1234,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.transactional();
@@ -1245,6 +1257,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Cache.Entry entry0 = null;
+ Long updateCntr0;
+
boolean deferred;
boolean marked = false;
@@ -1261,7 +1275,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
- "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
+ "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
boolean startVer = isStartVersion();
@@ -1318,6 +1332,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null && updateCntr != 0)
+ updateCntr0 = updateCntr;
+
drReplicate(drType, null, newVer);
if (metrics && cctx.cache().configuration().isStatisticsEnabled())
@@ -1350,8 +1369,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
taskName);
}
- if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
+ if (cctx.isLocal() || cctx.isReplicated() ||
+ (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
+ cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal()
+ || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer);
cctx.dataStructures().onEntryUpdated(key, true);
@@ -1394,7 +1415,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
ret = old;
- return new GridCacheUpdateTxResult(true, ret);
+ return new GridCacheUpdateTxResult(true, ret, updateCntr0);
}
else
return new GridCacheUpdateTxResult(false, null);
@@ -1686,7 +1707,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
- cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+ if (!isNear()) {
+ long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
+
+ cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
+ partition(), true, false, updateCntr, AffinityTopologyVersion.NONE);
+ }
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
@@ -1729,7 +1755,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean conflictResolve,
boolean intercept,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
assert cctx.atomic();
@@ -1755,6 +1783,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object key0 = null;
Object updated0 = null;
+ Long updateCntr0 = null;
+
synchronized (this) {
boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -1862,7 +1892,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateCntr0 == null ? 0 : updateCntr0);
}
// Will update something.
else {
@@ -1911,6 +1942,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
"[entry=" + this + ", newVer=" + newVer + ']');
}
+ if (!cctx.isNear()) {
+ CacheObject evtVal;
+
+ if (op == GridCacheOperation.TRANSFORM) {
+ EntryProcessor<Object, Object, ?> entryProcessor =
+ (EntryProcessor<Object, Object, ?>)writeObj;
+
+ CacheInvokeEntry<Object, Object> entry =
+ new CacheInvokeEntry<>(cctx, key, prevVal, version());
+
+ try {
+ entryProcessor.process(entry, invokeArgs);
+
+ evtVal = entry.modified() ?
+ cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+ }
+ catch (Exception e) {
+ evtVal = prevVal;
+ }
+ }
+ else
+ evtVal = (CacheObject)writeObj;
+
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
+ cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal()
+ || !context().userCache(), partition(), primary, false, updateCntr0, topVer);
+ }
+
return new GridCacheUpdateAtomicResult(false,
retval ? rawGetOrUnmarshalUnlocked(false) : null,
null,
@@ -1919,7 +1982,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateCntr0 == null ? 0 : updateCntr0);
}
}
else
@@ -1995,7 +2059,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateCntr0 == null ? 0 : updateCntr0);
}
}
@@ -2042,7 +2107,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateCntr0 == null ? 0 : updateCntr);
}
}
else
@@ -2142,7 +2208,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateCntr0 == null ? 0 : updateCntr0);
else if (interceptorVal != updated0) {
updated0 = cctx.unwrapTemporary(interceptorVal);
@@ -2179,6 +2246,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(updated, newExpireTime, newTtl, newVer);
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
drReplicate(drType, updated, newVer);
recordNodeId(affNodeId, topVer);
@@ -2218,7 +2290,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CU.EXPIRE_TIME_ETERNAL,
null,
null,
- false);
+ false,
+ updateCntr0 == null ? 0 : updateCntr0);
}
if (writeThrough)
@@ -2270,6 +2343,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
recordNodeId(affNodeId, topVer);
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
drReplicate(drType, null, newVer);
if (evt) {
@@ -2299,9 +2377,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
- if (cctx.isReplicated() || primary)
- cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false);
-
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
if (intercept) {
@@ -2326,7 +2401,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
newSysExpireTime,
enqueueVer,
conflictCtx,
- true);
+ true,
+ updateCntr0);
}
/**
@@ -3146,11 +3222,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else if (deletedUnlocked())
deletedUnlocked(false);
+ long updateCntr = 0;
+
+ if (!preload)
+ updateCntr = nextPartCounter(topVer);
+
drReplicate(drType, val, ver);
if (!skipQryNtf) {
- if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
- cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload);
+ cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal()
+ || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer);
cctx.dataStructures().onEntryUpdated(key, false);
}
@@ -3167,6 +3248,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
+ /**
+ * @param topVer Topology version.
+ * @return Update counter.
+ */
+ private long nextPartCounter(AffinityTopologyVersion topVer) {
+ long updateCntr;
+
+ if (!cctx.isLocal() && !isNear()) {
+ GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false);
+
+ assert locPart != null;
+
+ updateCntr = locPart.nextUpdateCounter();
+ }
+ else
+ updateCntr = 0;
+
+ return updateCntr;
+ }
+
/** {@inheritDoc} */
@Override public synchronized boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped) throws
IgniteCheckedException,
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e19b310..cd89416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -989,7 +989,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue()) != null;
+ updated |= top.update(null, entry.getValue(), null) != null;
}
if (!cctx.kernalContext().clientNode() && updated)
@@ -1032,7 +1032,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue()) != null;
+ updated |= top.update(null, entry.getValue(), null) != null;
}
if (updated)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 3674284..9df476e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult {
/** Whether update should be propagated to DHT node. */
private final boolean sndToDht;
+ /** */
+ private final Long updateCntr;
+
/** Value computed by entry processor. */
private IgniteBiTuple<Object, Exception> res;
@@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult {
* @param rmvVer Version for deferred delete.
* @param conflictRes DR resolution result.
* @param sndToDht Whether update should be propagated to DHT node.
+ * @param updateCntr Partition update counter.
*/
public GridCacheUpdateAtomicResult(boolean success,
@Nullable CacheObject oldVal,
@@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult {
long conflictExpireTime,
@Nullable GridCacheVersion rmvVer,
@Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
- boolean sndToDht) {
+ boolean sndToDht,
+ long updateCntr) {
this.success = success;
this.oldVal = oldVal;
this.newVal = newVal;
@@ -91,6 +96,7 @@ public class GridCacheUpdateAtomicResult {
this.rmvVer = rmvVer;
this.conflictRes = conflictRes;
this.sndToDht = sndToDht;
+ this.updateCntr = updateCntr;
}
/**
@@ -129,6 +135,13 @@ public class GridCacheUpdateAtomicResult {
}
/**
+ * @return Partition update index.
+ */
+ public Long updateCounter() {
+ return updateCntr;
+ }
+
+ /**
* @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time
* value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index ffda7a2..461baa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -32,6 +32,9 @@ public class GridCacheUpdateTxResult {
@GridToStringInclude
private final CacheObject oldVal;
+ /** Partition idx. */
+ private long updateCntr;
+
/**
* Constructor.
*
@@ -44,6 +47,25 @@ public class GridCacheUpdateTxResult {
}
/**
+ * Constructor.
+ *
+ * @param success Success flag.
+ * @param oldVal Old value (if any),
+ */
+ GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long updateCntr) {
+ this.success = success;
+ this.oldVal = oldVal;
+ this.updateCntr = updateCntr;
+ }
+
+ /**
+ * @return Partition idx.
+ */
+ public long updatePartitionCounter() {
+ return updateCntr;
+ }
+
+ /**
* @return Success flag.
*/
public boolean success() {
@@ -61,4 +83,4 @@ public class GridCacheUpdateTxResult {
@Override public String toString() {
return S.toString(GridCacheUpdateTxResult.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 0d49584..f9ac2df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -265,6 +265,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
+ /** {@inheritDoc} */
+ @Override public void setPartitionUpdateCounters(long[] cntrs) {
+ if (writeMap() != null && !writeMap().isEmpty() && cntrs != null && cntrs.length > 0) {
+ int i = 0;
+
+ for (IgniteTxEntry txEntry : writeMap().values()) {
+ txEntry.updateCounter(cntrs[i]);
+
+ ++i;
+ }
+ }
+ }
+
/**
* Adds completed versions to an entry.
*
@@ -529,7 +542,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
near() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ txEntry.updateCounter());
else {
cached.innerSet(this,
eventNodeId(),
@@ -547,7 +561,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
near() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ txEntry.updateCounter());
// Keep near entry up to date.
if (nearCached != null) {
@@ -575,7 +590,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
near() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ txEntry.updateCounter());
// Keep near entry up to date.
if (nearCached != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 162c116..b7169bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -94,6 +94,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** Lock. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /** Partition update counters. */
+ private Map<Integer, Long> cntrMap = new HashMap<>();
+
/**
* @param cctx Context.
* @param cacheId Cache ID.
@@ -527,7 +530,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionFullMap partMap) {
+ GridDhtPartitionFullMap partMap,
+ Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
@@ -602,6 +606,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
part2node = p2n;
+ if (cntrMap != null)
+ this.cntrMap = new HashMap<>(cntrMap);
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -617,7 +624,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts) {
+ GridDhtPartitionMap parts,
+ Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -698,6 +706,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
+ if (cntrMap != null) {
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+ }
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -852,6 +869,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> updateCounters() {
+ lock.readLock().lock();
+
+ try {
+ return new HashMap<>(cntrMap);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 1516ee4..63e2899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -114,6 +115,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
/** Group reservations. */
private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
+ /** Update counter. */
+ private final AtomicLong cntr = new AtomicLong();
+
/**
* @param cctx Context.
* @param id Partition ID.
@@ -532,6 +536,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
if (cctx.isDrEnabled())
cctx.dr().partitionEvicted(id);
+ cctx.continuousQueries().onPartitionEvicted(id);
+
cctx.dataStructures().onPartitionEvicted(id);
rent.onDone();
@@ -599,6 +605,35 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
/**
+ * @return Next update index.
+ */
+ public long nextUpdateCounter() {
+ return cntr.incrementAndGet();
+ }
+
+ /**
+ * @return Current update index.
+ */
+ public long updateCounter() {
+ return cntr.get();
+ }
+
+ /**
+ * @param val Update index value.
+ */
+ public void updateCounter(long val) {
+ while (true) {
+ long val0 = cntr.get();
+
+ if (val0 >= val)
+ break;
+
+ if (cntr.compareAndSet(val0, val))
+ break;
+ }
+ }
+
+ /**
* Clears values for this partition.
*/
private void clearAll() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d642314..3ac2b85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
@@ -51,6 +52,8 @@ public interface GridDhtPartitionTopology {
*
* @param exchId Exchange ID.
* @param exchFut Exchange future.
+ * @param updateSeq Update sequence.
+ * @param stopping Stopping flag.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
public void updateTopologyVersion(
@@ -193,17 +196,27 @@ public interface GridDhtPartitionTopology {
/**
* @param exchId Exchange ID.
* @param partMap Update partition map.
+ * @param cntrMap Partition update counters.
* @return Local partition map if there were evictions or {@code null} otherwise.
*/
- public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap);
+ public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionFullMap partMap,
+ @Nullable Map<Integer, Long> cntrMap);
/**
* @param exchId Exchange ID.
* @param parts Partitions.
+ * @param cntrMap Partition update counters.
* @return Local partition map if there were evictions or {@code null} otherwise.
*/
@Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts);
+ GridDhtPartitionMap parts,
+ @Nullable Map<Integer, Long> cntrMap);
+
+ /**
+ * @return Partition update counters.
+ */
+ public Map<Integer, Long> updateCounters();
/**
* @param part Partition to own.
@@ -213,6 +226,7 @@ public interface GridDhtPartitionTopology {
/**
* @param part Evicted partition.
+ * @param updateSeq Update sequence increment flag.
*/
public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
@@ -228,4 +242,10 @@ public interface GridDhtPartitionTopology {
* @param threshold Threshold for number of entries.
*/
public void printMemoryStats(int threshold);
-}
\ No newline at end of file
+
+ /**
+ * @param topVer Topology version.
+ * @return {@code True} if rebalance process finished.
+ */
+ public boolean rebalanceFinished(AffinityTopologyVersion topVer);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6bd283a..39c55db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -102,6 +102,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** Lock. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /** Partition update counter. */
+ private Map<Integer, Long> cntrMap = new HashMap<>();
+
+ /** */
+ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+
/**
* @param cctx Context.
*/
@@ -131,6 +137,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
topReadyFut = null;
topVer = AffinityTopologyVersion.NONE;
+
+ rebalancedTopVer = AffinityTopologyVersion.NONE;
}
finally {
lock.writeLock().unlock();
@@ -220,6 +228,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
updateSeq.setIfGreater(updSeq);
topReadyFut = exchFut;
+
+ rebalancedTopVer = AffinityTopologyVersion.NONE;
}
finally {
lock.writeLock().unlock();
@@ -292,6 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
+ cntrMap.clear();
+
// If this is the oldest node.
if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
if (node2part == null) {
@@ -525,6 +537,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ updateRebalanceVersion();
+
consistencyCheck();
}
finally {
@@ -732,7 +746,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param states Additional partition states.
* @return List of nodes for the partition.
*/
- private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+ private List<ClusterNode> nodes(int p,
+ AffinityTopologyVersion topVer,
+ GridDhtPartitionState state,
+ GridDhtPartitionState... states) {
Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
lock.readLock().lock();
@@ -831,7 +848,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionFullMap partMap) {
+ GridDhtPartitionFullMap partMap,
+ @Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
@@ -843,6 +861,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (stopping)
return null;
+ if (cntrMap != null) {
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+
+ for (GridDhtLocalPartition part : locParts.values()) {
+ Long cntr = cntrMap.get(part.id());
+
+ if (cntr != null)
+ part.updateCounter(cntr);
+ }
+ }
+
if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
@@ -913,6 +947,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean changed = checkEvictions(updateSeq);
+ updateRebalanceVersion();
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -928,7 +964,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts) {
+ GridDhtPartitionMap parts, @Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -946,6 +982,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (stopping)
return null;
+ if (cntrMap != null) {
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+
+ for (GridDhtLocalPartition part : locParts.values()) {
+ Long cntr = cntrMap.get(part.id());
+
+ if (cntr != null)
+ part.updateCounter(cntr);
+ }
+ }
+
if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
@@ -1008,6 +1060,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
changed |= checkEvictions(updateSeq);
+ updateRebalanceVersion();
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -1254,6 +1308,33 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> updateCounters() {
+ lock.readLock().lock();
+
+ try {
+ Map<Integer, Long> res = new HashMap<>(cntrMap);
+
+ for (GridDhtLocalPartition part : locParts.values()) {
+ Long cntr0 = res.get(part.id());
+ Long cntr1 = part.updateCounter();
+
+ if (cntr0 == null || cntr1 > cntr0)
+ res.put(part.id(), cntr1);
+ }
+
+ return res;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+ return topVer.equals(rebalancedTopVer);
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
@@ -1266,6 +1347,31 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
+ *
+ */
+ private void updateRebalanceVersion() {
+ if (!rebalancedTopVer.equals(topVer)) {
+ for (int i = 0; i < cctx.affinity().partitions(); i++) {
+ List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+
+ // Topology doesn't contain server nodes (just clients).
+ if (affNodes.isEmpty() || (node2part != null && !node2part.valid()))
+ continue;
+
+ List<ClusterNode> owners = owners(i);
+
+ if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
+ return;
+ }
+
+ rebalancedTopVer = topVer;
+
+ if (log.isDebugEnabled())
+ log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+ }
+ }
+
+ /**
* @param p Partition.
* @param nodeId Node ID.
* @param match State to match.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index bb370a5..e8ef5d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
@@ -42,9 +42,7 @@ import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
@@ -356,6 +354,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
add(fut); // Append new future.
+ Collection<Long> updCntrs = new ArrayList<>(dhtMapping.entries().size());
+
+ for (IgniteTxEntry e : dhtMapping.entries())
+ updCntrs.add(e.updateCounter());
+
GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
tx.nearNodeId(),
futId,
@@ -379,7 +382,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ updCntrs);
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index caa0aa5..65f1cb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -66,6 +67,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** Check comitted flag. */
private boolean checkCommitted;
+ /** Partition update counter. */
+ @GridToStringInclude
+ @GridDirectCollection(Long.class)
+ private GridLongList partUpdateCnt;
+
/** One phase commit write version. */
private GridCacheVersion writeVer;
@@ -163,6 +169,76 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
+ * @param nearNodeId Near node ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param topVer Topology version.
+ * @param xidVer Transaction ID.
+ * @param threadId Thread ID.
+ * @param commitVer Commit version.
+ * @param isolation Transaction isolation.
+ * @param commit Commit flag.
+ * @param invalidate Invalidate flag.
+ * @param sys System flag.
+ * @param sysInvalidate System invalidation flag.
+ * @param syncCommit Synchronous commit flag.
+ * @param syncRollback Synchronous rollback flag.
+ * @param baseVer Base version.
+ * @param committedVers Committed versions.
+ * @param rolledbackVers Rolled back versions.
+ * @param pendingVers Pending versions.
+ * @param txSize Expected transaction size.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash.
+ * @param updateIdxs Partition update idxs.
+ * @param addDepInfo Deployment info flag.
+ */
+ public GridDhtTxFinishRequest(
+ UUID nearNodeId,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ @NotNull AffinityTopologyVersion topVer,
+ GridCacheVersion xidVer,
+ GridCacheVersion commitVer,
+ long threadId,
+ TransactionIsolation isolation,
+ boolean commit,
+ boolean invalidate,
+ boolean sys,
+ byte plc,
+ boolean sysInvalidate,
+ boolean syncCommit,
+ boolean syncRollback,
+ GridCacheVersion baseVer,
+ Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers,
+ Collection<GridCacheVersion> pendingVers,
+ int txSize,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ boolean addDepInfo,
+ Collection<Long> updateIdxs
+ ) {
+ this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
+ sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
+ subjId, taskNameHash, addDepInfo);
+
+ if (updateIdxs != null && !updateIdxs.isEmpty()) {
+ partUpdateCnt = new GridLongList(updateIdxs.size());
+
+ for (Long idx : updateIdxs)
+ partUpdateCnt.add(idx);
+ }
+ }
+
+ /**
+ * @return Partition update counters.
+ */
+ public GridLongList partUpdateCounters(){
+ return partUpdateCnt;
+ }
+
+ /**
* @return Mini ID.
*/
public IgniteUuid miniId() {
@@ -294,36 +370,42 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
writer.incrementState();
case 22:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 23:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 24:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 25:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
return false;
writer.incrementState();
case 26:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 27:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 28:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -382,7 +464,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 22:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+ partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
return false;
@@ -390,7 +472,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 23:
- subjId = reader.readUuid("subjId");
+ pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -398,7 +480,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 24:
- sysInvalidate = reader.readBoolean("sysInvalidate");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -406,7 +488,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 25:
- taskNameHash = reader.readInt("taskNameHash");
+ sysInvalidate = reader.readBoolean("sysInvalidate");
if (!reader.isLastRead())
return false;
@@ -414,7 +496,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 26:
- topVer = reader.readMessage("topVer");
+ taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
return false;
@@ -422,6 +504,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 27:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 28:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -441,6 +531,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 28;
+ return 29;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 75f8c2f..3ee1048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
@@ -1204,7 +1205,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
try {
- topology().readLock();
+ GridDhtPartitionTopology top = topology();
+
+ top.readLock();
try {
if (topology().stopping()) {
@@ -1221,7 +1224,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
- !needRemap(req.topologyVersion(), topology().topologyVersion())) {
+ !needRemap(req.topologyVersion(), top.topologyVersion())) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
@@ -1236,7 +1239,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (ver == null) {
// Assign next version for update inside entries lock.
- ver = ctx.versions().next(topology().topologyVersion());
+ ver = ctx.versions().next(top.topologyVersion());
if (hasNear)
res.nearVersion(ver);
@@ -1248,6 +1251,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
log.debug("Using cache version for update request on primary node [ver=" + ver +
", req=" + req + ']');
+ boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
dhtFut = createDhtFuture(ver, req, res, completionCb, false);
expiry = expiryPolicy(req.expiry());
@@ -1270,7 +1275,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb,
ctx.isDrEnabled(),
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
@@ -1289,7 +1295,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb,
ctx.isDrEnabled(),
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
retVal = updRes.returnValue();
deleted = updRes.deleted();
@@ -1309,7 +1316,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
remap = true;
}
finally {
- topology().readUnlock();
+ top.readUnlock();
}
}
catch (GridCacheEntryRemovedException e) {
@@ -1384,6 +1391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param replicate Whether replication is enabled.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Deleted entries.
* @throws GridCacheEntryRemovedException Should not be thrown.
*/
@@ -1399,7 +1407,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) throws GridCacheEntryRemovedException {
assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1546,7 +1555,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
firstEntryIdx = i;
@@ -1594,7 +1604,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
firstEntryIdx = i;
@@ -1713,7 +1724,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
replicate,
updRes,
taskName,
- expiry);
+ expiry,
+ sndPrevVal);
}
else
assert filtered.isEmpty();
@@ -1790,6 +1802,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param replicate Whether DR is enabled for that cache.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
@@ -1804,7 +1817,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) throws GridCacheEntryRemovedException {
GridCacheReturn retVal = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1861,7 +1875,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.invokeArguments(),
primary && writeThrough() && !req.skipStore(),
!req.skipStore(),
- req.returnValue(),
+ sndPrevVal || req.returnValue(),
expiry,
true,
true,
@@ -1876,7 +1890,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
intercept,
req.subjectId(),
- taskName);
+ taskName,
+ null,
+ null);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -1901,7 +1917,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessor,
updRes.newTtl(),
updRes.conflictExpireTime(),
- newConflictVer);
+ newConflictVer,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
}
if (!F.isEmpty(filteredReaders))
@@ -1918,6 +1937,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
"[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
}
}
+ else if (!entry.isNear() && updRes.success()) {
+ ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(),
+ entry.isInternal() || !context().userCache(), entry.partition(), primary, false,
+ updRes.updateCounter(), topVer);
+ }
if (hasNear) {
if (primary && updRes.sendToDht()) {
@@ -2008,6 +2032,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param batchRes Batch update result.
* @param taskName Task name.
* @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
* @return Deleted entries.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
@@ -2028,7 +2053,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean replicate,
UpdateBatchResult batchRes,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiry
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
) {
assert putMap == null ^ rmvKeys == null;
@@ -2130,7 +2156,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/false,
+ /*retval*/sndPrevVal,
expiry,
/*event*/true,
/*metrics*/true,
@@ -2145,7 +2171,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*conflict resolve*/false,
/*intercept*/false,
req.subjectId(),
- taskName);
+ taskName,
+ null,
+ null);
assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
"success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2184,7 +2212,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessor,
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE,
- null);
+ null,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(filteredReaders,
@@ -2573,7 +2604,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entry = entryExx(key);
CacheObject val = req.value(i);
+ CacheObject prevVal = req.previousValue(i);
+
EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+ Long updateIdx = req.updateCounter(i);
GridCacheOperation op = entryProcessor != null ? TRANSFORM :
(val != null) ? UPDATE : DELETE;
@@ -2605,11 +2639,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
intercept,
req.subjectId(),
- taskName);
+ taskName,
+ prevVal,
+ updateIdx);
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
+ if (updRes.success() && !entry.isNear())
+ ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(),
+ updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(),
+ false, false, updRes.updateCounter(), req.topologyVersion());
+
entry.onUnlock();
break; // While.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index c34dcfd..c73b3b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -215,13 +215,17 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param updateCntr Partition update counter.
*/
public void addWriteEntry(GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateCntr) {
AffinityTopologyVersion topVer = updateReq.topologyVersion();
Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -261,7 +265,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
entryProcessor,
ttl,
conflictExpireTime,
- conflictVer);
+ conflictVer,
+ addPrevVal,
+ entry.partition(),
+ prevVal,
+ updateCntr);
+ }
+ else if (dhtNodes.size() == 1) {
+ try {
+ cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal,
+ entry.key().internal() || !cctx.userCache(), entry.partition(), true, false,
+ updateCntr, updateReq.topologyVersion());
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
+ + val + ", err=" + e + "]");
+ }
}
}
}
@@ -331,8 +350,56 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
cctx.mvcc().removeAtomicFuture(version());
if (err != null) {
- for (KeyCacheObject key : keys)
- updateRes.addFailedKey(key, err);
+ if (!mappings.isEmpty()) {
+ Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+
+ exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ for (int i = 0; i < req.size(); i++) {
+ KeyCacheObject key = req.key(i);
+
+ if (!hndKeys.contains(key)) {
+ updateRes.addFailedKey(key, err);
+
+ cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
+ updateReq.topologyVersion());
+
+ hndKeys.add(key);
+
+ if (hndKeys.size() == keys.size())
+ break exit;
+ }
+ }
+ }
+ }
+ else
+ for (KeyCacheObject key : keys)
+ updateRes.addFailedKey(key, err);
+ }
+ else {
+ Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+
+ exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ for (int i = 0; i < req.size(); i++) {
+ KeyCacheObject key = req.key(i);
+
+ if (!hndKeys.contains(key)) {
+ try {
+ cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
+ key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
+ req.updateCounter(i), updateReq.topologyVersion());
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
+ + req.value(i) + ", err=" + e + "]");
+ }
+
+ hndKeys.add(key);
+
+ if (hndKeys.size() == keys.size())
+ break exit;
+ }
+ }
+ }
}
if (updateReq.writeSynchronizationMode() == FULL_SYNC)