You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/25 15:29:38 UTC
[2/3] incubator-ignite git commit: # ignite-44
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index eaf0173..650f0ab 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -25,6 +25,7 @@ import org.jetbrains.annotations.*;
import sun.misc.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -106,6 +107,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (V)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
+ null,
expiryPerCall(),
true,
false,
@@ -127,6 +129,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
+ null,
expiryPerCall(),
false,
false,
@@ -145,6 +148,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
+ null,
expiryPerCall(),
false,
false,
@@ -163,7 +167,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- return updateAllAsync0(F0.asMap(key, val), null, true, false, ttl, filter);
+ return updateAllAsync0(F0.asMap(key, val),
+ null,
+ null,
+ true,
+ false,
+ filter);
}
/** {@inheritDoc} */
@@ -177,7 +186,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- return updateAllAsync0(F0.asMap(key, val), null, false, false, ttl, filter);
+ return updateAllAsync0(F0.asMap(key, val),
+ null,
+ null,
+ false,
+ false,
+ filter);
}
/** {@inheritDoc} */
@@ -242,6 +256,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (GridCacheReturn<V>)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(newVal),
+ null,
expiryPerCall(),
true,
true,
@@ -259,6 +274,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (GridCacheReturn<V>)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
true,
true,
@@ -283,7 +299,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- return updateAllAsync0(F.asMap(key, newVal), null, true, true, 0,
+ return updateAllAsync0(F.asMap(key, newVal),
+ null,
+ null,
+ true,
+ true,
ctx.equalsPeekArray(oldVal));
}
@@ -295,6 +315,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(UPDATE,
m.keySet(),
m.values(),
+ null,
expiryPerCall(),
false,
false,
@@ -307,11 +328,17 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
ctx.denyOnLocalRead();
- return updateAllAsync0(m, null, false, false, 0, filter);
+ return updateAllAsync0(m,
+ null,
+ null,
+ false,
+ false,
+ filter);
}
/** {@inheritDoc} */
@Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException {
+ /*
ctx.denyOnLocalRead();
updateAllInternal(TRANSFORM,
@@ -322,12 +349,16 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.isStoreEnabled());
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer)
throws IgniteCheckedException {
+ /*
return (R)updateAllInternal(TRANSFORM,
Collections.singleton(key),
Collections.singleton(new GridCacheTransformComputeClosure<>(transformer)),
@@ -336,6 +367,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.isStoreEnabled());
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -343,14 +377,19 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
IgniteClosure<V, V> transformer,
@Nullable GridCacheEntryEx<K, V> entry,
long ttl) {
+ /*
ctx.denyOnLocalRead();
return updateAllAsync0(null, Collections.singletonMap(key, transformer), false, false, ttl, null);
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException {
+ /*
ctx.denyOnLocalRead();
if (F.isEmpty(m))
@@ -364,16 +403,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.isStoreEnabled());
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) {
+ /*
ctx.denyOnLocalRead();
if (F.isEmpty(m))
return new GridFinishedFuture<Object>(ctx.kernalContext());
return updateAllAsync0(null, m, false, false, 0, null);
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -386,6 +432,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (V)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
true,
false,
@@ -412,6 +459,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(DELETE,
keys,
null,
+ null,
expiryPerCall(),
false,
false,
@@ -439,6 +487,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
false,
false,
@@ -467,6 +516,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
false,
false,
@@ -669,31 +719,107 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, expiry, filter).get();
}
+ /** {@inheritDoc} */
+ @Override public <T> EntryProcessorResult<T> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return invokeAsync(key, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return invokeAllAsync(keys, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws EntryProcessorException {
+ A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ ctx.denyOnLocalRead();
+
+ Map<? extends K, EntryProcessor> invokeMap =
+ Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+
+ IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
+ invokeMap,
+ args,
+ true,
+ false,
+ null);
+
+ return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
+ @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut)
+ throws IgniteCheckedException {
+ Map<K, EntryProcessorResult<T>> resMap = fut.get();
+
+ assert resMap != null;
+ assert resMap.size() == 1 : resMap.size();
+
+ return resMap.values().iterator().next();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
+ Set<? extends K> keys,
+ final EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
+ ctx.denyOnLocalRead();
+
+ Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+ @Override public EntryProcessor apply(K k) {
+ return entryProcessor;
+ }
+ });
+
+ return updateAllAsync0(null,
+ invokeMap,
+ args,
+ true,
+ false,
+ null);
+ }
+
/**
* Entry point for public API update methods.
*
- * @param map Put map. Either {@code map} or {@code transformMap} should be passed.
- * @param transformMap Transform map. Either {@code map} or {@code transformMap} should be passed.
+ * @param map Put map. Either {@code map} or {@code invokeMap} should be passed.
+ * @param invokeMap Transform map. Either {@code map} or {@code invokeMap} should be passed.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
- * @param ttl Entry time-to-live.
* @param filter Cache entry filter for atomic updates.
* @return Completion future.
*/
private IgniteFuture updateAllAsync0(
@Nullable final Map<? extends K, ? extends V> map,
- @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+ @Nullable final Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable final Object[] invokeArgs,
final boolean retval,
final boolean rawRetval,
- final long ttl,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
- final GridCacheOperation op = transformMap != null ? TRANSFORM : UPDATE;
+ final GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE;
final Collection<? extends K> keys =
- map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null;
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : null;
- final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null;
+ final Collection<?> vals = map != null ? map.values() : invokeMap != null ? invokeMap.values() : null;
final boolean storeEnabled = ctx.isStoreEnabled();
@@ -704,6 +830,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return updateAllInternal(op,
keys,
vals,
+ invokeArgs,
expiry,
retval,
rawRetval,
@@ -737,6 +864,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return updateAllInternal(DELETE,
keys,
null,
+ null,
expiryPlc,
retval,
rawRetval,
@@ -747,11 +875,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
/**
- * Entry point for all public update methods (put, remove, transform).
+ * Entry point for all public update methods (put, remove, invoke).
*
* @param op Operation.
* @param keys Keys.
* @param vals Values.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param expiryPlc Expiry policy.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
@@ -764,6 +893,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
private Object updateAllInternal(GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
+ @Nullable Object[] invokeArgs,
@Nullable ExpiryPolicy expiryPlc,
boolean retval,
boolean rawRetval,
@@ -784,9 +914,15 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
UUID subjId = ctx.subjectIdPerCall(null);
if (storeEnabled && keys.size() > 1) {
- updateWithBatch(op, keys, vals, expiryPlc, ver, filter, subjId, taskName);
-
- return null;
+ return updateWithBatch(op,
+ keys,
+ vals,
+ invokeArgs,
+ expiryPlc,
+ ver,
+ filter,
+ subjId,
+ taskName);
}
Iterator<?> valsIter = vals != null ? vals.iterator() : null;
@@ -809,10 +945,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
try {
entry = entryEx(key);
- IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal(
+ IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal(
ver,
val == null ? DELETE : op,
val,
+ invokeArgs,
storeEnabled,
retval,
expiryPlc,
@@ -823,16 +960,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
taskName);
- if (res == null) {
- if (op == TRANSFORM && val instanceof GridCacheTransformComputeClosure) {
- assert retval;
+ if (op == TRANSFORM) {
+ assert t.get2() instanceof EntryProcessorResult : t.get2();
+
+ Map<K, EntryProcessorResult> computedMap;
- res = new IgniteBiTuple<>(t.get1(),
- ((GridCacheTransformComputeClosure<V, ?>)val).returnValue());
+ if (res == null) {
+ computedMap = U.newHashMap(keys.size());
+
+ res = new IgniteBiTuple<>(true, computedMap);
}
else
- res = t;
+ computedMap = (Map<K, EntryProcessorResult>)res.get2();
+
+ computedMap.put(key, (EntryProcessorResult)t.getValue());
}
+ else if (res == null)
+ res = t;
break; // While.
}
@@ -872,18 +1016,21 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
* @param op Operation.
* @param keys Keys.
* @param vals Values.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param expiryPlc Expiry policy.
* @param ver Cache version.
* @param filter Optional filter.
* @param subjId Subject ID.
* @param taskName Task name.
* @throws GridCachePartialUpdateException If update failed.
+ * @return Results map for invoke operation.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
- private void updateWithBatch(
+ private Map<K, EntryProcessorResult> updateWithBatch(
GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
+ @Nullable Object[] invokeArgs,
@Nullable ExpiryPolicy expiryPlc,
GridCacheVersion ver,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -896,7 +1043,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
int size = locked.size();
Map<K, V> putMap = null;
+
Collection<K> rmvKeys = null;
+
+ Map<K, EntryProcessorResult> invokeResMap =
+ op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null;
+
List<GridCacheEntryEx<K, V>> filtered = new ArrayList<>(size);
GridCachePartialUpdateException err = null;
@@ -933,7 +1085,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
if (op == TRANSFORM) {
- IgniteClosure<V, V> transform = (IgniteClosure<V, V>)val;
+ EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)val;
V old = entry.innerGet(null,
/*swap*/true,
@@ -944,12 +1096,30 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/**event*/true,
/**temporary*/true,
subjId,
- transform,
+ entryProcessor,
taskName,
CU.<K, V>empty(),
null);
- V updated = transform.apply(old);
+ CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old);
+
+ V updated;
+ CacheInvokeResult invokeRes;
+
+ try {
+ Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+
+ updated = ctx.unwrapTemporary(invokeEntry.getValue());
+
+ invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+ }
+ catch (Exception e) {
+ invokeRes = new CacheInvokeResult<>(e);
+
+ updated = old;
+ }
+
+ invokeResMap.put(entry.key(), invokeRes);
if (updated == null) {
if (intercept) {
@@ -1107,6 +1277,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (err != null)
throw err;
+
+ return invokeResMap;
}
finally {
unlockEntries(locked);
@@ -1179,10 +1351,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
- IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal(
+ IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal(
ver,
op,
writeVal,
+ null,
false,
false,
expiryPlc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
index 6fb77a1..bfd9359 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1194,7 +1194,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
/*event*/recordEvt,
/*temporary*/true,
/*subjId*/subjId,
- /**closure name */recordEvt ? F.first(txEntry.entryProcessors()) : null,
+ /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
resolveTaskName(),
CU.<K, V>empty(),
null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 34938d5..1680724 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1768,7 +1768,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
@Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
GridCacheContext<K, V> cacheCtx,
boolean retval,
- @Nullable Map<? extends K, EntryProcessor> map,
+ @Nullable Map<? extends K, EntryProcessor<K, V, Object>> map,
Object... invokeArgs
) {
return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx,
@@ -1829,7 +1829,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
@Nullable ExpiryPolicy expiryPlc,
boolean implicit,
@Nullable Map<? extends K, ? extends V> lookup,
- @Nullable Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap,
@Nullable Object[] invokeArgs,
boolean retval,
boolean lockOnly,
@@ -1992,7 +1992,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
break; // While.
}
- GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
txEntry = addEntry(op,
@@ -2019,7 +2019,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
txEntry.markValid();
if (old == null) {
- if (retval && !readThrough) {
+ boolean load = retval && !readThrough;
+
+ // Check for transform here to avoid map creation.
+ load |= (op == TRANSFORM && keys.size() == 1);
+
+ if (load) {
// If return value is required, then we know for sure that there is only
// one key in the keys collection.
assert keys.size() == 1;
@@ -2035,7 +2040,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
log.debug("Loaded value from remote node [key=" + k + ", val=" +
v + ']');
- ret.set(v, true);
+ if (op == TRANSFORM) {
+ IgniteTxEntry<K, V> e =
+ entry(new IgniteTxKey<>(k, cacheCtx.cacheId()));
+
+ assert e != null && e.op() == TRANSFORM : e;
+
+ addInvokeResult(e, v, ret);
+ }
+ else
+ ret.set(v, true);
}
});
@@ -2130,6 +2144,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
drVer);
enlisted.add(key);
+
+ if (txEntry.op() == TRANSFORM)
+ addInvokeResult(txEntry, txEntry.value(), ret);
}
if (!pessimistic()) {
@@ -2137,6 +2154,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (retval)
ret.set(v, true);
+ else
+ ret.success(true);
}
}
}
@@ -2155,11 +2174,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
missedForInvoke,
deserializePortables(cacheCtx),
new CI2<K, V>() {
- @Override public void apply(K k, V v) {
+ @Override public void apply(K key, V val) {
if (log.isDebugEnabled())
- log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']');
+ log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+
+ IgniteTxEntry<K, V> e = entry(new IgniteTxKey<>(key, cacheCtx.cacheId()));
- addInvokeResult(entry(new IgniteTxKey<>(k, cacheCtx.cacheId())), v, ret);
+ assert e != null && e.op() == TRANSFORM : e;
+
+ addInvokeResult(e, val, ret);
}
});
@@ -2373,17 +2396,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
private IgniteFuture putAllAsync0(
final GridCacheContext<K, V> cacheCtx,
@Nullable Map<? extends K, ? extends V> map,
- @Nullable Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap,
@Nullable final Object[] invokeArgs,
@Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
final boolean retval,
@Nullable GridCacheEntryEx<K, V> cached,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ assert filter == null || invokeMap == null;
+
cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT);
// Cached entry may be passed only from entry wrapper.
final Map<K, V> map0;
- final Map<K, EntryProcessor> invokeMap0;
+ final Map<K, EntryProcessor<K, V, Object>> invokeMap0;
if (drMap != null) {
assert map == null;
@@ -2419,7 +2444,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
invokeMap0 = U.newHashMap(invokeMap.size());
try {
- for (Map.Entry<? extends K, EntryProcessor> e : invokeMap.entrySet()) {
+ for (Map.Entry<? extends K, EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) {
K key = (K)cacheCtx.marshalToPortable(e.getKey());
invokeMap0.put(key, e.getValue());
@@ -2434,7 +2459,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
else {
map0 = (Map<K, V>)map;
- invokeMap0 = (Map<K, EntryProcessor>)invokeMap;
+ invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
}
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
index 8a485b6..12680f3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -96,7 +96,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> {
public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
GridCacheContext<K, V> cacheCtx,
boolean retval,
- Map<? extends K, EntryProcessor> map,
+ Map<? extends K, EntryProcessor<K, V, Object>> map,
Object... invokeArgs);
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
index a21c8fc..89bcbe6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
@@ -34,6 +34,7 @@ import org.gridgain.grid.util.worker.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -1101,7 +1102,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
// No affinity key present, just concat and return.
if (colocatedKey.affinityKey() == null) {
- dataCachePrj.transform(colocatedKey, new UpdateClosure(startOff, data));
+ dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data));
return;
}
@@ -1125,16 +1126,16 @@ public class GridGgfsDataManager extends GridGgfsManager {
boolean hasVal = false;
- UpdateClosure transformClos = new UpdateClosure(startOff, data);
+ UpdateProcessor transformClos = new UpdateProcessor(startOff, data);
if (vals.get(colocatedKey) != null) {
- dataCachePrj.transform(colocatedKey, transformClos);
+ dataCachePrj.invoke(colocatedKey, transformClos);
hasVal = true;
}
if (vals.get(key) != null) {
- dataCachePrj.transform(key, transformClos);
+ dataCachePrj.invoke(key, transformClos);
hasVal = true;
}
@@ -1570,7 +1571,8 @@ public class GridGgfsDataManager extends GridGgfsManager {
* Helper closure to update data in cache.
*/
@GridInternal
- private static final class UpdateClosure implements IgniteClosure<byte[], byte[]>, Externalizable {
+ private static final class UpdateProcessor implements EntryProcessor<GridGgfsBlockKey, byte[], Void>,
+ Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -1584,7 +1586,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
* Empty constructor required for {@link Externalizable}.
*
*/
- public UpdateClosure() {
+ public UpdateProcessor() {
// No-op.
}
@@ -1594,7 +1596,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
* @param start Start position in the block to write new data from.
* @param data Data block to write into cache.
*/
- private UpdateClosure(int start, byte[] data) {
+ private UpdateProcessor(int start, byte[] data) {
assert start >= 0;
assert data != null;
assert start + data.length >= 0 : "Too much data [start=" + start + ", data.length=" + data.length + ']';
@@ -1604,7 +1606,9 @@ public class GridGgfsDataManager extends GridGgfsManager {
}
/** {@inheritDoc} */
- @Override public byte[] apply(byte[] e) {
+ @Override public Void process(MutableEntry<GridGgfsBlockKey, byte[]> entry, Object... args) {
+ byte[] e = entry.getValue();
+
final int size = data.length;
if (e == null || e.length == 0)
@@ -1621,7 +1625,9 @@ public class GridGgfsDataManager extends GridGgfsManager {
// Copy data into entry.
U.arrayCopy(data, 0, e, start, size);
- return e;
+ entry.setValue(e);
+
+ return null;
}
/** {@inheritDoc} */
@@ -1638,7 +1644,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(UpdateClosure.class, this, "start", start, "data.length", data.length);
+ return S.toString(UpdateProcessor.class, this, "start", start, "data.length", data.length);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
index eb0a728..87e09b9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.lang.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
@@ -751,7 +752,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert metaCache.get(parentId) != null;
- id2InfoPrj.transform(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false));
+ id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false));
return null;
}
@@ -868,10 +869,10 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert metaCache.get(destParentId) != null;
// Remove listing entry from the source parent listing.
- id2InfoPrj.transform(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
+ id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
// Add listing entry into the destination parent listing.
- id2InfoPrj.transform(destParentId, new UpdateListing(destFileName, srcEntry, false));
+ id2InfoPrj.invoke(destParentId, new UpdateListing(destFileName, srcEntry, false));
}
/**
@@ -987,7 +988,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
// Update a file info of the removed file with a file path,
// which will be used by delete worker for event notifications.
- id2InfoPrj.transform(fileId, new UpdatePath(path));
+ id2InfoPrj.invoke(fileId, new UpdatePath(path));
return GridGgfsFileInfo.builder(fileInfo).path(path).build();
}
@@ -1086,12 +1087,12 @@ public class GridGgfsMetaManager extends GridGgfsManager {
id2InfoPrj.put(newInfo.id(), newInfo);
// Add new info to trash listing.
- id2InfoPrj.transform(TRASH_ID, new UpdateListing(newInfo.id().toString(),
+ id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(),
new GridGgfsListingEntry(newInfo), false));
// Remove listing entries from root.
for (Map.Entry<String, GridGgfsListingEntry> entry : transferListing.entrySet())
- id2InfoPrj.transform(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
+ id2InfoPrj.invoke(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
resId = newInfo.id();
}
@@ -1228,7 +1229,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
GridGgfsListingEntry listingEntry = parentInfo.listing().get(name);
if (listingEntry != null)
- id2InfoPrj.transform(parentId, new UpdateListing(name, listingEntry, true));
+ id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true));
id2InfoPrj.remove(id);
@@ -1359,7 +1360,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert metaCache.get(parentId) != null;
- id2InfoPrj.transform(parentId, new UpdateListing(fileName, entry, false));
+ id2InfoPrj.invoke(parentId, new UpdateListing(fileName, entry, false));
}
return newInfo;
@@ -1424,7 +1425,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert validTxState(false);
- id2InfoPrj.transformAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0,
+ id2InfoPrj.invokeAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0,
modificationTime));
}
finally {
@@ -1659,9 +1660,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
id2InfoPrj.removex(oldId); // Remove the old one.
id2InfoPrj.putx(newInfo.id(), newInfo); // Put the new one.
- id2InfoPrj.transform(parentInfo.id(),
+ id2InfoPrj.invoke(parentInfo.id(),
new UpdateListing(path.name(), parentInfo.listing().get(path.name()), true));
- id2InfoPrj.transform(parentInfo.id(),
+ id2InfoPrj.invoke(parentInfo.id(),
new UpdateListing(path.name(), new GridGgfsListingEntry(newInfo), false));
IgniteFuture<?> delFut = ggfsCtx.data().delete(oldInfo);
@@ -2150,7 +2151,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
}
// Update the deleted file info with path information for delete worker.
- id2InfoPrj.transform(info.id(), new UpdatePath(path));
+ id2InfoPrj.invoke(info.id(), new UpdatePath(path));
return true; // No additional handling is required.
}
@@ -2606,7 +2607,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
id2InfoPrj.putx(fileId, updated);
- id2InfoPrj.transform(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime,
+ id2InfoPrj.invoke(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime,
modificationTime));
tx.commit();
@@ -2741,7 +2742,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
/**
* Updates file length information in parent listing.
*/
- private static final class UpdateListingEntry implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+ private static final class UpdateListingEntry implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -2775,8 +2776,11 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* @param accessTime Last access time.
* @param modificationTime Last modification time.
*/
- private UpdateListingEntry(IgniteUuid fileId, String fileName, long lenDelta,
- long accessTime, long modificationTime) {
+ private UpdateListingEntry(IgniteUuid fileId,
+ String fileName,
+ long lenDelta,
+ long accessTime,
+ long modificationTime) {
this.fileId = fileId;
this.fileName = fileName;
this.lenDelta = lenDelta;
@@ -2785,13 +2789,18 @@ public class GridGgfsMetaManager extends GridGgfsManager {
}
/** {@inheritDoc} */
- @Override public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) {
+ @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+ GridGgfsFileInfo fileInfo = e.getValue();
+
Map<String, GridGgfsListingEntry> listing = fileInfo.listing();
GridGgfsListingEntry entry = listing.get(fileName);
- if (entry == null || !entry.fileId().equals(fileId))
- return fileInfo;
+ if (entry == null || !entry.fileId().equals(fileId)) {
+ e.setValue(fileInfo);
+
+ return null;
+ }
entry = new GridGgfsListingEntry(entry, entry.length() + lenDelta,
accessTime == -1 ? entry.accessTime() : accessTime,
@@ -2803,7 +2812,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
// Modify listing map in-place since map is serialization-safe.
listing.put(fileName, entry);
- return new GridGgfsFileInfo(listing, fileInfo);
+ e.setValue(new GridGgfsFileInfo(listing, fileInfo));
+
+ return null;
}
/** {@inheritDoc} */
@@ -2829,7 +2840,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* Update directory listing closure.
*/
@GridInternal
- private static final class UpdateListing implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+ private static final class UpdateListing implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -2868,7 +2879,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
}
/** {@inheritDoc} */
- @Override @Nullable public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) {
+ @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+ GridGgfsFileInfo fileInfo = e.getValue();
+
assert fileInfo != null : "File info not found for the child: " + entry.fileId();
assert fileInfo.isDirectory();
@@ -2897,7 +2910,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
", oldEntry=" + oldEntry + ']');
}
- return new GridGgfsFileInfo(listing, fileInfo);
+ e.setValue(new GridGgfsFileInfo(listing, fileInfo));
+
+ return null;
}
/** {@inheritDoc} */
@@ -2924,7 +2939,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* Update path closure.
*/
@GridInternal
- private static final class UpdatePath implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+ private static final class UpdatePath implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -2943,11 +2958,16 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* Default constructor (required by Externalizable).
*/
public UpdatePath() {
+ // No-op.
}
/** {@inheritDoc} */
- @Override public GridGgfsFileInfo apply(GridGgfsFileInfo info) {
- return GridGgfsFileInfo.builder(info).path(path).build();
+ @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+ GridGgfsFileInfo info = e.getValue();
+
+ e.setValue(GridGgfsFileInfo.builder(info).path(path).build());
+
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
new file mode 100644
index 0000000..6a97b19
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicInvokeTest extends IgniteCacheInvokeAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+ return CLOCK;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
new file mode 100644
index 0000000..7048f18
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicLocalInvokeTest extends IgniteCacheInvokeAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
new file mode 100644
index 0000000..2ff0468
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
@@ -0,0 +1,22 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.store.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicLocalWithStoreInvokeTest extends IgniteCacheAtomicLocalInvokeTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheStore<?, ?> cacheStore() {
+ return new TestStore();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
new file mode 100644
index 0000000..8f4d71c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicNearEnabledInvokeTest extends IgniteCacheAtomicInvokeTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return NEAR_PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
index 8731306..068476c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
@@ -39,7 +39,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
public void testInvoke() throws Exception {
// TODO IGNITE41 test with forceTransformBackups.
- invoke(null);
+ invoke(null);
if (atomicityMode() == TRANSACTIONAL) {
invoke(PESSIMISTIC);
@@ -165,19 +165,21 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0)), txMode);
- invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode);
+ if (gridCount() > 1) {
+ invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode);
- invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode);
+ invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode);
- Set<Integer> keys = new HashSet<>();
+ Set<Integer> keys = new HashSet<>();
- keys.addAll(primaryKeys(jcache(0), 3, 0));
- keys.addAll(primaryKeys(jcache(1), 3, 0));
- keys.addAll(primaryKeys(jcache(2), 3, 0));
+ keys.addAll(primaryKeys(jcache(0), 3, 0));
+ keys.addAll(primaryKeys(jcache(1), 3, 0));
+ keys.addAll(primaryKeys(jcache(2), 3, 0));
- invokeAll(cache, keys, txMode);
+ invokeAll(cache, keys, txMode);
+ }
- keys = new HashSet<>();
+ Set<Integer> keys = new HashSet<>();
for (int i = 0; i < 1000; i++)
keys.add(i);
@@ -415,7 +417,6 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e,
Object... arguments) throws EntryProcessorException {
- System.out.println(Thread.currentThread() + " compute, old=" + e.getValue());
if (e.exists()) {
Integer val = e.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
new file mode 100644
index 0000000..20576ab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxLocalInvokeTest extends IgniteCacheInvokeAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
new file mode 100644
index 0000000..ffc50ff
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxNearEnabledInvokeTest extends IgniteCacheTxInvokeTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return NEAR_PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index a57da71..0d3b54f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -363,12 +363,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
createUpdatePutAll(null);
if (atomicityMode() == TRANSACTIONAL) {
- IgniteTxConcurrency[] txModes;
-
- if (cacheMode() == LOCAL)
- txModes= new IgniteTxConcurrency[]{PESSIMISTIC};
- else
- txModes= new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC};
+ IgniteTxConcurrency[] txModes = new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC};
for (IgniteTxConcurrency tx : txModes) {
for (final Integer key : keys()) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index fd3751c..9d56c7f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -891,6 +891,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
+
+ cache.remove("key1");
+ cache.put("key2", 1);
+ cache.put("key3", 3);
}
Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR);
@@ -901,9 +905,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache(i).peek("key3"));
}
- assertEquals("1", res.get("key1").get());
- assertEquals("2", res.get("key2").get());
- assertEquals("4", res.get("key3").get());
+ assertEquals("null", res.get("key1").get());
+ assertEquals("1", res.get("key2").get());
+ assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
@@ -928,9 +932,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testTransformAllWithNulls() throws Exception {
- final GridCacheProjection<String, Integer> cache = cache();
+ final IgniteCache<String, Integer> cache = jcache();
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invokeAll(null, INCR_PROCESSOR);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
- cache.transformAll(null); // This should be no-op.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invokeAll(F.asSet("key1"), null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
{
Map<String, Integer> m = new HashMap<>(2);
@@ -944,38 +962,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
{
- Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2);
-
- tm.put("key1", INCR_PROCESSOR);
- tm.put(null, INCR_PROCESSOR);
-
- // WARN: F.asMap() doesn't work here, because it will throw NPE.
-
- cache.transformAll(tm);
- }
-
- {
- Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2);
-
- tm.put("key1", INCR_PROCESSOR);
- tm.put("key2", null);
-
- // WARN: F.asMap() doesn't work here, because it will throw NPE.
-
- cache.transformAll(tm);
- }
-
- cache.transformAll(null, INCR_PROCESSOR); // This should be no-op.
-
- {
- Set<String> ts = new HashSet<>(3);
+ Set<String> keys = new HashSet<>(2);
- ts.add("key1");
- ts.add(null);
+ keys.add("key1");
+ keys.add(null);
// WARN: F.asSet() doesn't work here, because it will throw NPE.
- cache.transformAll(ts, INCR_PROCESSOR);
+ cache.invokeAll(keys, INCR_PROCESSOR);
}
}
@@ -1014,17 +1008,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
private void checkTransformSequential0(boolean startVal, IgniteTxConcurrency concurrency)
throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ IgniteCache<String, Integer> cache = jcache();
- IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null;
+ IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
try {
if (startVal)
cache.put("key", 2);
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
if (tx != null)
tx.commit();
@@ -1063,18 +1057,18 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
private void checkTransformAfterRemove(IgniteTxConcurrency concurrency) throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ IgniteCache<String, Integer> cache = jcache();
cache.put("key", 4);
- IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null;
+ IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
try {
cache.remove("key");
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
if (tx != null)
tx.commit();
@@ -1128,20 +1122,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @param isolation Isolation.
* @throws Exception If failed.
*/
- private void checkTransformReturnValue(boolean put, IgniteTxConcurrency concurrency,
- IgniteTxIsolation isolation) throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ private void checkTransformReturnValue(boolean put,
+ IgniteTxConcurrency concurrency,
+ IgniteTxIsolation isolation)
+ throws Exception
+ {
+ IgniteCache<String, Integer> cache = jcache();
if (!put)
cache.put("key", 1);
- IgniteTx tx = txEnabled() ? cache.txStart(concurrency, isolation) : null;
+ IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, isolation) : null;
try {
if (put)
cache.put("key", 1);
- cache.transform("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
assertEquals((Integer)2, cache.get("key"));
@@ -1211,33 +1208,25 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception If failed.
*/
- public void testTransformEntry() throws Exception {
- GridCacheEntry<String, Integer> entry = cache().entry("test");
+ public void testInvokeAsync() throws Exception {
+ IgniteCache<String, Integer> cache = jcache();
- entry.setValue(1);
+ cache.put("key2", 1);
+ cache.put("key3", 3);
- // Make user entry capture cache entry.
- entry.version();
+ cache = cache.enableAsync();
- assertEquals((Integer)1, entry.getValue());
+ assertNull(cache.invoke("key1", INCR_PROCESSOR));
- entry.transform(INCR_PROCESSOR);
+ IgniteFuture<?> fut0 = cache.future();
- assertEquals((Integer)2, entry.getValue());
- }
+ assertNull(cache.invoke("key2", INCR_PROCESSOR));
- /**
- * @throws Exception If failed.
- */
- public void testTransformAsync() throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ IgniteFuture<?> fut1 = cache.future();
- cache.put("key2", 1);
- cache.put("key3", 3);
+ assertNull(cache.invoke("key3", RMV_PROCESSOR));
- IgniteFuture<?> fut0 = cache.transformAsync("key1", INCR_PROCESSOR);
- IgniteFuture<?> fut1 = cache.transformAsync("key2", INCR_PROCESSOR);
- IgniteFuture<?> fut2 = cache.transformAsync("key3", RMV_PROCESSOR);
+ IgniteFuture<?> fut2 = cache.future();
fut0.get();
fut1.get();
@@ -1254,46 +1243,54 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception If failed.
*/
- public void testTransformCompute() throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
-
- IgniteClosure<Integer, IgniteBiTuple<Integer, String>> c;
-
- c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() {
- @Override public IgniteBiTuple<Integer, String> apply(Integer val) {
- return val == null ? new IgniteBiTuple<>(0, "null") : new IgniteBiTuple<>(val + 1, String.valueOf(val));
- }
- };
+ public void testInvoke() throws Exception {
+ final IgniteCache<String, Integer> cache = jcache();
- assertEquals("null", cache.transformAndCompute("k0", c));
+ assertEquals("null", cache.invoke("k0", INCR_PROCESSOR));
- assertEquals((Integer)0, cache.get("k0"));
+ assertEquals((Integer)1, cache.get("k0"));
- assertEquals("0", cache.transformAndCompute("k0", c));
+ assertEquals("1", cache.invoke("k0", INCR_PROCESSOR));
- assertEquals((Integer)1, cache.get("k0"));
+ assertEquals((Integer)2, cache.get("k0"));
cache.put("k1", 1);
- assertEquals("1", cache.transformAndCompute("k1", c));
+ assertEquals("1", cache.invoke("k1", INCR_PROCESSOR));
assertEquals((Integer)2, cache.get("k1"));
- assertEquals("2", cache.transformAndCompute("k1", c));
+ assertEquals("2", cache.invoke("k1", INCR_PROCESSOR));
assertEquals((Integer)3, cache.get("k1"));
- c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() {
- @Override public IgniteBiTuple<Integer, String> apply(Integer integer) {
- return new IgniteBiTuple<>(null, null);
+ EntryProcessor<String, Integer, Integer> c = new EntryProcessor<String, Integer, Integer>() {
+ @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+ e.remove();
+
+ return null;
}
};
- assertNull(cache.transformAndCompute("k1", c));
+ assertNull(cache.invoke("k1", c));
assertNull(cache.get("k1"));
for (int i = 0; i < gridCount(); i++)
assertNull(cache(i).peek("k1"));
+
+ final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
+ @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+ throw new EntryProcessorException("Test entry processor exception.");
+ }
+ };
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invoke("k1", errProcessor);
+
+ return null;
+ }
+ }, EntryProcessorException.class, "Test entry processor exception.");
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
index 8a929bf..c204e00 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
@@ -17,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.gridgain.testframework.junits.common.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -108,19 +111,21 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm
startGrid(1);
startGrid(2);
- final IgniteClosure<String, String> trans = new TransformClosure();
+ final Processor entryProcessor = new Processor();
IgniteFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
- GridCache<Integer, String> c = cache(ThreadLocalRandom.current().nextInt(3));
+ IgniteCache<Integer, String> c = jcache(ThreadLocalRandom.current().nextInt(3));
while (!finish.get() && !Thread.currentThread().isInterrupted()) {
c.get(ThreadLocalRandom.current().nextInt(100));
+
c.put(ThreadLocalRandom.current().nextInt(100), "s");
- c.transform(
+
+ c.invoke(
ThreadLocalRandom.current().nextInt(100),
- trans);
+ entryProcessor);
}
return null;
@@ -147,10 +152,12 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm
/**
*
*/
- private static class TransformClosure implements IgniteClosure<String, String> {
+ private static class Processor implements EntryProcessor<Integer, String, Void>, Serializable {
/** {@inheritDoc} */
- @Override public String apply(String s) {
- return "str";
+ @Override public Void process(MutableEntry<Integer, String> e, Object... args) {
+ e.setValue("str");
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
index 44b8618..0e8602c 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
@@ -10,16 +10,17 @@
package org.gridgain.grid.kernal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.*;
import org.gridgain.testframework.junits.common.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -162,7 +163,7 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
ignite = restarts ? grids.getAndSet(idx, null) : grid(idx);
}
- GridCache <String, TestObject> cache = ignite.cache(null);
+ IgniteCache<String, TestObject> cache = ignite.jcache(null);
assertNotNull(cache);
@@ -173,11 +174,11 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
while (true) {
try {
- cache.transform("key", new Transformer());
+ cache.invoke("key", new Processor());
break;
}
- catch (GridCachePartialUpdateException ignored) {
+ catch (CachePartialUpdateException ignored) {
// Need to re-check if update actually succeeded.
TestObject updated = cache.get("key");
@@ -210,12 +211,16 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
}
/** */
- private static class Transformer implements C1<TestObject, TestObject> {
+ private static class Processor implements EntryProcessor<String, TestObject, Void>, Serializable {
/** {@inheritDoc} */
- @Override public TestObject apply(TestObject obj) {
+ @Override public Void process(MutableEntry<String, TestObject> e, Object... args) {
+ TestObject obj = e.getValue();
+
assert obj != null;
- return new TestObject(obj.val + 1);
+ e.setValue(new TestObject(obj.val + 1));
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
index b42406d..66abdc6 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
@@ -18,6 +19,7 @@ import org.gridgain.grid.util.typedef.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -1172,27 +1174,28 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
private void cacheUpdate(int grid, boolean rmv, Operation op, String key, final Integer val,
@Nullable final Integer expOld, @Nullable final Integer expRmvRet)
throws Exception {
- GridCache<String, Integer> cache = cache(grid);
+ IgniteCache<String, Integer> cache = jcache(grid);
if (rmv) {
assertNull(val);
switch (op) {
case UPDATE: {
- assertEquals(expRmvRet, cache.remove(key));
+ assertEquals(expRmvRet, cache.getAndRemove(key));
break;
}
case UPDATEX: {
- cache.removex(key);
+ cache.remove(key);
break;
}
case UPDATE_FILTER: {
- Object old = cache.remove(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() {
- @Override public boolean apply(GridCacheEntry<String, Integer> entry) {
+ Object old = cache.getAndRemoveIf(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() {
+ @Override
+ public boolean apply(GridCacheEntry<String, Integer> entry) {
return true;
}
});
@@ -1203,10 +1206,15 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- cache.transform(key, new IgniteClosure<Integer, Integer>() {
- @Nullable @Override public Integer apply(Integer old) {
+ cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
+ @Override
+ public Void process(MutableEntry<String, Integer> e, Object... args) {
+ Integer old = e.getValue();
+
assertEquals(expOld, old);
+ e.remove();
+
return null;
}
});
@@ -1221,20 +1229,21 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
else {
switch (op) {
case UPDATE: {
- assertEquals(expOld, cache.put(key, val));
+ assertEquals(expOld, cache.getAndPut(key, val));
break;
}
case UPDATEX: {
- cache.putx(key, val);
+ cache.put(key, val);
break;
}
case UPDATE_FILTER: {
- Object old = cache.put(key, val, new P1<GridCacheEntry<String, Integer>>() {
- @Override public boolean apply(GridCacheEntry<String, Integer> entry) {
+ Object old = cache.getAndPutIf(key, val, new P1<GridCacheEntry<String, Integer>>() {
+ @Override
+ public boolean apply(GridCacheEntry<String, Integer> entry) {
return true;
}
});
@@ -1245,11 +1254,16 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- cache.transform(key, new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer old) {
+ cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
+ @Override
+ public Void process(MutableEntry<String, Integer> e, Object... args) {
+ Integer old = e.getValue();
+
assertEquals(expOld, old);
- return val;
+ e.setValue(val);
+
+ return null;
}
});
@@ -1294,7 +1308,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
@SuppressWarnings("unchecked")
private void cacheBatchUpdate(int grid, boolean rmv, Operation op, final Map<String, Integer> map)
throws Exception {
- GridCache<String, Integer> cache = cache(grid);
+ IgniteCache<String, Integer> cache = jcache(grid);
if (rmv) {
switch (op) {
@@ -1305,8 +1319,10 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- cache.transformAll(map.keySet(), new IgniteClosure<Integer, Integer>() {
- @Nullable @Override public Integer apply(Integer old) {
+ cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ e.remove();
+
return null;
}
});
@@ -1327,17 +1343,13 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- Map<String, IgniteClosure<Integer, Integer>> m = new HashMap<>();
-
- for (final String key : map.keySet()) {
- m.put(key, new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer old) {
- return map.get(key);
- }
- });
- }
+ cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ e.setValue(map.get(e.getKey()));
- cache.transformAll(m);
+ return null;
+ }
+ });
break;
}