You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/12/15 11:15:34 UTC

[1/2] ignite git commit: IGNITE-GG-10837 WIP

Repository: ignite
Updated Branches:
  refs/heads/ignite-gg-10837 [created] 93656982b


IGNITE-GG-10837 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a011bb72
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a011bb72
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a011bb72

Branch: refs/heads/ignite-gg-10837
Commit: a011bb725b3f2fa7be61c2bb480d222190c67327
Parents: c930e7d
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Dec 14 15:48:12 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Dec 15 13:02:13 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  5 +--
 .../cache/dr/GridCacheDrExpirationInfo.java     | 16 ++++++++++
 .../processors/cache/dr/GridCacheDrInfo.java    | 33 +++++++++++++++++---
 .../cache/version/GridCacheVersionManager.java  | 25 ++++++++++-----
 4 files changed, 65 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ba3d546..d9db2ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -987,7 +988,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 else if (conflictPutVals != null) {
                     GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
 
-                    val = conflictPutVal.value();
+                    val = conflictPutVal.valueEx();
                     conflictVer = conflictPutVal.version();
                     conflictTtl =  conflictPutVal.ttl();
                     conflictExpireTime = conflictPutVal.expireTime();
@@ -1090,7 +1091,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 // Conflict PUT.
                 GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
 
-                val = conflictPutVal.value();
+                val = conflictPutVal.valueEx();
                 conflictVer = conflictPutVal.version();
                 conflictTtl = conflictPutVal.ttl();
                 conflictExpireTime = conflictPutVal.expireTime();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
index 7293950..c5f645f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.dr;
 
+import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -56,6 +57,21 @@ public class GridCacheDrExpirationInfo extends GridCacheDrInfo {
         this.expireTime = expireTime;
     }
 
+    /**
+     * Constructor.
+     *
+     * @param proc Entry processor.
+     * @param ver Version.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     */
+    public GridCacheDrExpirationInfo(EntryProcessor proc, GridCacheVersion ver, long ttl, long expireTime) {
+        super(proc, ver);
+
+        this.ttl = ttl;
+        this.expireTime = expireTime;
+    }
+
     /** {@inheritDoc} */
     @Override public long ttl() {
         return ttl;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index 8635fe2..d37eb7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -36,6 +37,9 @@ public class GridCacheDrInfo implements Externalizable {
     /** Value. */
     private CacheObject val;
 
+    /** Entry processor. */
+    private EntryProcessor proc;
+
     /** DR version. */
     private GridCacheVersion ver;
 
@@ -61,6 +65,20 @@ public class GridCacheDrInfo implements Externalizable {
     }
 
     /**
+     * Constructor.
+     *
+     * @param proc Entry processor.
+     * @param ver Version.
+     */
+    public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
+        assert val != null;
+        assert ver != null;
+
+        this.proc = proc;
+        this.ver = ver;
+    }
+
+    /**
      * @return Value.
      */
     public CacheObject value() {
@@ -68,6 +86,13 @@ public class GridCacheDrInfo implements Externalizable {
     }
 
     /**
+     * @return Value (entry processor or cache object.
+     */
+    public Object valueEx() {
+        return val == null ? proc : val;
+    }
+
+    /**
      * @return Version.
      */
     public GridCacheVersion version() {
@@ -88,13 +113,13 @@ public class GridCacheDrInfo implements Externalizable {
         return CU.EXPIRE_TIME_ETERNAL;
     }
 
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         assert false;
     }
 
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
         assert false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 68d03cd..b5fc4ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -176,7 +176,15 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version based on current topology.
      */
     public GridCacheVersion next() {
-        return next(cctx.kernalContext().discovery().topologyVersion(), true, false);
+        return next(cctx.kernalContext().discovery().topologyVersion(), true, false, null);
+    }
+
+    /**
+     * @param dataCenterId Data center id.
+     * @return Next version based on current topology with given data center id.
+     */
+    public GridCacheVersion next(byte dataCenterId) {
+        return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
     }
 
     /**
@@ -188,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version based on given topology version.
      */
     public GridCacheVersion next(AffinityTopologyVersion topVer) {
-        return next(topVer.topologyVersion(), true, false);
+        return next(topVer.topologyVersion(), true, false, null);
     }
 
     /**
@@ -197,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version for cache store operations.
      */
     public GridCacheVersion nextForLoad() {
-        return next(cctx.kernalContext().discovery().topologyVersion(), true, true);
+        return next(cctx.kernalContext().discovery().topologyVersion(), true, true, null);
     }
 
     /**
@@ -206,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version for cache store operations.
      */
     public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) {
-        return next(topVer.topologyVersion(), true, true);
+        return next(topVer.topologyVersion(), true, true, null);
     }
 
     /**
@@ -215,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version for cache store operations.
      */
     public GridCacheVersion nextForLoad(GridCacheVersion ver) {
-        return next(ver.topologyVersion(), false, true);
+        return next(ver.topologyVersion(), false, true, null);
     }
 
     /**
@@ -225,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version based on given cache version.
      */
     public GridCacheVersion next(GridCacheVersion ver) {
-        return next(ver.topologyVersion(), false, false);
+        return next(ver.topologyVersion(), false, false, null);
     }
 
     /**
@@ -237,9 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @param topVer Topology version for which new version should be obtained.
      * @param addTime If {@code true} then adds to the given topology version number of seconds
      *        from the start time of the first grid node.
+     * @param dataCenterId0 Data center id to put into the version; if {@code null}, this manager's own data center id is used.
      * @return New lock order.
      */
-    private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad) {
+    private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, Byte dataCenterId0) {
         if (topVer == -1)
             topVer = cctx.kernalContext().discovery().topologyVersion();
 
@@ -261,7 +270,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
             globalTime,
             ord,
             locNodeOrder,
-            dataCenterId);
+            dataCenterId0 == null ? dataCenterId : dataCenterId0);
 
         last = next;
 


