You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/29 14:48:22 UTC

[2/2] incubator-ignite git commit: ignite-44

ignite-44


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/58de0b22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/58de0b22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/58de0b22

Branch: refs/heads/ignite-44-8273
Commit: 58de0b22f9b9ec7b08ae882308ab0b067638923c
Parents: 1600e5c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 29 16:45:54 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 29 16:45:54 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheInvokeResult.java     |   9 +-
 .../kernal/processors/cache/GridCacheUtils.java |  29 +++++
 .../distributed/GridDistributedLockRequest.java |   1 +
 .../dht/GridDhtTransactionalCacheAdapter.java   |  46 ++++++--
 .../colocated/GridDhtColocatedLockFuture.java   |   6 ++
 .../distributed/near/GridNearLockResponse.java  | 106 ++++++++++++++++++-
 .../distributed/near/GridNearTxRemote.java      |   6 ++
 .../cache/transactions/IgniteTxEntry.java       |  19 +++-
 .../transactions/IgniteTxLocalAdapter.java      |  37 +++----
 .../GridCacheReturnValueTransferSelfTest.java   |   6 +-
 10 files changed, 225 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
index ab0959e..4d51c4e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
@@ -45,8 +45,6 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz
      * @param res Computed result.
      */
     public CacheInvokeResult(T res) {
-        assert res != null;
-
         this.res = res;
     }
 
@@ -57,6 +55,13 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz
         this.err = err;
     }
 
