You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/30 13:11:49 UTC
[04/18] incubator-ignite git commit: # ignite-44
# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/928aa3d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/928aa3d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/928aa3d4
Branch: refs/heads/ignite-1
Commit: 928aa3d48c7a29dc101c10866a8c6bde66492953
Parents: 71ee2ee
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 24 17:45:42 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 24 17:45:42 2014 +0300
----------------------------------------------------------------------
.../processors/cache/CacheInvokeResult.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 108 +++++-
.../processors/cache/GridCacheReturn.java | 57 ++--
.../GridDistributedLockResponse.java | 19 +-
.../GridDistributedTxRemoteAdapter.java | 6 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 9 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 13 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 8 +-
.../dht/atomic/GridDhtAtomicCache.java | 21 +-
.../colocated/GridDhtColocatedLockFuture.java | 68 ++--
.../distributed/near/GridNearLockResponse.java | 1 +
.../cache/transactions/IgniteTxAdapter.java | 16 +-
.../cache/transactions/IgniteTxEntry.java | 77 +++--
.../cache/transactions/IgniteTxHandler.java | 2 +-
.../transactions/IgniteTxLocalAdapter.java | 339 +++++++++++++------
.../cache/transactions/IgniteTxLocalEx.java | 20 +-
.../cache/IgniteCacheInvokeAbstractTest.java | 163 ++++++++-
.../cache/IgniteCacheTxInvokeTest.java | 41 +++
.../cache/GridCacheAbstractFullApiSelfTest.java | 115 ++++---
.../cache/GridCacheAbstractSelfTest.java | 15 +
21 files changed, 817 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
index 50af119..5f472d7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
@@ -25,7 +25,7 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz
private static final long serialVersionUID = 0L;
/** */
- @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+ @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "JavaAbbreviationUsage", "UnusedDeclaration"})
private static Object GG_CLASS_ID;
/** */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index b3e567c..62daeb9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -2194,22 +2194,97 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(
+ final K key,
+ final EntryProcessor<K, V, T> entryProcessor,
+ final Object... args)
throws EntryProcessorException {
- // TODO IGNITE-44.
- return null;
+ A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ ctx.denyOnLocalRead();
+
+ IgniteFuture<?> fut = asyncOp(new AsyncInOp(key) {
+ @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
+ Map<? extends K, EntryProcessor> invokeMap =
+ Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+
+ return tx.invokeAsync(ctx, false, invokeMap, args);
+ }
+
+ @Override public String toString() {
+ return "invokeAsync [key=" + key + ", entryProcessor=" + entryProcessor + ']';
+ }
+ });
+
+ IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 =
+ (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut;
+
+ return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, EntryProcessorResult<T>>() {
+ @Override public EntryProcessorResult<T> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut)
+ throws IgniteCheckedException {
+ GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get();
+
+ Map<K, EntryProcessorResult<T>> resMap = ret.value();
+
+ assert resMap != null;
+ assert resMap.size() == 1 : resMap.size();
+
+ return resMap.values().iterator().next();
+ }
+ });
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
- EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
- // TODO IGNITE-44.
- return null;
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(
+ final Set<? extends K> keys,
+ final EntryProcessor<K, V, T> entryProcessor,
+ final Object... args) {
+ A.notNull(entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
+ ctx.denyOnLocalRead();
+
+ IgniteFuture<?> fut = asyncOp(new AsyncInOp(keys) {
+ @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
+ Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+ @Override public EntryProcessor apply(K k) {
+ return entryProcessor;
+ }
+ });
+
+ return tx.invokeAsync(ctx, false, invokeMap, args);
+ }
+
+ @Override public String toString() {
+ return "invokeAllAsync [keys=" + keys + ", entryProcessor=" + entryProcessor + ']';
+ }
+ });
+
+ IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 =
+ (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut;
+
+ return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, Map<K, EntryProcessorResult<T>>>() {
+ @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut)
+ throws IgniteCheckedException {
+ GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get();
+
+ assert ret != null;
+
+ return ret.value();
+ }
+ });
}
/** {@inheritDoc} */
@Override public void transform(final K key, final IgniteClosure<V, V> transformer) throws IgniteCheckedException {
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
+ /*
A.notNull(key, "key", transformer, "valTransform");
if (keyCheck)
@@ -2226,11 +2301,15 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return "transform [key=" + key + ", valTransform=" + transformer + ']';
}
});
+ */
}
/** {@inheritDoc} */
@Override public <R> R transformAndCompute(final K key, final IgniteClosure<V, IgniteBiTuple<V, R>> transformer)
throws IgniteCheckedException {
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
+ /*
A.notNull(key, "key", transformer, "transformer");
if (keyCheck)
@@ -2250,6 +2329,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return "transformAndCompute [key=" + key + ", valTransform=" + transformer + ']';
}
});
+ */
}
/** {@inheritDoc} */
@@ -2291,6 +2371,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** {@inheritDoc} */
@Override public IgniteFuture<?> transformAsync(final K key, final IgniteClosure<V, V> transformer,
@Nullable final GridCacheEntryEx<K, V> entry, final long ttl) {
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
+ /*
A.notNull(key, "key", transformer, "transformer");
if (keyCheck)
@@ -2307,6 +2390,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return "transformAsync [key=" + key + ", valTransform=" + transformer + ']';
}
});
+ */
}
/** {@inheritDoc} */
@@ -2581,6 +2665,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** {@inheritDoc} */
@Override public void transformAll(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m)
throws IgniteCheckedException {
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
+ /*
if (F.isEmpty(m))
return;
@@ -2598,6 +2685,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return "transformAll [map=" + m + ']';
}
});
+ */
}
/** {@inheritDoc} */
@@ -2640,6 +2728,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** {@inheritDoc} */
@Override public IgniteFuture<?> transformAllAsync(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m) {
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
+ /*
if (F.isEmpty(m))
return new GridFinishedFuture<>(ctx.kernalContext());
@@ -2657,6 +2748,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return "transformAllAsync [map=" + m + ']';
}
});
+ */
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
index e9c476a..ab05b34 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
@@ -14,7 +14,9 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.tostring.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
+import java.util.*;
/**
* Return value for cases where both, value and success flag need to be returned.
@@ -24,7 +26,7 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
private static final long serialVersionUID = 0L;
/** */
- @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+ @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "JavaAbbreviationUsage", "UnusedDeclaration"})
private static Object GG_CLASS_ID;
/** Value. */
@@ -42,13 +44,6 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
}
/**
- * @param v Value.
- */
- public GridCacheReturn(V v) {
- this.v = v;
- }
-
- /**
*
* @param success Success flag.
*/
@@ -93,17 +88,6 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
}
/**
- * @param v Value.
- * @return This instance for chaining.
- */
- public GridCacheReturn<V> valueIfNull(V v) {
- if (this.v == null)
- this.v = v;
-
- return this;
- }
-
- /**
* @return Success flag.
*/
public boolean success() {
@@ -123,27 +107,34 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
}
/**
- * @param v Value.
* @param success Success flag.
* @return This instance for chaining.
*/
- public GridCacheReturn<V> setIfNull(V v, boolean success) {
- if (this.v == null) {
- this.v = v;
- this.success = success;
- }
+ public GridCacheReturn<V> success(boolean success) {
+ this.success = success;
return this;
}
/**
- * @param success Success flag.
- * @return This instance for chaining.
+ * @param key Key.
+ * @param res Result.
*/
- public GridCacheReturn<V> success(boolean success) {
- this.success = success;
+ @SuppressWarnings("unchecked")
+ public synchronized void addEntryProcessResult(Object key, EntryProcessorResult<?> res) {
+ assert v == null || v instanceof Map : v;
+ assert key != null;
+ assert res != null;
- return this;
+ HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v;
+
+ if (resMap == null) {
+ resMap = new HashMap<>();
+
+ v = (V)resMap;
+ }
+
+ resMap.put(key, res);
}
/** {@inheritDoc} */
@@ -157,11 +148,15 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
out.writeObject(v);
}
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
success = in.readBoolean();
v = (V)in.readObject();
}
/** {@inheritDoc} */
- @Override public String toString() { return S.toString(GridCacheReturn.class, this); }
+ @Override public String toString() {
+ return S.toString(GridCacheReturn.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
index 8edfc7c..76fd449 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -57,11 +57,15 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag
}
/**
+ * @param cacheId Cache ID.
* @param lockVer Lock version.
* @param futId Future ID.
* @param cnt Key count.
*/
- public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, int cnt) {
+ public GridDistributedLockResponse(int cacheId,
+ GridCacheVersion lockVer,
+ IgniteUuid futId,
+ int cnt) {
super(lockVer, cnt);
assert futId != null;
@@ -74,11 +78,15 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag
}
/**
+ * @param cacheId Cache ID.
* @param lockVer Lock ID.
* @param futId Future ID.
* @param err Error.
*/
- public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, Throwable err) {
+ public GridDistributedLockResponse(int cacheId,
+ GridCacheVersion lockVer,
+ IgniteUuid futId,
+ Throwable err) {
super(lockVer, 0);
assert futId != null;
@@ -89,12 +97,17 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag
}
/**
+ * @param cacheId Cache ID.
* @param lockVer Lock ID.
* @param futId Future ID.
* @param cnt Count.
* @param err Error.
*/
- public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, int cnt, Throwable err) {
+ public GridDistributedLockResponse(int cacheId,
+ GridCacheVersion lockVer,
+ IgniteUuid futId,
+ int cnt,
+ Throwable err) {
super(lockVer, cnt);
assert futId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 4a93646..d507d0d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -83,6 +83,8 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param timeout Timeout.
* @param txSize Expected transaction size.
* @param grpLockKey Group lock key if this is a group-lock transaction.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
*/
public GridDistributedTxRemoteAdapter(
GridCacheSharedContext<K, V> ctx,
@@ -325,7 +327,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
else {
// Copy values.
entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
- entry.transformClosures(e.transformClosures());
+ entry.entryProcessors(e.entryProcessors());
entry.valueBytes(e.valueBytes());
entry.op(e.op());
entry.ttl(e.ttl());
@@ -481,7 +483,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
if (updateNearCache(cacheCtx, txEntry.key(), topVer))
nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
- if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
+ if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
txEntry.cached().unswap(true, false);
GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index dc8ddcb..b7ff63e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -215,7 +215,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.keyBytes() != null ? req.keyBytes().get(i) : null,
writeEntry == null ? null : writeEntry.value(),
writeEntry == null ? null : writeEntry.valueBytes(),
- writeEntry == null ? null : writeEntry.transformClosures(),
+ writeEntry == null ? null : writeEntry.entryProcessors(),
drVer,
req.accessTtl());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 0f11ecc..32e43a7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -340,8 +340,13 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
* @return Future that will be completed when locks are acquired.
*/
public IgniteFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads,
- @Nullable Iterable<IgniteTxEntry<K, V>> writes, Map<IgniteTxKey<K>, GridCacheVersion> verMap, long msgId,
- IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last, Collection<UUID> lastBackups) {
+ @Nullable Iterable<IgniteTxEntry<K, V>> writes,
+ Map<IgniteTxKey<K>, GridCacheVersion> verMap,
+ long msgId,
+ IgniteUuid nearMiniId,
+ Map<UUID, Collection<UUID>> txNodes,
+ boolean last,
+ Collection<UUID> lastBackups) {
assert optimistic();
// In optimistic mode prepare still can be called explicitly from salvageTx.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index b752178..55d8f7a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -406,7 +406,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
if (entry != null) {
entry.op(e.op()); // Absolutely must set operation, as default is DELETE.
entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
- entry.transformClosures(e.transformClosures());
+ entry.entryProcessors(e.entryProcessors());
entry.valueBytes(e.valueBytes());
entry.ttl(e.ttl());
entry.filters(e.filters());
@@ -525,11 +525,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
cached.unswap(!read, read);
- IgniteTxEntry<K, V> w = writeEntries == null ? null : writeEntries.get(idx++);
+ IgniteTxEntry<K, V>
+ w = writeEntries == null ? null : writeEntries.get(idx++);
txEntry = addEntry(NOOP,
null,
null,
+ null,
cached,
null,
CU.<K, V>empty(),
@@ -545,7 +547,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
txEntry.value(w.value(), w.hasWriteValue(), w.hasReadValue());
txEntry.valueBytes(w.valueBytes());
txEntry.drVersion(w.drVersion());
- txEntry.transformClosures(w.transformClosures());
+ txEntry.entryProcessors(w.entryProcessors());
txEntry.ttl(w.ttl());
txEntry.filters(w.filters());
txEntry.drExpireTime(w.drExpireTime());
@@ -635,14 +637,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
postLockWrite(cacheCtx,
passedKeys,
skipped,
- null,
- null,
ret,
/*remove*/false,
/*retval*/false,
/*read*/read,
accessTtl,
- filter == null ? CU.<K, V>empty() : filter);
+ filter == null ? CU.<K, V>empty() : filter,
+ /**computeInvoke*/false);
return ret;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 97ec1af..2b4491f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -16,9 +16,11 @@ import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
@@ -280,7 +282,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param val Value.
* @param valBytes Value bytes.
* @param drVer Data center replication version.
- * @param clos Transform closures.
+ * @param entryProcessors Entry processors.
* @param ttl TTL.
*/
public void addWrite(GridCacheContext<K, V> cacheCtx,
@@ -289,7 +291,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
byte[] keyBytes,
@Nullable V val,
@Nullable byte[] valBytes,
- @Nullable Collection<IgniteClosure<V, V>> clos,
+ @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors,
@Nullable GridCacheVersion drVer,
long ttl) {
checkInternal(key);
@@ -310,7 +312,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
txEntry.keyBytes(keyBytes);
txEntry.valueBytes(valBytes);
- txEntry.transformClosures(clos);
+ txEntry.entryProcessors(entryProcessors);
writeMap.put(key, txEntry);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fec59b2..78d92f8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -315,7 +315,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
false,
entry,
- ttl,
filter);
}
@@ -331,7 +330,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
entry,
- ttl,
filter);
}
@@ -412,7 +410,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
true,
null,
- 0,
ctx.equalsPeekArray(oldVal));
}
@@ -433,7 +430,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- 0,
filter);
}
@@ -454,7 +450,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- 0,
null);
}
@@ -648,18 +643,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- Map<? extends K, EntryProcessor> transformMap =
+ Map<? extends K, EntryProcessor> invokeMap =
Collections.singletonMap(key, (EntryProcessor)entryProcessor);
IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
- transformMap,
+ invokeMap,
args,
null,
null,
true,
false,
null,
- -1L,
null);
return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
@@ -687,24 +681,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- Map<? extends K, EntryProcessor> transformMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+ Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
@Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
- IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
- transformMap,
+ return updateAllAsync0(null,
+ invokeMap,
args,
null,
null,
true,
false,
null,
- -1L,
null);
-
- return fut;
}
/**
@@ -718,7 +709,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param cached Cached cache entry for key. May be passed if and only if map size is {@code 1}.
- * @param ttl Entry time-to-live.
* @param filter Cache entry filter for atomic updates.
* @return Completion future.
*/
@@ -731,7 +721,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean retval,
final boolean rawRetval,
@Nullable GridCacheEntryEx<K, V> cached,
- long ttl,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
if (map != null && keyCheck)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 8c8a8e5..e6a4eb7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -164,15 +164,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* @return Participating nodes.
*/
@Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
+ return F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
+ @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) {
+ if (isMini(f))
+ return ((MiniFuture)f).node();
- return cctx.discovery().localNode();
- }
- });
+ return cctx.discovery().localNode();
+ }
+ });
}
/** {@inheritDoc} */
@@ -272,18 +271,38 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
else {
// Check transaction entries (corresponding tx entries must be enlisted in transaction).
- cand = new GridCacheMvccCandidate<>(entry, cctx.localNodeId(),
- null, null, threadId, lockVer, timeout, true, tx.entry(entry.txKey()).locked(), inTx(),
- inTx() && tx.implicitSingle(), false, false);
+ cand = new GridCacheMvccCandidate<>(entry,
+ cctx.localNodeId(),
+ null,
+ null,
+ threadId,
+ lockVer,
+ timeout,
+ true,
+ tx.entry(entry.txKey()).locked(),
+ inTx(),
+ inTx() && tx.implicitSingle(),
+ false,
+ false);
cand.topologyVersion(topSnapshot.get().topologyVersion());
}
}
else {
if (cand == null) {
- cand = new GridCacheMvccCandidate<>(entry, cctx.localNodeId(),
- null, null, threadId, lockVer, timeout, true, false, inTx(),
- inTx() && tx.implicitSingle(), false, false);
+ cand = new GridCacheMvccCandidate<>(entry,
+ cctx.localNodeId(),
+ null,
+ null,
+ threadId,
+ lockVer,
+ timeout,
+ true,
+ false,
+ inTx(),
+ inTx() && tx.implicitSingle(),
+ false,
+ false);
cand.topologyVersion(topSnapshot.get().topologyVersion());
}
@@ -611,8 +630,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (mapAsPrimary(keys, topVer))
return;
- ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings =
- new ConcurrentLinkedDeque8<>();
+ ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings = new ConcurrentLinkedDeque8<>();
// Assign keys to primary nodes.
GridNearLockMapping<K, V> map = null;
@@ -1270,10 +1288,20 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
else
cctx.mvcc().markExplicitOwner(k, threadId);
- if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
- cctx.events().addEvent(cctx.affinity().partition(k), k, tx, null,
- EVT_CACHE_OBJECT_READ, newVal, newVal != null || newBytes != null,
- null, false, CU.subjectId(tx, cctx.shared()), null, tx == null ? null : tx.resolveTaskName());
+ if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ cctx.events().addEvent(cctx.affinity().partition(k),
+ k,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null || newBytes != null,
+ null,
+ false,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ tx == null ? null : tx.resolveTaskName());
+ }
i++;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
index 7a0c2fd..7711470 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -57,6 +57,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
}
/**
+ * @param cacheId Cache ID.
* @param lockVer Lock ID.
* @param futId Future ID.
* @param miniId Mini future ID.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
index 29e33b8..6fb77a1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1169,7 +1169,8 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
* @throws IgniteCheckedException If failed to get previous value for transform.
* @throws GridCacheEntryRemovedException If entry was concurrently deleted.
*/
- protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(IgniteTxEntry<K, V> txEntry,
+ protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(
+ IgniteTxEntry<K, V> txEntry,
boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
GridCacheContext cacheCtx = txEntry.context();
@@ -1177,7 +1178,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
if (isSystemInvalidate())
return F.t(cacheCtx.isStoreEnabled() ? RELOAD : DELETE, null, null);
- if (F.isEmpty(txEntry.transformClosures()))
+ if (F.isEmpty(txEntry.entryProcessors()))
return F.t(txEntry.op(), txEntry.value(), txEntry.valueBytes());
else {
try {
@@ -1193,19 +1194,12 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
/*event*/recordEvt,
/*temporary*/true,
/*subjId*/subjId,
- /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null,
+ /**closure name */recordEvt ? F.first(txEntry.entryProcessors()) : null,
resolveTaskName(),
CU.<K, V>empty(),
null);
- try {
- for (IgniteClosure<V, V> clos : txEntry.transformClosures())
- val = clos.apply(val);
- }
- catch (Throwable e) {
- throw new IgniteException("Transform closure must not throw any exceptions " +
- "(transaction will be invalidated)", e);
- }
+ val = txEntry.applyEntryProcessors(val);
GridCacheOperation op = val == null ? DELETE : UPDATE;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
index 17b153d..73d17b5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
@@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -71,7 +72,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
/** Transform. */
@GridToStringInclude
- private Collection<IgniteClosure<V, V>> transformClosCol;
+ private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol;
/** Transform closure bytes. */
@GridToStringExclude
@@ -192,7 +193,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
* @param tx Owning transaction.
* @param op Operation.
* @param val Value.
- * @param transformClos Transform closure.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param ttl Time to live.
* @param entry Cache entry.
* @param filters Put filters.
@@ -202,9 +204,10 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
IgniteTxEx<K, V> tx,
GridCacheOperation op,
V val,
- IgniteClosure<V, V> transformClos,
+ EntryProcessor<K, V, ?> entryProcessor,
+ Object[] invokeArgs,
long ttl,
- GridCacheEntryEx<K,V> entry,
+ GridCacheEntryEx<K, V> entry,
IgnitePredicate<GridCacheEntry<K, V>>[] filters,
GridCacheVersion drVer) {
assert ctx != null;
@@ -220,8 +223,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
this.filters = filters;
this.drVer = drVer;
- if (transformClos != null)
- addTransformClosure(transformClos);
+ if (entryProcessor != null)
+ addEntryProcessor(entryProcessor, invokeArgs);
key = entry.key();
keyBytes = entry.keyBytes();
@@ -299,7 +302,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
cp.filters = filters;
cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
cp.val.valueBytes(val.valueBytes());
- cp.transformClosCol = transformClosCol;
+ cp.entryProcessorsCol = entryProcessorsCol;
cp.ttl = ttl;
cp.drExpireTime = drExpireTime;
cp.explicitVer = explicitVer;
@@ -605,13 +608,14 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
}
/**
- * @param transformClos Transform closure.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
*/
- public void addTransformClosure(IgniteClosure<V, V> transformClos) {
- if (transformClosCol == null)
- transformClosCol = new LinkedList<>();
+ public void addEntryProcessor(EntryProcessor<K, V, ?> entryProcessor, Object[] invokeArgs) {
+ if (entryProcessorsCol == null)
+ entryProcessorsCol = new LinkedList<>();
- transformClosCol.add(transformClos);
+ entryProcessorsCol.add(new T2<EntryProcessor<K, V, ?>, Object[]>(entryProcessor, invokeArgs));
// Must clear transform closure bytes since collection has changed.
transformClosBytes = null;
@@ -620,17 +624,41 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
}
/**
- * @return Collection of transform closures.
+ * @return Collection of entry processors.
*/
- public Collection<IgniteClosure<V, V>> transformClosures() {
- return transformClosCol;
+ public Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors() {
+ return entryProcessorsCol;
+ }
+
+ /**
+ * @param val Value.
+ * @return New value.
+ */
+ @SuppressWarnings("unchecked")
+ public V applyEntryProcessors(V val) {
+ for (T2<EntryProcessor<K, V, ?>, Object[]> t : entryProcessors()) {
+ try {
+ CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(key, val);
+
+ EntryProcessor processor = t.get1();
+
+ processor.process(invokeEntry, t.get2());
+
+ val = invokeEntry.getValue();
+ }
+ catch (Exception ignore) {
+ // No-op.
+ }
+ }
+
+ return val;
}
/**
- * @param transformClosCol Collection of transform closures.
+ * @param entryProcessorsCol Collection of entry processors.
*/
- public void transformClosures(@Nullable Collection<IgniteClosure<V, V>> transformClosCol) {
- this.transformClosCol = transformClosCol;
+ public void entryProcessors(@Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol) {
+ this.entryProcessorsCol = entryProcessorsCol;
// Must clear transform closure bytes since collection has changed.
transformClosBytes = null;
@@ -740,8 +768,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
if (keyBytes == null)
keyBytes = entry.getOrMarshalKeyBytes();
- if (transformClosBytes == null && transformClosCol != null)
- transformClosBytes = CU.marshal(ctx, transformClosCol);
+ if (transformClosBytes == null && entryProcessorsCol != null)
+ transformClosBytes = CU.marshal(ctx, entryProcessorsCol);
if (F.isEmptyOrNulls(filters))
filterBytes = null;
@@ -781,8 +809,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
key = ctx.marshaller().unmarshal(keyBytes, clsLdr);
// Unmarshal transform closure anyway if it exists.
- if (transformClosBytes != null && transformClosCol == null)
- transformClosCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr);
+ if (transformClosBytes != null && entryProcessorsCol == null)
+ entryProcessorsCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr);
if (filters == null && filterBytes != null) {
filters = ctx.marshaller().unmarshal(filterBytes, clsLdr);
@@ -820,7 +848,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
}
else {
out.writeObject(key);
- U.writeCollection(out, transformClosCol);
+ U.writeCollection(out, entryProcessorsCol);
U.writeArray(out, filters);
}
@@ -850,7 +878,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
}
else {
key = (K)in.readObject();
- transformClosCol = U.readCollection(in);
+ entryProcessorsCol = U.readCollection(in);
filters = U.readEntryFilterArray(in);
}
@@ -1022,6 +1050,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
}
/**
+ * @param sharedCtx Shared cache context.
* @param ctx Cache context.
* @param depEnabled Deployment enabled flag.
* @throws IgniteCheckedException If marshaling failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
index 1d4b5d7..7284161 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
@@ -1155,7 +1155,7 @@ public class IgniteTxHandler<K, V> {
txEntry.keyBytes(),
txEntry.value(),
txEntry.valueBytes(),
- txEntry.transformClosures(),
+ txEntry.entryProcessors(),
txEntry.drVersion(),
txEntry.ttl());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 84ef5b7..34938d5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -28,6 +28,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -456,7 +457,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
boolean intercept = e.context().config().getInterceptor() != null;
- if (intercept || !F.isEmpty(e.transformClosures()))
+ if (intercept || !F.isEmpty(e.entryProcessors()))
e.cached().unswap(true, false);
GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
@@ -645,7 +646,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (cacheCtx.isNear())
((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());
- if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
+ if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
txEntry.cached().unswap(true, false);
GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
@@ -702,7 +703,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
txEntry.value(val, true, false);
txEntry.valueBytes(valBytes);
txEntry.op(op);
- txEntry.transformClosures(null);
+ txEntry.entryProcessors(null);
txEntry.drVersion(explicitVer);
}
@@ -1061,10 +1062,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
// Read value from locked entry in group-lock transaction as well.
if (txEntry.hasValue()) {
- if (!F.isEmpty(txEntry.transformClosures())) {
- for (IgniteClosure<V, V> clos : txEntry.transformClosures())
- val = clos.apply(val);
- }
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
if (val != null) {
V val0 = val;
@@ -1082,7 +1081,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
try {
Object transformClo =
(txEntry.op() == TRANSFORM && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
- F.first(txEntry.transformClosures()) : null;
+ F.first(txEntry.entryProcessors()) : null;
val = txEntry.cached().innerGet(this,
/*swap*/true,
@@ -1102,10 +1101,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (!readCommitted())
txEntry.readValue(val);
- if (!F.isEmpty(txEntry.transformClosures())) {
- for (IgniteClosure<V, V> clos : txEntry.transformClosures())
- val = clos.apply(val);
- }
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
V val0 = val;
@@ -1195,6 +1192,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
txEntry = addEntry(READ,
val,
null,
+ null,
entry,
expiryPlc,
filter,
@@ -1229,6 +1227,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
txEntry = addEntry(READ,
val,
null,
+ null,
entry,
expiryPlc,
CU.<K, V>empty(),
@@ -1344,10 +1343,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (!readCommitted())
txEntry.readValue(val);
- if (!F.isEmpty(txEntry.transformClosures())) {
- for (IgniteClosure<V, V> clos : txEntry.transformClosures())
- visibleVal = clos.apply(visibleVal);
- }
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ visibleVal = txEntry.applyEntryProcessors(visibleVal);
}
// In pessimistic mode we hold the lock, so filter validation
@@ -1560,9 +1557,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
try {
Object transformClo =
- (!F.isEmpty(txEntry.transformClosures()) &&
+ (!F.isEmpty(txEntry.entryProcessors()) &&
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
- F.first(txEntry.transformClosures()) : null;
+ F.first(txEntry.entryProcessors()) : null;
V val = cached.innerGet(IgniteTxLocalAdapter.this,
cacheCtx.isSwapOrOffheapEnabled(),
@@ -1584,10 +1581,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
txEntry.setAndMarkValid(val);
- if (!F.isEmpty(txEntry.transformClosures())) {
- for (IgniteClosure<V, V> clos : txEntry.transformClosures())
- val = clos.apply(val);
- }
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
if (cacheCtx.portableEnabled())
val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
@@ -1711,10 +1706,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (!readCommitted())
txEntry.readValue(val);
- if (!F.isEmpty(txEntry.transformClosures())) {
- for (IgniteClosure<V, V> clos : txEntry.transformClosures())
- val = clos.apply(val);
- }
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
retMap.put(entry.getKey(), val);
}
@@ -1736,6 +1729,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public IgniteFuture<GridCacheReturn<V>> putAllAsync(
GridCacheContext<K, V> cacheCtx,
Map<? extends K, ? extends V> map,
@@ -1744,7 +1738,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
long ttl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
- return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter);
+ return (IgniteFuture<GridCacheReturn<V>>)putAllAsync0(cacheCtx,
+ map,
+ null,
+ null,
+ null,
+ retval,
+ cached,
+ filter);
}
/** {@inheritDoc} */
@@ -1752,18 +1753,32 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
GridCacheContext<K, V> cacheCtx,
Map<? extends K, GridCacheDrInfo<V>> drMap
) {
- return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null);
+ return putAllAsync0(cacheCtx,
+ null,
+ null,
+ null,
+ drMap,
+ false,
+ null,
+ null);
}
/** {@inheritDoc} */
- @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync(
+ @SuppressWarnings("unchecked")
+ @Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
GridCacheContext<K, V> cacheCtx,
- @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map,
boolean retval,
- @Nullable GridCacheEntryEx<K, V> cached,
- long ttl
+ @Nullable Map<? extends K, EntryProcessor> map,
+ Object... invokeArgs
) {
- return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null);
+ return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx,
+ null,
+ map,
+ invokeArgs,
+ null,
+ retval,
+ null,
+ null);
}
/** {@inheritDoc} */
@@ -1796,7 +1811,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param expiryPlc Explicitly specified expiry policy for entry.
* @param implicit Implicit flag.
* @param lookup Value lookup map ({@code null} for remove).
- * @param transformMap Map with transform closures if this is a transform operation.
+ * @param invokeMap Map with entry processors for invoke operation.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param retval Flag indicating whether a value should be returned.
* @param lockOnly If {@code true}, then entry will be enlisted as noop.
* @param filter User filters.
@@ -1807,13 +1823,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
*/
protected IgniteFuture<Set<K>> enlistWrite(
- GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<K, V> cacheCtx,
Collection<? extends K> keys,
@Nullable GridCacheEntryEx<K, V> cached,
@Nullable ExpiryPolicy expiryPlc,
boolean implicit,
@Nullable Map<? extends K, ? extends V> lookup,
- @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+ @Nullable Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable Object[] invokeArgs,
boolean retval,
boolean lockOnly,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -1834,18 +1851,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
Set<K> skipped = null;
- boolean rmv = lookup == null && transformMap == null;
+ Set<K> missedForInvoke = null;
+
+ boolean rmv = lookup == null && invokeMap == null;
try {
// Set transform flag for transaction.
- if (transformMap != null)
+ if (invokeMap != null)
transform = true;
groupLockSanityCheck(cacheCtx, keys);
for (K key : keys) {
V val = rmv || lookup == null ? null : lookup.get(key);
- IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key);
+ EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
GridCacheVersion drVer;
long drTtl;
@@ -1876,7 +1895,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (key == null)
continue;
- if (!rmv && val == null && transformClo == null) {
+ if (!rmv && val == null && entryProcessor == null) {
skipped = skip(skipped, key);
continue;
@@ -1930,7 +1949,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/*events*/retval,
/*temporary*/false,
CU.subjectId(this, cctx),
- transformClo,
+ entryProcessor,
resolveTaskName(),
CU.<K, V>empty(),
null);
@@ -1952,7 +1971,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (!readCommitted() && old != null) {
// Enlist failed filters as reads for non-read-committed mode,
// so future ops will get the same values.
- txEntry = addEntry(READ, old, null, entry, null, CU.<K, V>empty(), false, -1L, -1L,
+ txEntry = addEntry(READ,
+ old,
+ null,
+ null,
+ entry,
+ null,
+ CU.<K, V>empty(),
+ false,
+ -1L,
+ -1L,
null);
txEntry.markValid();
@@ -1964,9 +1992,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
break; // While.
}
- txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
- old != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl,
- drExpireTime, drVer);
+ GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+ txEntry = addEntry(op,
+ val,
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer);
if (!implicit() && readCommitted())
cacheCtx.evicts().touch(entry, topologyVersion());
@@ -2013,15 +2052,39 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
);
}
- else
- ret.set(null, true);
+ else {
+ if (retval)
+ ret.set(null, true);
+ else {
+ if (txEntry.op() == TRANSFORM) {
+ if (missedForInvoke == null)
+ missedForInvoke = new HashSet<>();
+
+ missedForInvoke.add(key);
+ }
+ else
+ ret.success(true);
+ }
+ }
+ }
+ else {
+ if (retval)
+ ret.set(old, true);
+ else {
+ if (txEntry.op() == TRANSFORM)
+ addInvokeResult(txEntry, old, ret);
+ else
+ ret.success(true);
+ }
}
- else
- ret.set(old, true);
}
// Pessimistic.
- else
- ret.set(old, true);
+ else {
+ if (retval)
+ ret.set(old, true);
+ else
+ ret.success(true);
+ }
break; // While.
}
@@ -2032,7 +2095,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
}
else {
- if (transformClo == null && txEntry.op() == TRANSFORM)
+ if (entryProcessor == null && txEntry.op() == TRANSFORM)
throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
"transaction after transform closure is applied): " + key);
@@ -2051,9 +2114,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
continue;
}
- txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
- v != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl,
- drExpireTime, drVer);
+ GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+ v != null ? UPDATE : CREATE;
+
+ txEntry = addEntry(op,
+ val,
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer);
enlisted.add(key);
}
@@ -2061,8 +2135,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (!pessimistic()) {
txEntry.markValid();
- // Set tx entry and return values.
- ret.set(v, true);
+ if (retval)
+ ret.set(v, true);
}
}
}
@@ -2071,6 +2145,38 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
return new GridFinishedFuture<>(cctx.kernalContext(), e);
}
+ if (missedForInvoke != null) {
+ assert optimistic();
+ assert invokeMap != null;
+
+ IgniteFuture<Boolean> fut = loadMissing(
+ cacheCtx,
+ true,
+ missedForInvoke,
+ deserializePortables(cacheCtx),
+ new CI2<K, V>() {
+ @Override public void apply(K k, V v) {
+ if (log.isDebugEnabled())
+ log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']');
+
+ addInvokeResult(entry(new IgniteTxKey<>(k, cacheCtx.cacheId())), v, ret);
+ }
+ });
+
+ return new GridEmbeddedFuture<>(
+ cctx.kernalContext(),
+ fut,
+ new C2<Boolean, Exception, Set<K>>() {
+ @Override public Set<K> apply(Boolean b, Exception e) {
+ if (e != null)
+ throw new GridClosureException(e);
+
+ return Collections.emptySet();
+ }
+ }
+ );
+ }
+
return new GridFinishedFuture<>(cctx.kernalContext(), skipped);
}
@@ -2080,8 +2186,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param cacheCtx Context.
* @param keys Keys.
* @param failed Collection of potentially failed keys (need to populate in this method).
- * @param transformed Output map where transformed values will be placed.
- * @param transformMap Transform map.
* @param ret Return value.
* @param rmv {@code True} if remove.
* @param retval Flag to return value or not.
@@ -2090,19 +2194,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param filter Filter to check entries.
* @return Failed keys.
* @throws IgniteCheckedException If error.
+ * @param computeInvoke If {@code true} computes return value for invoke operation.
*/
+ @SuppressWarnings("unchecked")
protected Set<K> postLockWrite(
GridCacheContext<K, V> cacheCtx,
Iterable<? extends K> keys,
Set<K> failed,
- @Nullable Map<K, V> transformed,
- @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
- GridCacheReturn<V> ret,
+ GridCacheReturn ret,
boolean rmv,
boolean retval,
boolean read,
long accessTtl,
- IgnitePredicate<GridCacheEntry<K, V>>[] filter
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ boolean computeInvoke
) throws IgniteCheckedException {
for (K k : keys) {
IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k));
@@ -2132,7 +2237,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
retval = true;
- if (retval) {
+ if (retval || txEntry.op() == TRANSFORM) {
if (!cacheCtx.isNear()) {
try {
if (!hasPrevVal)
@@ -2161,7 +2266,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet();
}
- ret.value(v);
+ if (txEntry.op() == TRANSFORM) {
+ if (computeInvoke)
+ addInvokeResult(txEntry, v, ret);
+ }
+ else
+ ret.value(v);
}
boolean pass = cacheCtx.isAll(cached, filter);
@@ -2185,7 +2295,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
failed = skip(failed, k);
// Revert operation to previous. (if no - NOOP, so entry will be unlocked).
- txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value());
+ txEntry.setAndMarkValid(txEntry.previousOperation(), (V)ret.value());
txEntry.filters(CU.<K, V>empty());
txEntry.filtersSet(false);
@@ -2222,33 +2332,58 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
/**
+ * @param txEntry Entry.
+ * @param val Value.
+ * @param ret Return value to update.
+ */
+ private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn ret) {
+ try {
+ Object res = null;
+
+ for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
+ CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val);
+
+ EntryProcessor<K, V, ?> entryProcessor = t.get1();
+
+ res = entryProcessor.process(invokeEntry, t.get2());
+ }
+
+ ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res));
+ }
+ catch (Exception e) {
+ ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e));
+ }
+ }
+
+ /**
* Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
* maps must be non-null.
*
* @param cacheCtx Context.
* @param map Key-value map to store.
- * @param transformMap Transform map.
+ * @param invokeMap Invoke map.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param drMap DR map.
* @param retval Key-transform value map to store.
* @param cached Cached entry, if any.
- * @param ttl Time to live.
* @param filter Filter.
* @return Operation future.
*/
- private IgniteFuture<GridCacheReturn<V>> putAllAsync0(
+ @SuppressWarnings("unchecked")
+ private IgniteFuture putAllAsync0(
final GridCacheContext<K, V> cacheCtx,
@Nullable Map<? extends K, ? extends V> map,
- @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+ @Nullable Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable final Object[] invokeArgs,
@Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
final boolean retval,
@Nullable GridCacheEntryEx<K, V> cached,
- long ttl,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT);
// Cached entry may be passed only from entry wrapper.
final Map<K, V> map0;
- final Map<K, IgniteClosure<V, V>> transformMap0;
+ final Map<K, EntryProcessor> invokeMap0;
if (drMap != null) {
assert map == null;
@@ -2259,7 +2394,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
});
- transformMap0 = null;
+ invokeMap0 = null;
}
else if (cacheCtx.portableEnabled()) {
if (map != null) {
@@ -2280,14 +2415,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
else
map0 = null;
- if (transformMap != null) {
- transformMap0 = U.newHashMap(transformMap.size());
+ if (invokeMap != null) {
+ invokeMap0 = U.newHashMap(invokeMap.size());
try {
- for (Map.Entry<? extends K, ? extends IgniteClosure<V, V>> e : transformMap.entrySet()) {
+ for (Map.Entry<? extends K, EntryProcessor> e : invokeMap.entrySet()) {
K key = (K)cacheCtx.marshalToPortable(e.getKey());
- transformMap0.put(key, e.getValue());
+ invokeMap0.put(key, e.getValue());
}
}
catch (PortableException e) {
@@ -2295,19 +2430,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
}
else
- transformMap0 = null;
+ invokeMap0 = null;
}
else {
map0 = (Map<K, V>)map;
- transformMap0 = (Map<K, IgniteClosure<V, V>>)transformMap;
+ invokeMap0 = (Map<K, EntryProcessor>)invokeMap;
}
if (log.isDebugEnabled())
log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
- assert map0 != null || transformMap0 != null;
+ assert map0 != null || invokeMap0 != null;
assert cached == null ||
- (map0 != null && map0.size() == 1) || (transformMap0 != null && transformMap0.size() == 1);
+ (map0 != null && map0.size() == 1) || (invokeMap0 != null && invokeMap0.size() == 1);
try {
checkValid();
@@ -2320,7 +2455,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
final GridCacheReturn<V> ret = new GridCacheReturn<>(false);
- if (F.isEmpty(map0) && F.isEmpty(transformMap0)) {
+ if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
if (implicit())
try {
commit();
@@ -2333,7 +2468,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
try {
- Set<? extends K> keySet = map0 != null ? map0.keySet() : transformMap0.keySet();
+ Set<? extends K> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
Collection<K> enlisted = new LinkedList<>();
@@ -2346,7 +2481,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
prj != null ? prj.expiry() : null,
implicit,
map0,
- transformMap0,
+ invokeMap0,
+ invokeArgs,
retval,
false,
filter,
@@ -2390,19 +2526,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (log.isDebugEnabled())
log.debug("Acquired transaction lock for put on keys: " + keys);
- Map<K, V> transformed = null;
-
postLockWrite(cacheCtx,
keys,
loaded,
- transformed,
- transformMap0,
ret,
/*remove*/false,
retval,
/*read*/false,
-1L,
- filter);
+ filter,
+ /*computeInvoke*/true);
return ret;
}
@@ -2554,7 +2687,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
plc,
implicit,
/** lookup map */null,
- /** transform map */null,
+ /** invoke map */null,
+ /** invoke arguments */null,
retval,
/** lock only */false,
filter,
@@ -2595,14 +2729,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
postLockWrite(cacheCtx,
passedKeys,
loadFut.get(),
- null,
- null,
ret,
/*remove*/true,
retval,
/*read*/false,
-1L,
- filter);
+ filter,
+ /*computeInvoke*/false);
return ret;
}
@@ -2650,6 +2783,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/**
* Checks if portable values should be deserialized.
*
+ * @param cacheCtx Cache context.
* @return {@code True} if portables should be deserialized, {@code false} otherwise.
*/
private boolean deserializePortables(GridCacheContext<K, V> cacheCtx) {
@@ -2670,6 +2804,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/**
* Checks that affinity keys are enlisted in group transaction on start.
*
+ * @param cacheCtx Cache context.
* @param keys Keys to check.
* @throws IgniteCheckedException If sanity check failed.
*/
@@ -2721,7 +2856,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/** expiry - leave unchanged */null,
/** implicit */false,
/** lookup map */null,
- /** transform map */null,
+ /** invoke map */null,
+ /** invoke arguments */null,
/** retval */false,
/** lock only */true,
CU.<K, V>empty(),
@@ -2842,7 +2978,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param op Cache operation.
* @param val Value.
* @param expiryPlc Explicitly specified expiry policy.
- * @param transformClos Transform closure.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param entryProcessor Entry processor.
* @param entry Cache entry.
* @param filter Filter.
* @param filtersSet {@code True} if filter should be marked as set.
@@ -2853,7 +2990,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
*/
protected final IgniteTxEntry<K, V> addEntry(GridCacheOperation op,
@Nullable V val,
- @Nullable IgniteClosure<V, V> transformClos,
+ @Nullable EntryProcessor entryProcessor,
+ Object[] invokeArgs,
GridCacheEntryEx<K, V> entry,
@Nullable ExpiryPolicy expiryPlc,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -2861,6 +2999,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
long drTtl,
long drExpireTime,
@Nullable GridCacheVersion drVer) {
+ assert invokeArgs == null || op == TRANSFORM;
+
IgniteTxKey<K> key = entry.txKey();
checkInternal(key);
@@ -2883,12 +3023,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
IgniteTxEntry<K, V> txEntry;
if (old != null) {
- if (transformClos != null) {
+ if (entryProcessor != null) {
assert val == null;
assert op == TRANSFORM;
// Will change the op.
- old.addTransformClosure(transformClos);
+ old.addEntryProcessor(entryProcessor, invokeArgs);
}
else {
assert old.op() != TRANSFORM;
@@ -2922,7 +3062,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
this,
op,
val,
- transformClos,
+ entryProcessor,
+ invokeArgs,
hasDrTtl ? drTtl : -1L,
entry,
filter,