You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/29 14:48:22 UTC
[2/2] incubator-ignite git commit: ignite-44
ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/58de0b22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/58de0b22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/58de0b22
Branch: refs/heads/ignite-44-8273
Commit: 58de0b22f9b9ec7b08ae882308ab0b067638923c
Parents: 1600e5c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 29 16:45:54 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 29 16:45:54 2014 +0300
----------------------------------------------------------------------
.../processors/cache/CacheInvokeResult.java | 9 +-
.../kernal/processors/cache/GridCacheUtils.java | 29 +++++
.../distributed/GridDistributedLockRequest.java | 1 +
.../dht/GridDhtTransactionalCacheAdapter.java | 46 ++++++--
.../colocated/GridDhtColocatedLockFuture.java | 6 ++
.../distributed/near/GridNearLockResponse.java | 106 ++++++++++++++++++-
.../distributed/near/GridNearTxRemote.java | 6 ++
.../cache/transactions/IgniteTxEntry.java | 19 +++-
.../transactions/IgniteTxLocalAdapter.java | 37 +++----
.../GridCacheReturnValueTransferSelfTest.java | 6 +-
10 files changed, 225 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
index ab0959e..4d51c4e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
@@ -45,8 +45,6 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz
* @param res Computed result.
*/
public CacheInvokeResult(T res) {
- assert res != null;
-
this.res = res;
}
@@ -57,6 +55,13 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz
this.err = err;
}
+ /**
+ * Checks whether this result carries neither a computed value nor an error.
+ *
+ * @return {@code True} if both result and error are {@code null}.
+ */
+ public boolean empty() {
+ return res == null && err == null;
+ }
+
/** {@inheritDoc} */
@Override public Object ggClassId() {
return GG_CLASS_ID;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
index 1646bf1..4e8f985 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
@@ -27,6 +27,7 @@ import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -1671,4 +1672,32 @@ public class GridCacheUtils {
return duration.getTimeUnit().toMillis(duration.getDurationAmount());
}
+
+ /**
+ * Runs every entry processor registered on the given transaction entry against the
+ * supplied value and wraps the return value of the last processor into an invoke result.
+ * Each processor receives a fresh {@link CacheInvokeEntry} built from the entry key and {@code val}.
+ *
+ * @param txEntry Entry whose entry processors are invoked.
+ * @param val Value handed to every entry processor.
+ * @param ignoreNull If {@code true}, a {@code null} processing result yields {@code null}
+ * instead of a result object wrapping {@code null}.
+ * @return Invoke result holding the last processor's return value, an invoke result holding
+ * the exception if processing threw one, or {@code null} if the computed value is
+ * {@code null} and {@code ignoreNull} is {@code true}.
+ */
+ @Nullable public static <K, V> CacheInvokeResult<Object> computeInvokeResult(
+ IgniteTxEntry<K, V> txEntry, V val, boolean ignoreNull) {
+ try {
+ Object res = null;
+
+ for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
+ CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val);
+
+ EntryProcessor<K, V, ?> entryProcessor = t.get1();
+
+ res = entryProcessor.process(invokeEntry, t.get2());
+ }
+
+ if (res == null && ignoreNull)
+ return null;
+ else
+ return new CacheInvokeResult<>(res);
+ }
+ catch (Exception e) { // Processor failures are reported through the result rather than rethrown.
+ return new CacheInvokeResult<>(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
index 749eff0..c8cdd65 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -106,6 +106,7 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage
}
/**
+ * @param cacheId Cache ID.
* @param nodeId Node ID.
* @param nearXidVer Near transaction ID.
* @param threadId Thread ID.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b7ff63e..27f7336 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -955,10 +955,17 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
assert mappedVer != null;
assert tx == null || tx.xidVersion().equals(mappedVer);
+ boolean nearCacheReq = U.hasNearCache(nearNode, ctx.name());
+
try {
// Send reply back to originating near node.
GridNearLockResponse<K, V> res = new GridNearLockResponse<>(ctx.cacheId(),
- req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ tx != null && tx.onePhaseCommit(),
+ entries.size(),
+ err);
if (err == null) {
res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -984,11 +991,29 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
GridCacheVersion ver = e.version();
- boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
+ IgniteTxEntry<K, V> writeEntry = null;
+
+ boolean ret;
+
+ if (req.implicitTx()) {
+ ret = req.returnValue(i) ||
+ (nearCacheReq && (dhtVer == null || !dhtVer.equals(ver)));
+ }
+ else
+ ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
+
+ boolean invoke = false;
+
+ if (!ret && tx != null && req.hasTransforms()) {
+ writeEntry = tx.entry(ctx.txKey(e.key()));
+
+ if (writeEntry.op() == TRANSFORM)
+ invoke = true;
+ }
V val = null;
- if (ret)
+ if (ret || invoke)
val = e.innerGet(tx,
/*swap*/true,
/*read-through*/true,
@@ -1014,7 +1039,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
boolean filterPassed = false;
if (tx != null && tx.onePhaseCommit()) {
- IgniteTxEntry<K, V> writeEntry = tx.entry(ctx.txKey(e.key()));
+ if (writeEntry == null)
+ writeEntry = tx.entry(ctx.txKey(e.key()));
assert writeEntry != null :
"Missing tx entry for locked cache entry: " + e;
@@ -1032,6 +1058,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
filterPassed,
ver,
mappedVer,
+ invoke ? computeInvokeResult(writeEntry, val, false) : null,
ctx);
}
catch (GridCacheFilterFailedException ex) {
@@ -1043,7 +1070,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
else {
// We include values into response since they are required for local
// calls and won't be serialized. We are also including DHT version.
- res.addValueBytes(null, null, false, e.version(), mappedVer, ctx);
+ res.addValueBytes(null, null, false, e.version(), mappedVer, null, ctx);
}
break;
@@ -1069,8 +1096,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
U.error(log, "Failed to get value for lock reply message for node [node=" +
U.toShortString(nearNode) + ", req=" + req + ']', e);
- return new GridNearLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
- entries.size(), e);
+ return new GridNearLockResponse<>(ctx.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ entries.size(),
+ e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index e6a4eb7..545ad31 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1261,6 +1261,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// In colocated cache we must receive responses only for detached entries.
assert txEntry.cached().detached();
+ if (txEntry.op() == GridCacheOperation.TRANSFORM) {
+ CacheInvokeResult<Object> invokeRes = res.invokeResult(i);
+
+ txEntry.invokeResult(invokeRes);
+ }
+
txEntry.markLocked();
GridDhtDetachedCacheEntry<K, V> entry = (GridDhtDetachedCacheEntry<K, V>)txEntry.cached();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
index 7711470..5ea89ad 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -49,6 +49,15 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
/** Filter evaluation results for fast-commit transactions. */
private boolean[] filterRes;
+ /** Result for invoke operation. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private List<CacheInvokeResult<Object>> invokeRes;
+
+ /** Serialized results for invoke operation. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> invokeResBytes;
+
/**
* Empty constructor (required by {@link Externalizable}).
*/
@@ -148,6 +157,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
* @param filterPassed Boolean flag indicating whether filter passed for fast-commit transaction.
* @param dhtVer DHT version.
* @param mappedVer Mapped version.
+ * @param res Result for invoke operation.
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
@@ -157,6 +167,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
boolean filterPassed,
@Nullable GridCacheVersion dhtVer,
@Nullable GridCacheVersion mappedVer,
+ @Nullable CacheInvokeResult<Object> res,
GridCacheContext<K, V> ctx
) throws IgniteCheckedException {
int idx = valuesSize();
@@ -167,10 +178,39 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
if (filterRes != null)
filterRes[idx] = filterPassed;
+ if (res != null) {
+ if (invokeRes == null)
+ invokeRes = new ArrayList<>(dhtVers.length);
+
+ invokeRes.add(res);
+ }
+
// Delegate to super.
addValueBytes(val, valBytes, ctx);
}
+ /**
+ * Gets the invoke result stored at the given position of the invoke result list.
+ * <p>
+ * NOTE(review): {@code addValueBytes(...)} appends to the list only when the passed result is
+ * non-null, so a list position may not correspond to the key index when only some keys carry
+ * an invoke result, and {@code get(idx)} may then be out of bounds - confirm against callers.
+ *
+ * @param idx Key index.
+ * @return Result for invoke operation, or {@code null} if this response holds no invoke results.
+ */
+ public CacheInvokeResult<Object> invokeResult(int idx) {
+ return invokeRes != null ? invokeRes.get(idx) : null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass step, serializes the invoke results into {@code invokeResBytes}.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ invokeResBytes = marshalCollection(invokeRes, ctx);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass step, restores the invoke results from {@code invokeResBytes}.
+ */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ invokeRes = unmarshalCollection(invokeResBytes, ctx, ldr);
+ }
+
/** {@inheritDoc} */
@SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
@Override public GridTcpCommunicationMessageAdapter clone() {
@@ -192,6 +232,8 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
_clone.dhtVers = dhtVers;
_clone.mappedVers = mappedVers;
_clone.filterRes = filterRes;
+ _clone.invokeRes = invokeRes;
+ _clone.invokeResBytes = invokeResBytes;
}
/** {@inheritDoc} */
@@ -244,6 +286,33 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
commState.idx++;
case 13:
+ if (invokeResBytes != null) {
+ if (commState.it == null) {
+ if (!commState.putInt(invokeResBytes.size()))
+ return false;
+
+ commState.it = invokeResBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
+ if (!commState.putByteArray((byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
+ if (!commState.putInt(-1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 14:
if (mappedVers != null) {
if (commState.it == null) {
if (!commState.putInt(mappedVers.length))
@@ -270,13 +339,13 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
commState.idx++;
- case 14:
+ case 15:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 15:
+ case 16:
if (pending != null) {
if (commState.it == null) {
if (!commState.putInt(pending.size()))
@@ -365,6 +434,35 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
}
if (commState.readSize >= 0) {
+ if (invokeResBytes == null)
+ invokeResBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
+ byte[] _val = commState.getByteArray();
+
+ if (_val == BYTE_ARR_NOT_READ)
+ return false;
+
+ invokeResBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 14:
+ if (commState.readSize == -1) {
+ if (buf.remaining() < 4)
+ return false;
+
+ commState.readSize = commState.getInt();
+ }
+
+ if (commState.readSize >= 0) {
if (mappedVers == null)
mappedVers = new GridCacheVersion[commState.readSize];
@@ -385,7 +483,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
commState.idx++;
- case 14:
+ case 15:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -395,7 +493,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
commState.idx++;
- case 15:
+ case 16:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
index c419a48..d08e5e5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -72,6 +72,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param ctx Cache registry.
* @param txSize Expected transaction size.
* @param grpLockKey Group lock key if this is a group-lock transaction.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
* @throws IgniteCheckedException If unmarshalling failed.
*/
public GridNearTxRemote(
@@ -130,6 +132,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param ctx Cache registry.
* @param txSize Expected transaction size.
* @param grpLockKey Collection of group lock keys if this is a group-lock transaction.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
*/
public GridNearTxRemote(
GridCacheSharedContext<K, V> ctx,
@@ -311,8 +315,10 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
}
/**
+ * @param cacheCtx Cache context.
* @param key Key to add to read set.
* @param keyBytes Key bytes.
+ * @param op Cache operation.
* @param val Value.
* @param valBytes Value bytes.
* @param drVer Data center replication version.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
index 73d17b5..25c1fb3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
@@ -70,10 +70,13 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
/** Filter bytes. */
private byte[] filterBytes;
- /** Transform. */
+ /** EntryProcessors for invoke operation. */
@GridToStringInclude
private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol;
+ /** Remotely computed result for invoke operation, {@code null} if none has been set. */
+ private CacheInvokeResult<Object> invokeRes;
+
/** Transform closure bytes. */
@GridToStringExclude
private byte[] transformClosBytes;
@@ -624,6 +627,20 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
}
/**
+ * Stores the result of the invoke operation that was computed remotely for this entry.
+ *
+ * @param invokeRes Remotely computed result for invoke operation, may be {@code null}.
+ */
+ public void invokeResult(@Nullable CacheInvokeResult<Object> invokeRes) {
+ this.invokeRes = invokeRes;
+ }
+
+ /**
+ * @return Remotely computed result for invoke operation, or {@code null} if none has been set.
+ */
+ @Nullable public CacheInvokeResult<Object> invokeResult() {
+ return invokeRes;
+ }
+
+ /**
* @return Collection of entry processors.
*/
public Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6380605..472f607 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -2259,7 +2259,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
retval = true;
- if (retval || txEntry.op() == TRANSFORM) {
+ boolean invoke = computeInvoke && txEntry.op() == TRANSFORM && txEntry.invokeResult() == null;
+
+ if (retval || invoke) {
if (!cacheCtx.isNear()) {
try {
if (!hasPrevVal)
@@ -2288,14 +2290,18 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
v = cached.rawGetOrUnmarshal(false);
}
- if (txEntry.op() == TRANSFORM) {
- if (computeInvoke)
- addInvokeResult(txEntry, v, ret);
- }
- else
+ if (retval)
ret.value(v);
}
+ if (computeInvoke && txEntry.op() == TRANSFORM) {
+ CacheInvokeResult<Object> res =
+ invoke ? CU.computeInvokeResult(txEntry, v, true) : txEntry.invokeResult();
+
+ if (res != null && !res.empty())
+ ret.addEntryProcessResult(k, res);
+ }
+
boolean pass = cacheCtx.isAll(cached, filter);
// For remove operation we return true only if we are removing s/t,
@@ -2359,23 +2365,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param ret Return value to update.
*/
private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn ret) {
- try {
- Object res = null;
-
- for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
- CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val);
+ CacheInvokeResult<Object> res = CU.computeInvokeResult(txEntry, val, true);
- EntryProcessor<K, V, ?> entryProcessor = t.get1();
-
- res = entryProcessor.process(invokeEntry, t.get2());
- }
-
- if (res != null)
- ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res));
- }
- catch (Exception e) {
- ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e));
- }
+ if (res != null)
+ ret.addEntryProcessResult(txEntry.key(), res);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
index b65fcad..342740f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
@@ -90,17 +90,15 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
- * TODO gg-8273 enable when fixed
*/
- public void _testTransformTransactionalNoBackups() throws Exception {
+ public void testTransformTransactionalNoBackups() throws Exception {
checkTransform(TRANSACTIONAL, PRIMARY, 0);
}
/**
* @throws Exception If failed.
- * TODO gg-8273 enable when fixed
*/
- public void _testTransformTransactionalOneBackup() throws Exception {
+ public void testTransformTransactionalOneBackup() throws Exception {
checkTransform(TRANSACTIONAL, PRIMARY, 1);
}