+    /**
+     * @return {@code True} if both result and error are {@code null}.
+     */
+    public boolean empty() {
+        return res == null && err == null;
+    }
+
     /** {@inheritDoc} */
     @Override public Object ggClassId() {
         return GG_CLASS_ID;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
index 1646bf1..4e8f985 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
@@ -27,6 +27,7 @@ import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -1671,4 +1672,32 @@ public class GridCacheUtils {
 
         return duration.getTimeUnit().toMillis(duration.getDurationAmount());
     }
+
+    /**
+     * @param txEntry Entry.
+     * @param val Value.
+     * @return Invoke result.
+     */
+    @Nullable public static <K, V> CacheInvokeResult<Object> computeInvokeResult(
+        IgniteTxEntry<K, V> txEntry, V val, boolean ignoreNull) {
+        try {
+            Object res = null;
+
+            for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
+                CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val);
+
+                EntryProcessor<K, V, ?> entryProcessor = t.get1();
+
+                res = entryProcessor.process(invokeEntry, t.get2());
+            }
+
+            if (res == null && ignoreNull)
+                return null;
+            else
+                return new CacheInvokeResult<>(res);
+        }
+        catch (Exception e) {
+            return new CacheInvokeResult<>(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
index 749eff0..c8cdd65 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -106,6 +106,7 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param nearXidVer Near transaction ID.
      * @param threadId Thread ID.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b7ff63e..27f7336 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -955,10 +955,17 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         assert mappedVer != null;
         assert tx == null || tx.xidVersion().equals(mappedVer);
 
+        boolean nearCacheReq = U.hasNearCache(nearNode, ctx.name());
+
         try {
             // Send reply back to originating near node.
             GridNearLockResponse<K, V> res = new GridNearLockResponse<>(ctx.cacheId(),
-                req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                tx != null && tx.onePhaseCommit(),
+                entries.size(),
+                err);
 
             if (err == null) {
                 res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -984,11 +991,29 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                 try {
                                     GridCacheVersion ver = e.version();
 
-                                    boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
+                                    IgniteTxEntry<K, V> writeEntry = null;
+
+                                    boolean ret;
+
+                                    if (req.implicitTx()) {
+                                        ret = req.returnValue(i) ||
+                                            (nearCacheReq && (dhtVer == null || !dhtVer.equals(ver)));
+                                    }
+                                    else
+                                        ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
+
+                                    boolean invoke = false;
+
+                                    if (!ret && tx != null && req.hasTransforms()) {
+                                        writeEntry = tx.entry(ctx.txKey(e.key()));
+
+                                        if (writeEntry.op() == TRANSFORM)
+                                            invoke = true;
+                                    }
 
                                     V val = null;
 
-                                    if (ret)
+                                    if (ret || invoke)
                                         val = e.innerGet(tx,
                                             /*swap*/true,
                                             /*read-through*/true,
@@ -1014,7 +1039,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                     boolean filterPassed = false;
 
                                     if (tx != null && tx.onePhaseCommit()) {
-                                        IgniteTxEntry<K, V> writeEntry = tx.entry(ctx.txKey(e.key()));
+                                        if (writeEntry == null)
+                                            writeEntry = tx.entry(ctx.txKey(e.key()));
 
                                         assert writeEntry != null :
                                             "Missing tx entry for locked cache entry: " + e;
@@ -1032,6 +1058,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                         filterPassed,
                                         ver,
                                         mappedVer,
+                                        invoke ? computeInvokeResult(writeEntry, val, false) : null,
                                         ctx);
                                 }
                                 catch (GridCacheFilterFailedException ex) {
@@ -1043,7 +1070,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             else {
                                 // We include values into response since they are required for local
                                 // calls and won't be serialized. We are also including DHT version.
-                                res.addValueBytes(null, null, false, e.version(), mappedVer, ctx);
+                                res.addValueBytes(null, null, false, e.version(), mappedVer, null, ctx);
                             }
 
                             break;
@@ -1069,8 +1096,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             U.error(log, "Failed to get value for lock reply message for node [node=" +
                 U.toShortString(nearNode) + ", req=" + req + ']', e);
 
-            return new GridNearLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
-                entries.size(), e);
+            return new GridNearLockResponse<>(ctx.cacheId(),
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                false,
+                entries.size(),
+                e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index e6a4eb7..545ad31 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1261,6 +1261,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                         // In colocated cache we must receive responses only for detached entries.
                         assert txEntry.cached().detached();
 
+                        if (txEntry.op() == GridCacheOperation.TRANSFORM) {
+                            CacheInvokeResult<Object> invokeRes = res.invokeResult(i);
+
+                            txEntry.invokeResult(invokeRes);
+                        }
+
                         txEntry.markLocked();
 
                         GridDhtDetachedCacheEntry<K, V> entry = (GridDhtDetachedCacheEntry<K, V>)txEntry.cached();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
index 7711470..5ea89ad 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -49,6 +49,15 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
     /** Filter evaluation results for fast-commit transactions. */
     private boolean[] filterRes;
 
+    /** Result for invoke operation. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private List<CacheInvokeResult<Object>> invokeRes;
+
+    /** Serialized results for invoke operation. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> invokeResBytes;
+
     /**
      * Empty constructor (required by {@link Externalizable}).
      */
@@ -148,6 +157,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
      * @param filterPassed Boolean flag indicating whether filter passed for fast-commit transaction.
      * @param dhtVer DHT version.
      * @param mappedVer Mapped version.
+     * @param res Result for invoke operation.
      * @param ctx Context.
      * @throws IgniteCheckedException If failed.
      */
@@ -157,6 +167,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
         boolean filterPassed,
         @Nullable GridCacheVersion dhtVer,
         @Nullable GridCacheVersion mappedVer,
+        @Nullable CacheInvokeResult<Object> res,
         GridCacheContext<K, V> ctx
     ) throws IgniteCheckedException {
         int idx = valuesSize();
@@ -167,10 +178,39 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
         if (filterRes != null)
             filterRes[idx] = filterPassed;
 
+        if (res != null) {
+            if (invokeRes == null)
+                invokeRes = new ArrayList<>(dhtVers.length);
+
+            invokeRes.add(res);
+        }
+
         // Delegate to super.
         addValueBytes(val, valBytes, ctx);
     }
 
+    /**
+     * @param idx Key index.
+     * @return Result for invoke operation.
+     */
+    public CacheInvokeResult<Object> invokeResult(int idx) {
+        return invokeRes != null ? invokeRes.get(idx) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        invokeResBytes = marshalCollection(invokeRes, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        invokeRes = unmarshalCollection(invokeResBytes, ctx, ldr);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
     @Override public GridTcpCommunicationMessageAdapter clone() {
@@ -192,6 +232,8 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
         _clone.dhtVers = dhtVers;
         _clone.mappedVers = mappedVers;
         _clone.filterRes = filterRes;
+        _clone.invokeRes = invokeRes;
+        _clone.invokeResBytes = invokeResBytes;
     }
 
     /** {@inheritDoc} */
@@ -244,6 +286,33 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
                 commState.idx++;
 
             case 13:
+                if (invokeResBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(invokeResBytes.size()))
+                            return false;
+
+                        commState.it = invokeResBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 14:
                 if (mappedVers != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(mappedVers.length))
@@ -270,13 +339,13 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
 
                 commState.idx++;
 
-            case 14:
+            case 15:
                 if (!commState.putGridUuid(miniId))
                     return false;
 
                 commState.idx++;
 
-            case 15:
+            case 16:
                 if (pending != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(pending.size()))
@@ -365,6 +434,35 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
                 }
 
                 if (commState.readSize >= 0) {
+                    if (invokeResBytes == null)
+                        invokeResBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        invokeResBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 14:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
                     if (mappedVers == null)
                         mappedVers = new GridCacheVersion[commState.readSize];
 
@@ -385,7 +483,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
 
                 commState.idx++;
 
-            case 14:
+            case 15:
                 IgniteUuid miniId0 = commState.getGridUuid();
 
                 if (miniId0 == GRID_UUID_NOT_READ)
@@ -395,7 +493,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
 
                 commState.idx++;
 
-            case 15:
+            case 16:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
index c419a48..d08e5e5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -72,6 +72,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param ctx Cache registry.
      * @param txSize Expected transaction size.
      * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
      * @throws IgniteCheckedException If unmarshalling failed.
      */
     public GridNearTxRemote(
@@ -130,6 +132,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param ctx Cache registry.
      * @param txSize Expected transaction size.
      * @param grpLockKey Collection of group lock keys if this is a group-lock transaction.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
      */
     public GridNearTxRemote(
         GridCacheSharedContext<K, V> ctx,
@@ -311,8 +315,10 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param key Key to add to read set.
      * @param keyBytes Key bytes.
+     * @param op Cache operation.
      * @param val Value.
      * @param valBytes Value bytes.
      * @param drVer Data center replication version.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
index 73d17b5..25c1fb3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
@@ -70,10 +70,13 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
     /** Filter bytes. */
     private byte[] filterBytes;
 
-    /** Transform. */
+    /** EntryProcessors for invoke operation. */
     @GridToStringInclude
     private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol;
 
+    /** */
+    private CacheInvokeResult<Object> invokeRes;
+
     /** Transform closure bytes. */
     @GridToStringExclude
     private byte[] transformClosBytes;
@@ -624,6 +627,20 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
     }
 
     /**
+     * @param invokeRes Remotely computed result for invoke operation.
+     */
+    public void invokeResult(@Nullable CacheInvokeResult<Object> invokeRes) {
+        this.invokeRes = invokeRes;
+    }
+
+    /**
+     * @return Remotely computed result for invoke operation.
+     */
+    @Nullable public CacheInvokeResult<Object> invokeResult() {
+        return invokeRes;
+    }
+
+    /**
      * @return Collection of entry processors.
      */
     public Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6380605..472f607 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -2259,7 +2259,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
                         retval = true;
 
-                    if (retval || txEntry.op() == TRANSFORM) {
+                    boolean invoke = computeInvoke && txEntry.op() == TRANSFORM && txEntry.invokeResult() == null;
+
+                    if (retval || invoke) {
                         if (!cacheCtx.isNear()) {
                             try {
                                 if (!hasPrevVal)
@@ -2288,14 +2290,18 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 v = cached.rawGetOrUnmarshal(false);
                         }
 
-                        if (txEntry.op() == TRANSFORM) {
-                            if (computeInvoke)
-                                addInvokeResult(txEntry, v, ret);
-                        }
-                        else
+                        if (retval)
                             ret.value(v);
                     }
 
+                    if (computeInvoke && txEntry.op() == TRANSFORM) {
+                        CacheInvokeResult<Object> res =
+                            invoke ? CU.computeInvokeResult(txEntry, v, true) : txEntry.invokeResult();
+
+                        if (res != null && !res.empty())
+                            ret.addEntryProcessResult(k, res);
+                    }
+
                     boolean pass = cacheCtx.isAll(cached, filter);
 
                     // For remove operation we return true only if we are removing s/t,
@@ -2359,23 +2365,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param ret Return value to update.
      */
     private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn ret) {
-        try {
-            Object res = null;
-
-            for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
-                CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val);
+        CacheInvokeResult<Object> res = CU.computeInvokeResult(txEntry, val, true);
 
-                EntryProcessor<K, V, ?> entryProcessor = t.get1();
-
-                res = entryProcessor.process(invokeEntry, t.get2());
-            }
-
-            if (res != null)
-                ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res));
-        }
-        catch (Exception e) {
-            ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e));
-        }
+        if (res != null)
+            ret.addEntryProcessResult(txEntry.key(), res);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
index b65fcad..342740f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
@@ -90,17 +90,15 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
 
     /**
      * @throws Exception If failed.
-     * TODO gg-8273 enable when fixed
      */
-    public void _testTransformTransactionalNoBackups() throws Exception {
+    public void testTransformTransactionalNoBackups() throws Exception {
         checkTransform(TRANSACTIONAL, PRIMARY, 0);
     }
 
     /**
      * @throws Exception If failed.
-     * TODO gg-8273 enable when fixed
      */
-    public void _testTransformTransactionalOneBackup() throws Exception {
+    public void testTransformTransactionalOneBackup() throws Exception {
         checkTransform(TRANSACTIONAL, PRIMARY, 1);
     }