[2/2] ignite git commit: IGNITE-GG-10837 WIP

Posted by nt...@apache.org.
IGNITE-GG-10837 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93656982
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93656982
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93656982

Branch: refs/heads/ignite-gg-10837
Commit: 93656982b8cbf993445676a92885234788ed6533
Parents: a011bb7
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Dec 15 12:53:55 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Dec 15 13:02:14 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 34 +++++++++++++++++
 .../processors/cache/GridCacheProxyImpl.java    | 19 ++++++++++
 .../processors/cache/IgniteInternalCache.java   | 22 +++++++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 28 +++++++++++++-
 .../distributed/near/GridNearAtomicCache.java   | 12 ++++++
 .../processors/cache/dr/GridCacheDrInfo.java    |  9 ++++-
 .../transactions/IgniteTxLocalAdapter.java      | 39 +++++++++++++++++---
 .../cache/transactions/IgniteTxLocalEx.java     | 10 +++++
 8 files changed, 165 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cc4e962..914797f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2077,6 +2077,40 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public void invokeAllConflict(final Map<KeyCacheObject, GridCacheDrInfo> map,
+        final Object... args) throws IgniteCheckedException {
+        if (F.isEmpty(map))
+            return;
+
+        syncOp(new SyncInOp(map.size() == 1) {
+            @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+                tx.invokeAllDrAsync(ctx, map, args).get();
+            }
+
+            @Override public String toString() {
+                return "invokeAllDrAsync [drMap=" + map + ']';
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> invokeAllConflictAsync(final Map<KeyCacheObject, GridCacheDrInfo> map,
+        final Object... args) throws IgniteCheckedException {
+        if (F.isEmpty(map))
+            return new GridFinishedFuture<Object>();
+
+        return asyncOp(new AsyncInOp(map.keySet()) {
+            @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+                return tx.invokeAllDrAsync(ctx, map, args);
+            }
+
+            @Override public String toString() {
+                return "invokeAllDrAsync [drMap=" + map + ']';
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> EntryProcessorResult<T> invoke(final K key,
         final EntryProcessor<K, V, T> entryProcessor,
         final Object... args)

http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index d1d93d8..3753a9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -523,6 +523,25 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map,
+        Object... args) throws IgniteCheckedException {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            delegate.invokeAllConflict(map, args);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map,
+        Object... args) throws IgniteCheckedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> EntryProcessorResult<T> invoke(K key,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 186de68..61dc13b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1557,6 +1557,28 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
         throws IgniteCheckedException;
 
     /**
+     * Invoke with conflict resolution.
+     *
+     * @param map Map containing keys and entry processors to be applied to values.
+     * @param args Arguments.
+     * @return Invoke results.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args)
+        throws IgniteCheckedException;
+
+    /**
+     * Invoke async with conflict resolution.
+     *
+     * @param map Map containing keys and entry processors to be applied to values.
+     * @param args Arguments.
+     * @return Invoke results.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args)
+        throws IgniteCheckedException;
+
+    /**
      * Removes DR data.
      *
      * @param drMap DR map.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 481317a..4f2f8f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -587,6 +587,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args)
+        throws IgniteCheckedException {
+        invokeAllConflictAsync(map, args).get();
+    }
+
+    /** {@inheritDoc} */
+    public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap,
+        Object... args) throws IgniteCheckedException {
+
+        return updateAllAsync0(null,
+            null,
+            args,
+            conflictMap,
+            null,
+            false,
+            false,
+            null,
+            true);
+    }
+
+    /** {@inheritDoc} */
     @Override public V getAndRemove(K key) throws IgniteCheckedException {
         return getAndRemoveAsync(key).get();
     }
@@ -897,11 +918,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
+        GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE;
+
+        if (op == UPDATE && conflictPutMap != null && !conflictPutMap.isEmpty())
+            op = F.firstEntry(conflictPutMap).getValue().entryProcessor() != null ? TRANSFORM : UPDATE;
+
         final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,
             ctx.config().getWriteSynchronizationMode(),
-            invokeMap != null ? TRANSFORM : UPDATE,
+            op,
             map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
                 conflictPutMap.keySet() : conflictRmvMap.keySet(),
             map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 06898cd..335266d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -545,6 +545,18 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map,
+        Object... args) throws IgniteCheckedException {
+        dht.invokeAllConflict(map, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map,
+        Object... args) throws IgniteCheckedException {
+        return dht.invokeAllConflictAsync(map, args);
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> EntryProcessorResult<T> invoke(K key,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index d37eb7b..1ec45e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -71,7 +71,7 @@ public class GridCacheDrInfo implements Externalizable {
      * @param ver Version.
      */
     public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
-        assert val != null;
+        assert proc != null;
         assert ver != null;
 
         this.proc = proc;
@@ -86,6 +86,13 @@ public class GridCacheDrInfo implements Externalizable {
     }
 
     /**
+     * @return Entry processor.
+     */
+    public EntryProcessor entryProcessor() {
+        return proc;
+    }
+
+    /**
      * @return Value (entry processor or cache object.
      */
     public Object valueEx() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a3aed34..8e7069d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1979,6 +1979,26 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> invokeAllDrAsync(GridCacheContext cacheCtx,
+        Map<KeyCacheObject, GridCacheDrInfo> drMap,
+        Object... args
+    ) {
+        Map<Object, EntryProcessor<Object, Object, Object>> invoke = new LinkedHashMap<>();
+
+        for (Map.Entry<KeyCacheObject, GridCacheDrInfo> e : drMap.entrySet())
+            if (e.getValue().entryProcessor() != null)
+                invoke.put(e.getKey(), e.getValue().entryProcessor());
+
+        return this.<Object, Object>putAllAsync0(cacheCtx,
+            null,
+            invoke,
+            args,
+            drMap,
+            true,
+            null);
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
         GridCacheContext cacheCtx,
@@ -3062,13 +3082,20 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         if (drMap != null) {
             assert map == null;
 
-            map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
-                @Override public Object apply(GridCacheDrInfo val) {
-                    return val.value();
-                }
-            });
+            if (invokeMap == null) {
+                map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+                    @Override public Object apply(GridCacheDrInfo val) {
+                        return val.value();
+                    }
+                });
+
+                invokeMap0 = null;
+            }
+            else  {
+                map0 = null;
 
-            invokeMap0 = null;
+                invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+            }
         }
         else {
             map0 = map;

http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index a5d3373..2895a81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -156,6 +156,16 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
 
     /**
      * @param cacheCtx Cache context.
+     * @param drMap DR map whose values carry the entry processors to invoke.
+     * @return Future for DR invoke operation.
+     */
+    public IgniteInternalFuture<?> invokeAllDrAsync(
+        GridCacheContext cacheCtx,
+        Map<KeyCacheObject, GridCacheDrInfo> drMap,
+        Object... args);
+
+    /**
+     * @param cacheCtx Cache context.
      * @param drMap DR map.
      * @return Future for asynchronous remove.
      */