You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/12/15 11:15:34 UTC
[1/2] ignite git commit: IGNITE-GG-10837 WIP
Repository: ignite
Updated Branches:
refs/heads/ignite-gg-10837 [created] 93656982b
IGNITE-GG-10837 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a011bb72
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a011bb72
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a011bb72
Branch: refs/heads/ignite-gg-10837
Commit: a011bb725b3f2fa7be61c2bb480d222190c67327
Parents: c930e7d
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Dec 14 15:48:12 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Dec 15 13:02:13 2015 +0300
----------------------------------------------------------------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 5 +--
.../cache/dr/GridCacheDrExpirationInfo.java | 16 ++++++++++
.../processors/cache/dr/GridCacheDrInfo.java | 33 +++++++++++++++++---
.../cache/version/GridCacheVersionManager.java | 25 ++++++++++-----
4 files changed, 65 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ba3d546..d9db2ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -987,7 +988,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else if (conflictPutVals != null) {
GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
- val = conflictPutVal.value();
+ val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
@@ -1090,7 +1091,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Conflict PUT.
GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
- val = conflictPutVal.value();
+ val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
index 7293950..c5f645f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.dr;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -56,6 +57,21 @@ public class GridCacheDrExpirationInfo extends GridCacheDrInfo {
this.expireTime = expireTime;
}
+ /**
+ * Constructor.
+ *
+ * @param proc Entry processor.
+ * @param ver Version.
+ * @param ttl TTL.
+ * @param expireTime Expire time.
+ */
+ public GridCacheDrExpirationInfo(EntryProcessor proc, GridCacheVersion ver, long ttl, long expireTime) {
+ super(proc, ver);
+
+ this.ttl = ttl;
+ this.expireTime = expireTime;
+ }
+
/** {@inheritDoc} */
@Override public long ttl() {
return ttl;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index 8635fe2..d37eb7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -36,6 +37,9 @@ public class GridCacheDrInfo implements Externalizable {
/** Value. */
private CacheObject val;
+ /** Entry processor. */
+ private EntryProcessor proc;
+
/** DR version. */
private GridCacheVersion ver;
@@ -61,6 +65,20 @@ public class GridCacheDrInfo implements Externalizable {
}
/**
+ * Constructor.
+ *
+ * @param proc Entry processor.
+ * @param ver Version.
+ */
+ public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
+ assert val != null;
+ assert ver != null;
+
+ this.proc = proc;
+ this.ver = ver;
+ }
+
+ /**
* @return Value.
*/
public CacheObject value() {
@@ -68,6 +86,13 @@ public class GridCacheDrInfo implements Externalizable {
}
/**
+ * @return Value (entry processor or cache object).
+ */
+ public Object valueEx() {
+ return val == null ? proc : val;
+ }
+
+ /**
* @return Version.
*/
public GridCacheVersion version() {
@@ -88,13 +113,13 @@ public class GridCacheDrInfo implements Externalizable {
return CU.EXPIRE_TIME_ETERNAL;
}
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
assert false;
}
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
assert false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 68d03cd..b5fc4ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -176,7 +176,15 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on current topology.
*/
public GridCacheVersion next() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, false);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, null);
+ }
+
+ /**
+ * @param dataCenterId Data center id.
+ * @return Next version based on current topology with given data center id.
+ */
+ public GridCacheVersion next(byte dataCenterId) {
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
}
/**
@@ -188,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given topology version.
*/
public GridCacheVersion next(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, false);
+ return next(topVer.topologyVersion(), true, false, null);
}
/**
@@ -197,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, true);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, true, null);
}
/**
@@ -206,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, true);
+ return next(topVer.topologyVersion(), true, true, null);
}
/**
@@ -215,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, true);
+ return next(ver.topologyVersion(), false, true, null);
}
/**
@@ -225,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given cache version.
*/
public GridCacheVersion next(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, false);
+ return next(ver.topologyVersion(), false, false, null);
}
/**
@@ -237,9 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @param topVer Topology version for which new version should be obtained.
* @param addTime If {@code true} then adds to the given topology version number of seconds
* from the start time of the first grid node.
+ * @param dataCenterId0 Data center id.
* @return New lock order.
*/
- private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad) {
+ private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, Byte dataCenterId0) {
if (topVer == -1)
topVer = cctx.kernalContext().discovery().topologyVersion();
@@ -261,7 +270,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
globalTime,
ord,
locNodeOrder,
- dataCenterId);
+ dataCenterId0 == null ? dataCenterId : dataCenterId0);
last = next;
[2/2] ignite git commit: IGNITE-GG-10837 WIP
Posted by nt...@apache.org.
IGNITE-GG-10837 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93656982
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93656982
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93656982
Branch: refs/heads/ignite-gg-10837
Commit: 93656982b8cbf993445676a92885234788ed6533
Parents: a011bb7
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Dec 15 12:53:55 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Dec 15 13:02:14 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 34 +++++++++++++++++
.../processors/cache/GridCacheProxyImpl.java | 19 ++++++++++
.../processors/cache/IgniteInternalCache.java | 22 +++++++++++
.../dht/atomic/GridDhtAtomicCache.java | 28 +++++++++++++-
.../distributed/near/GridNearAtomicCache.java | 12 ++++++
.../processors/cache/dr/GridCacheDrInfo.java | 9 ++++-
.../transactions/IgniteTxLocalAdapter.java | 39 +++++++++++++++++---
.../cache/transactions/IgniteTxLocalEx.java | 10 +++++
8 files changed, 165 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cc4e962..914797f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2077,6 +2077,40 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public void invokeAllConflict(final Map<KeyCacheObject, GridCacheDrInfo> map,
+ final Object... args) throws IgniteCheckedException {
+ if (F.isEmpty(map))
+ return;
+
+ syncOp(new SyncInOp(map.size() == 1) {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ tx.invokeAllDrAsync(ctx, map, args).get();
+ }
+
+ @Override public String toString() {
+ return "invokeAllDrAsync [drMap=" + map + ']';
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> invokeAllConflictAsync(final Map<KeyCacheObject, GridCacheDrInfo> map,
+ final Object... args) throws IgniteCheckedException {
+ if (F.isEmpty(map))
+ return new GridFinishedFuture<Object>();
+
+ return asyncOp(new AsyncInOp(map.keySet()) {
+ @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ return tx.invokeAllDrAsync(ctx, map, args);
+ }
+
+ @Override public String toString() {
+ return "invokeAllDrAsync [drMap=" + map + ']';
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
@Override public <T> EntryProcessorResult<T> invoke(final K key,
final EntryProcessor<K, V, T> entryProcessor,
final Object... args)
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index d1d93d8..3753a9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -523,6 +523,25 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map,
+ Object... args) throws IgniteCheckedException {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ delegate.invokeAllConflict(map, args);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map,
+ Object... args) throws IgniteCheckedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public <T> EntryProcessorResult<T> invoke(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 186de68..61dc13b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1557,6 +1557,28 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
throws IgniteCheckedException;
/**
+ * Invoke with conflict resolution.
+ *
+ * @param map Map containing keys and entry processors to be applied to values.
+ * @param args Arguments.
+ * @return Invoke results.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args)
+ throws IgniteCheckedException;
+
+ /**
+ * Invoke async with conflict resolution.
+ *
+ * @param map Map containing keys and entry processors to be applied to values.
+ * @param args Arguments.
+ * @return Invoke results.
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args)
+ throws IgniteCheckedException;
+
+ /**
* Removes DR data.
*
* @param drMap DR map.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 481317a..4f2f8f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -587,6 +587,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args)
+ throws IgniteCheckedException {
+ invokeAllConflictAsync(map, args).get();
+ }
+
+ /** {@inheritDoc} */
+ public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap,
+ Object... args) throws IgniteCheckedException {
+
+ return updateAllAsync0(null,
+ null,
+ args,
+ conflictMap,
+ null,
+ false,
+ false,
+ null,
+ true);
+ }
+
+ /** {@inheritDoc} */
@Override public V getAndRemove(K key) throws IgniteCheckedException {
return getAndRemoveAsync(key).get();
}
@@ -897,11 +918,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+ GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE;
+
+ if (op == UPDATE && conflictPutMap != null && !conflictPutMap.isEmpty())
+ op = F.firstEntry(conflictPutMap).getValue().entryProcessor() != null ? TRANSFORM : UPDATE;
+
final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
ctx,
this,
ctx.config().getWriteSynchronizationMode(),
- invokeMap != null ? TRANSFORM : UPDATE,
+ op,
map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
conflictPutMap.keySet() : conflictRmvMap.keySet(),
map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 06898cd..335266d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -545,6 +545,18 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map,
+ Object... args) throws IgniteCheckedException {
+ dht.invokeAllConflict(map, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map,
+ Object... args) throws IgniteCheckedException {
+ return dht.invokeAllConflictAsync(map, args);
+ }
+
+ /** {@inheritDoc} */
@Override public <T> EntryProcessorResult<T> invoke(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index d37eb7b..1ec45e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -71,7 +71,7 @@ public class GridCacheDrInfo implements Externalizable {
* @param ver Version.
*/
public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
- assert val != null;
+ assert proc != null;
assert ver != null;
this.proc = proc;
@@ -86,6 +86,13 @@ public class GridCacheDrInfo implements Externalizable {
}
/**
+ * @return Entry processor.
+ */
+ public EntryProcessor entryProcessor() {
+ return proc;
+ }
+
+ /**
* @return Value (entry processor or cache object).
*/
public Object valueEx() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a3aed34..8e7069d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1979,6 +1979,26 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> invokeAllDrAsync(GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheDrInfo> drMap,
+ Object... args
+ ) {
+ Map<Object, EntryProcessor<Object, Object, Object>> invoke = new LinkedHashMap<>();
+
+ for (Map.Entry<KeyCacheObject, GridCacheDrInfo> e : drMap.entrySet())
+ if (e.getValue().entryProcessor() != null)
+ invoke.put(e.getKey(), e.getValue().entryProcessor());
+
+ return this.<Object, Object>putAllAsync0(cacheCtx,
+ null,
+ invoke,
+ args,
+ drMap,
+ true,
+ null);
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
GridCacheContext cacheCtx,
@@ -3062,13 +3082,20 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (drMap != null) {
assert map == null;
- map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
- @Override public Object apply(GridCacheDrInfo val) {
- return val.value();
- }
- });
+ if (invokeMap == null) {
+ map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+ @Override public Object apply(GridCacheDrInfo val) {
+ return val.value();
+ }
+ });
+
+ invokeMap0 = null;
+ }
+ else {
+ map0 = null;
- invokeMap0 = null;
+ invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+ }
}
else {
map0 = map;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index a5d3373..2895a81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -156,6 +156,16 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
/**
* @param cacheCtx Cache context.
+ * @param drMap DR map containing entry processors to invoke.
+ * @return Future for DR invoke operation.
+ */
+ public IgniteInternalFuture<?> invokeAllDrAsync(
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheDrInfo> drMap,
+ Object... args);
+
+ /**
+ * @param cacheCtx Cache context.
* @param drMap DR map.
* @return Future for asynchronous remove.
*/