You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/25 15:29:37 UTC
[1/3] incubator-ignite git commit: # ignite-44
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-44 928aa3d48 -> bcb30d104
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
index 844fc5c..3c2d32c 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
@@ -9,12 +9,14 @@
package org.gridgain.grid.kernal.processors.cache;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.store.*;
import org.gridgain.testframework.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -51,8 +53,8 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache
/**
* @throws Exception If failed.
*/
- public void testTransform() throws Exception {
- GridCache<Integer, Integer> cache = grid(0).cache(null);
+ public void testInvoke() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
final Integer key = primaryKey(cache);
@@ -72,16 +74,16 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache
@Override public Void call() throws Exception {
int idx = gridIdx.incrementAndGet() - 1;
- final GridCache<Integer, Integer> cache = grid(idx).cache(null);
+ final IgniteCache<Integer, Integer> cache = grid(idx).jcache(null);
for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++)
- cache.transform(key, new IncClosure());
+ cache.invoke(key, new IncProcessor());
return null;
}
- }, THREADS, "transform");
+ }, THREADS, "invoke");
- assertFalse("Got null in transform.", failed);
+ assertFalse("Got null in processor.", failed);
expVal += ITERATIONS_PER_THREAD * THREADS;
@@ -103,18 +105,22 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache
/**
*
*/
- protected static class IncClosure implements IgniteClosure<Integer, Integer> {
+ protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable {
/** {@inheritDoc} */
- @Override public Integer apply(Integer val) {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer val = e.getValue();
+
if (val == null) {
failed = true;
- System.out.println(Thread.currentThread() + " got null in transform: " + val);
+ System.out.println(Thread.currentThread() + " got null in processor: " + val);
return null;
}
- return val + 1;
+ e.setValue(val + 1);
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
index e1f5c5a..5a97b7f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
@@ -9,10 +9,13 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.gridgain.testframework.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -76,7 +79,7 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend
* @throws Exception If failed.
*/
private void testTransform(final Integer key) throws Exception {
- final GridCache<Integer, Integer> cache = grid(0).cache(null);
+ final IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
cache.put(key, 0);
@@ -89,7 +92,7 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend
if (i % 500 == 0)
log.info("Iteration " + i);
- cache.transform(key, new IncClosure());
+ cache.invoke(key, new IncProcessor());
}
return null;
@@ -339,23 +342,29 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend
}
/**
+ *
*/
- protected static class IncClosure implements IgniteClosure<Integer, Integer> {
+ protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable {
/** {@inheritDoc} */
- @Override public Integer apply(Integer val) {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer val = e.getValue();
+
if (val == null) {
failed = true;
- System.out.println(Thread.currentThread() + " got null in transform: " + val);
+ System.out.println(Thread.currentThread() + " got null in processor: " + val);
return null;
}
- return val + 1;
+ e.setValue(val + 1);
+
+ return null;
}
}
/**
+ *
*/
protected static class TestFilter implements IgnitePredicate<GridCacheEntry<Integer, Integer>> {
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
index f84e9c7..baa03c2 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
import org.gridgain.testframework.*;
@@ -48,7 +49,7 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
* @throws Exception If failed.
*/
private void testTransformTx(final Integer key, final IgniteTxConcurrency txConcurrency) throws Exception {
- final GridCache<Integer, Integer> cache = grid(0).cache(null);
+ final IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
cache.put(key, 0);
@@ -57,12 +58,14 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
+ IgniteTransactions txs = ignite(0).transactions();
+
for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) {
if (i % 500 == 0)
log.info("Iteration " + i);
- try (IgniteTx tx = cache.txStart(txConcurrency, REPEATABLE_READ)) {
- cache.transform(key, new IncClosure());
+ try (IgniteTx tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+ cache.invoke(key, new IncProcessor());
tx.commit();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
index 4456346..0799180 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.portables.*;
@@ -18,6 +19,7 @@ import org.gridgain.grid.util.typedef.*;
import org.jetbrains.annotations.*;
import org.junit.*;
+import javax.cache.processor.*;
import java.util.*;
import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
@@ -92,48 +94,61 @@ public abstract class GridCacheOffHeapTieredAbstractSelfTest extends GridCacheAb
* @throws Exception If failed.
*/
private void checkTransform(Integer key) throws Exception {
- GridCache<Integer, Integer> c = grid(0).cache(null);
+ IgniteCache<Integer, Integer> c = grid(0).jcache(null);
+
+ c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer val = e.getValue();
- c.transform(key, new C1<Integer, Integer>() {
- @Override public Integer apply(Integer val) {
assertNull("Unexpected value: " + val, val);
return null;
}
});
- c.putx(key, 1);
+ c.put(key, 1);
+
+ c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer val = e.getValue();
- c.transform(key, new C1<Integer, Integer>() {
- @Override public Integer apply(Integer val) {
assertNotNull("Unexpected value: " + val, val);
assertEquals((Integer) 1, val);
- return val + 1;
+ e.setValue(val + 1);
+
+ return null;
}
});
assertEquals((Integer)2, c.get(key));
- c.transform(key, new C1<Integer, Integer>() {
- @Override public Integer apply(Integer val) {
+ c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer val = e.getValue();
+
assertNotNull("Unexpected value: " + val, val);
assertEquals((Integer)2, val);
- return val;
+ e.setValue(val);
+
+ return null;
}
});
- assertEquals((Integer) 2, c.get(key));
+ assertEquals((Integer)2, c.get(key));
+
+ c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer val = e.getValue();
- c.transform(key, new C1<Integer, Integer>() {
- @Override
- public Integer apply(Integer val) {
assertNotNull("Unexpected value: " + val, val);
- assertEquals((Integer) 2, val);
+ assertEquals((Integer)2, val);
+
+ e.remove();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
index b25fcbd..d4eb179 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
@@ -9,14 +9,16 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
import org.apache.ignite.portables.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -171,7 +173,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
* @throws Exception If failed.
*/
public void testTransform() throws Exception {
- final GridCache<Integer, Object> cache = grid(0).cache(null);
+ final IgniteCache<Integer, Object> cache = grid(0).jcache(null);
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -182,9 +184,9 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
final TestValue val = vals.get(key % VAL_SIZE);
- TestClosure c = testClosure(val.val, false);
+ TestProcessor c = testClosure(val.val, false);
- cache.transform(key, c);
+ cache.invoke(key, c);
}
return null;
@@ -208,7 +210,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
* @param acceptNull If {@code true} value can be null;
* @return Predicate.
*/
- private TestClosure testClosure(String expVal, boolean acceptNull) {
+ private TestProcessor testClosure(String expVal, boolean acceptNull) {
return portableEnabled() ?
new PortableValueClosure(expVal, acceptNull) :
new TestValueClosure(expVal, acceptNull);
@@ -326,7 +328,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
/**
*
*/
- protected abstract static class TestClosure implements IgniteClosure<Object, Object> {
+ protected abstract static class TestProcessor implements EntryProcessor<Integer, Object, Void>, Serializable {
/** */
protected String expVal;
@@ -337,23 +339,29 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
* @param expVal Expected value.
* @param acceptNull If {@code true} value can be null;
*/
- protected TestClosure(String expVal, boolean acceptNull) {
+ protected TestProcessor(String expVal, boolean acceptNull) {
this.expVal = expVal;
this.acceptNull = acceptNull;
}
/** {@inheritDoc} */
- @Override public final Object apply(Object val) {
+ @Override public Void process(MutableEntry<Integer, Object> e, Object... args) {
+ Object val = e.getValue();
+
if (val == null) {
if (!acceptNull)
assertNotNull(val);
- return true;
+ e.setValue(true);
+
+ return null;
}
checkValue(val);
- return val;
+ e.setValue(val);
+
+ return null;
}
/**
@@ -366,7 +374,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
*
*/
@SuppressWarnings("PackageVisibleInnerClass")
- static class PortableValueClosure extends TestClosure {
+ static class PortableValueClosure extends TestProcessor {
/**
* @param expVal Expected value.
* @param acceptNull If {@code true} value can be null;
@@ -387,7 +395,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
*
*/
@SuppressWarnings("PackageVisibleInnerClass")
- static class TestValueClosure extends TestClosure {
+ static class TestValueClosure extends TestProcessor {
/**
* @param expVal Expected value.
* @param acceptNull If {@code true} value can be null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
index 4b88530..b65fcad 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
@@ -9,11 +9,12 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.gridgain.testframework.junits.common.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
@@ -104,6 +105,9 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
}
/**
+ * @param mode Atomicity mode.
+ * @param order Atomic cache write order mode.
+ * @param b Number of backups.
* @throws Exception If failed.
*/
private void checkTransform(GridCacheAtomicityMode mode, GridCacheAtomicWriteOrderMode order, int b)
@@ -126,7 +130,7 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
failDeserialization = false;
// Get client grid.
- GridCacheProjection<Integer, TestObject> cache = grid(2).cache(null);
+ IgniteCache<Integer, TestObject> cache = grid(2).jcache(null);
if (backups > 0 && atomicityMode == ATOMIC)
cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP);
@@ -138,17 +142,17 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
info(">>>>>> Transforming");
- // Transform (check non-existent keys also.
+ // Transform (check non-existent keys also).
for (int i = 0; i < 200; i++)
- cache.transform(i, new Transform());
+ cache.invoke(i, new Transform());
- Map<Integer, Transform> transformMap = new HashMap<>();
+ Set<Integer> keys = new HashSet<>();
// Check transformAll.
for (int i = 0; i < 300; i++)
- transformMap.put(i, new Transform());
+ keys.add(i);
- cache.transformAll(transformMap);
+ cache.invokeAll(keys, new Transform());
// Avoid errors during stop.
failDeserialization = false;
@@ -158,10 +162,15 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
}
}
- private static class Transform implements IgniteClosure<TestObject, TestObject> {
+ /**
+ *
+ */
+ private static class Transform implements EntryProcessor<Integer, TestObject, Void>, Serializable {
/** {@inheritDoc} */
- @Override public TestObject apply(TestObject testObject) {
- return new TestObject();
+ @Override public Void process(MutableEntry<Integer, TestObject> entry, Object... args) {
+ entry.setValue(new TestObject());
+
+ return null;
}
}
@@ -169,7 +178,11 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
*
*/
private static class TestObject implements Externalizable {
+ /**
+ *
+ */
public TestObject() {
+ // No-op.
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index 6f9d32c..af729a1 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -423,9 +423,10 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal(GridCacheVersion ver,
+ @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal(GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
+ @Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@Nullable ExpiryPolicy expiryPlc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
index 0be91f0..8cf48bf 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -21,6 +21,8 @@ import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
import org.gridgain.testframework.*;
import org.jetbrains.annotations.*;
+import javax.cache.*;
+import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.*;
@@ -417,17 +419,21 @@ public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstrac
info("Going to transform: " + key);
- GridTestUtils.assertThrows(log, new Callable<Void>() {
+ Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
- grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() {
- @Override public Object apply(Object o) {
- return 2;
+ grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ e.setValue(2);
+
+ return null;
}
});
return null;
}
- }, IgniteTxHeuristicException.class, null);
+ }, CacheException.class, null);
+
+ assertTrue("Unexpected cause: " +e, e.getCause() instanceof IgniteTxHeuristicException);
checkEmpty(key);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
index 39eb728..a46353e 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
@@ -21,6 +21,8 @@ import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
import org.gridgain.testframework.*;
import org.jetbrains.annotations.*;
+import javax.cache.*;
+import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.*;
@@ -422,17 +424,21 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
info("Going to transform: " + key);
- GridTestUtils.assertThrows(log, new Callable<Void>() {
+ Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
- grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() {
- @Override public Object apply(Object o) {
- return 2;
+ grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ e.setValue(2);
+
+ return null;
}
});
return null;
}
- }, IgniteTxRollbackException.class, null);
+ }, CacheException.class, null);
+
+ assertTrue("Unexpected cause: " + e, e.getCause() instanceof IgniteTxRollbackException);
checkValue(key, putBefore);
}
@@ -588,41 +594,48 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
this.fail = fail;
}
-
+ /** {@inheritDoc} */
@Nullable @Override public Object load(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException {
return null;
}
+ /** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args)
throws IgniteCheckedException {
if (fail)
throw new IgniteCheckedException("Store exception");
}
+ /** {@inheritDoc} */
@Override public void loadAll(@Nullable IgniteTx tx, Collection<?> keys, IgniteBiInClosure<Object, Object> c)
throws IgniteCheckedException {
}
+ /** {@inheritDoc} */
@Override public void put(@Nullable IgniteTx tx, Object key, Object val) throws IgniteCheckedException {
if (fail)
throw new IgniteCheckedException("Store exception");
}
+ /** {@inheritDoc} */
@Override public void putAll(@Nullable IgniteTx tx, Map<?, ?> map) throws IgniteCheckedException {
if (fail)
throw new IgniteCheckedException("Store exception");
}
+ /** {@inheritDoc} */
@Override public void remove(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException {
if (fail)
throw new IgniteCheckedException("Store exception");
}
+ /** {@inheritDoc} */
@Override public void removeAll(@Nullable IgniteTx tx, Collection<?> keys) throws IgniteCheckedException {
if (fail)
throw new IgniteCheckedException("Store exception");
}
+ /** {@inheritDoc} */
@Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException {
if (fail && commit)
throw new IgniteCheckedException("Store exception");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
index 5c7ba54..1ced531 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
@@ -19,9 +19,12 @@ import org.gridgain.grid.cache.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.junits.common.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.*;
import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
@@ -68,7 +71,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
private UUID[] ids;
/** Caches. */
- private GridCache<Integer, Integer>[] caches;
+ private IgniteCache<Integer, Integer>[] caches;
/** Recorded events.*/
private ConcurrentHashSet<IgniteCacheEvent> evts;
@@ -157,14 +160,14 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
ignites = new Ignite[GRID_CNT];
ids = new UUID[GRID_CNT];
- caches = new GridCache[GRID_CNT];
+ caches = new IgniteCache[GRID_CNT];
for (int i = 0; i < GRID_CNT; i++) {
ignites[i] = grid(i);
ids[i] = ignites[i].cluster().localNode().id();
- caches[i] = ignites[i].cache(CACHE_NAME);
+ caches[i] = ignites[i].jcache(CACHE_NAME);
ignites[i].events().localListen(new IgnitePredicate<IgniteEvent>() {
@Override public boolean apply(IgniteEvent evt) {
@@ -184,7 +187,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
int key = 0;
while (true) {
- if (cacheMode != PARTITIONED || (caches[0].entry(key).primary() && caches[1].entry(key).backup())) {
+ if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) {
key1 = key++;
break;
@@ -194,7 +197,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
}
while (true) {
- if (cacheMode != PARTITIONED || (caches[0].entry(key).primary() && caches[1].entry(key).backup())) {
+ if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) {
key2 = key;
break;
@@ -226,6 +229,28 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
}
/**
+ * @param gridIdx Grid index.
+ * @param key Key.
+ * @return {@code True} if grid is primary for given key.
+ */
+ private boolean primary(int gridIdx, Object key) {
+ GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity();
+
+ return aff.isPrimary(grid(gridIdx).cluster().localNode(), key);
+ }
+
+ /**
+ * @param gridIdx Grid index.
+ * @param key Key.
+ * @return {@code True} if grid is primary for given key.
+ */
+ private boolean backup(int gridIdx, Object key) {
+ GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity();
+
+ return aff.isBackup(grid(gridIdx).cluster().localNode(), key);
+ }
+
+ /**
* Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/REPEATABLE_READ transaction.
*
* @throws Exception If failed.
@@ -423,13 +448,13 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
private void checkAtomic(GridCacheMode cacheMode) throws Exception {
initialize(cacheMode, ATOMIC, null, null);
- caches[0].transform(key1, new Transformer());
+ caches[0].invoke(key1, new Transformer());
checkEventNodeIdsStrict(primaryIdsForKeys(key1));
assert evts.isEmpty();
- caches[0].transformAll(keys, new Transformer());
+ caches[0].invokeAll(keys, new Transformer());
checkEventNodeIdsStrict(primaryIdsForKeys(key1, key2));
}
@@ -449,7 +474,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
System.out.println("BEFORE: " + evts.size());
- caches[0].transform(key1, new Transformer());
+ caches[0].invoke(key1, new Transformer());
System.out.println("AFTER: " + evts.size());
@@ -457,7 +482,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
assert evts.isEmpty();
- caches[0].transformAll(keys, new Transformer());
+ caches[0].invokeAll(keys, new Transformer());
checkEventNodeIdsStrict(idsForKeys(key1, key2));
}
@@ -500,9 +525,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
else if (cacheMode == PARTITIONED) {
for (int key : keys) {
for (int i = 0; i < GRID_CNT; i++) {
- GridCacheEntry<Integer, Integer> entry = caches[i].entry(key);
-
- if (entry.primary() || (!primaryOnly && entry.backup()))
+ if (primary(i, key) || (!primaryOnly && backup(i, key)))
res.add(ids[i]);
}
}
@@ -510,7 +533,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
else if (cacheMode == REPLICATED) {
for (int key : keys) {
if (primaryOnly)
- res.add(caches[0].affinity().mapKeyToNode(key).id());
+ res.add(grid(0).cache(CACHE_NAME).affinity().mapKeyToNode(key).id());
else
res.addAll(Arrays.asList(ids));
}
@@ -544,22 +567,19 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
}
if (foundEvt == null) {
- GridCache<Integer, Integer> affectedCache = null;
+ int gridIdx = -1;
for (int i = 0; i < GRID_CNT; i++) {
if (F.eq(this.ids[i], id)) {
- affectedCache = caches[i];
+ gridIdx = i;
break;
}
}
- GridCacheEntry<Integer, Integer> entry1 = affectedCache.entry(key1);
- GridCacheEntry<Integer, Integer> entry2 = affectedCache.entry(key2);
-
fail("Expected transform event was not triggered on the node [nodeId=" + id +
- ", key1Primary=" + entry1.primary() + ", key1Backup=" + entry1.backup() +
- ", key2Primary=" + entry2.primary() + ", key2Backup=" + entry2.backup() + ']');
+ ", key1Primary=" + primary(gridIdx, key1) + ", key1Backup=" + backup(gridIdx, key1) +
+ ", key2Primary=" + primary(gridIdx, key2) + ", key2Backup=" + backup(gridIdx, key2) + ']');
}
else
evts.remove(foundEvt);
@@ -570,10 +590,12 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
/**
* Transform closure.
*/
- private static class Transformer implements IgniteClosure<Integer, Integer> {
+ private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable {
/** {@inheritDoc} */
- @Override public Integer apply(Integer val) {
- return ++val;
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ e.setValue(e.getValue() + 1);
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
index 2227e56..2b1fdff 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
@@ -9,14 +9,15 @@
package org.gridgain.grid.kernal.processors.cache.distributed;
+import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -58,7 +59,7 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
* @throws Exception If failed.
*/
public void testRemoteTxPreloading() throws Exception {
- GridCache<String, Integer> cache = cache(0);
+ IgniteCache<String, Integer> cache = jcache(0);
for (int i = 0; i < 10000; i++)
cache.put(String.valueOf(i), 0);
@@ -86,12 +87,21 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
for (int i = 0; i < 10; i++)
keys.add(String.valueOf(i * 1000));
- cache.transformAll(keys, new C1<Integer, Integer>() {
- @Override public Integer apply(Integer val) {
- if (val == null)
+ cache.invokeAll(keys, new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ Integer val = e.getValue();
+
+ if (val == null) {
keyNotLoaded = true;
- return val + 1;
+ e.setValue(1);
+
+ return null;
+ }
+
+ e.setValue(val + 1);
+
+ return null;
}
});
@@ -135,7 +145,7 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
for (int i = 0; i < 10000; i++)
map.put(String.valueOf(i), 0);
- GridCache<String, Integer> cache0 = cache(0);
+ IgniteCache<String, Integer> cache0 = jcache(0);
cache0.putAll(map);
@@ -148,18 +158,26 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
startGrid(i);
- GridCache<String, Integer> cache = cache(i);
+ IgniteCache<String, Integer> cache = jcache(i);
+
+ IgniteTransactions txs = ignite(i).transactions();
+
+ try (IgniteTx tx = txs.txStart(txConcurrency, IgniteTxIsolation.READ_COMMITTED)) {
+ cache.invoke(TX_KEY, new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ Integer val = e.getValue();
- try (IgniteTx tx = cache.txStart(txConcurrency, IgniteTxIsolation.READ_COMMITTED)) {
- cache.transform(TX_KEY, new C1<Integer, Integer>() {
- @Override public Integer apply(Integer val) {
if (val == null) {
keyNotLoaded = true;
- return 1;
+ e.setValue(1);
+
+ return null;
}
- return val + 1;
+ e.setValue(val + 1);
+
+ return null;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
index 5635ac8..d4d3396 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
@@ -9,8 +9,9 @@
package org.gridgain.grid.kernal.processors.cache.distributed.dht;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
@@ -19,6 +20,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.gridgain.testframework.junits.common.*;
+import javax.cache.processor.*;
import java.util.*;
import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
@@ -55,16 +57,23 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
- /** Value increment closure. */
- private static final IgniteClosure<Integer, Integer> INCR_CLOS = new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer src) {
- return src == null ? 1 : src + 1;
+ /** Value increment processor. */
+ private static final EntryProcessor<String, Integer, Void> INCR_CLOS = new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ if (!e.exists())
+ e.setValue(1);
+ else
+ e.setValue(e.getValue() + 1);
+
+ return null;
}
};
- /** Value remove closure. */
- private static final IgniteClosure<Integer, Integer> RMV_CLOS = new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer src) {
+ /** Value remove processor. */
+ private static final EntryProcessor<String, Integer, Void> RMV_CLOS = new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ e.remove();
+
return null;
}
};
@@ -82,6 +91,8 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(IP_FINDER);
@@ -221,7 +232,7 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
* @throws Exception If failed.
*/
protected void checkTransform(IgniteTxConcurrency concurrency, int nodeType, int op) throws Exception {
- GridCacheProjection<String, Integer> cache = cache(0);
+ IgniteCache<String, Integer> cache = jcache(0);
Collection<String> keys = keysForType(nodeType);
@@ -233,18 +244,18 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
nearStore.reset();
for (String key : keys)
- cache.clear(key);
+ cache(0).clear(key);
info(">>> Starting transform transaction");
- try (IgniteTx tx = cache.txStart(concurrency, READ_COMMITTED)) {
+ try (IgniteTx tx = ignite(0).transactions().txStart(concurrency, READ_COMMITTED)) {
if (op == OP_UPDATE) {
for (String key : keys)
- cache.transform(key, INCR_CLOS);
+ cache.invoke(key, INCR_CLOS);
}
else {
for (String key : keys)
- cache.transform(key, RMV_CLOS);
+ cache.invoke(key, RMV_CLOS);
}
tx.commit();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
index f84d006..ab2024a 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
@@ -11,8 +11,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.kernal.*;
@@ -23,6 +21,8 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.junits.common.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.*;
import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
@@ -255,9 +255,9 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
Ignite ignite0 = grid(0);
- GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+ IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null);
- GridCacheAffinity<Integer> aff = cache0.affinity();
+ GridCacheAffinity<Object> aff = cache(0).affinity();
UUID id0 = ignite0.cluster().localNode().id();
@@ -265,7 +265,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
log.info("Transform from primary.");
- cache0.transform(primaryKey, new TransformClosure(primaryKey));
+ cache0.invoke(primaryKey, new Processor(primaryKey));
for (int i = 0; i < GRID_CNT; i++)
checkEntry(grid(i), primaryKey, primaryKey, false);
@@ -275,7 +275,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
log.info("Transform from backup.");
- cache0.transform(backupKey, new TransformClosure(backupKey));
+ cache0.invoke(backupKey, new Processor(backupKey));
for (int i = 0; i < GRID_CNT; i++)
checkEntry(grid(i), backupKey, backupKey, false);
@@ -285,7 +285,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
log.info("Transform from near.");
- cache0.transform(nearKey, new TransformClosure(nearKey));
+ cache0.invoke(nearKey, new Processor(nearKey));
for (int i = 0; i < GRID_CNT; i++) {
UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
@@ -302,11 +302,11 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
for (int i = 0; i < GRID_CNT; i++) {
delay();
- GridCache<Integer, Integer> cache = grid(i).cache(null);
+ IgniteCache<Integer, Integer> cache = grid(i).jcache(null);
log.info("Transform [grid=" + grid(i).name() + ", val=" + val + ']');
- cache.transform(nearKey, new TransformClosure(val));
+ cache.invoke(nearKey, new Processor(val));
if (!aff.isPrimaryOrBackup(grid(i).localNode(), nearKey))
readers.add(grid(i).localNode().id());
@@ -332,53 +332,53 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
Ignite ignite0 = grid(0);
- GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+ IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null);
- GridCacheAffinity<Integer> aff = cache0.affinity();
+ GridCacheAffinity<Object> aff = ignite0.cache(null).affinity();
UUID id0 = ignite0.cluster().localNode().id();
- Map<Integer, TransformClosure> primaryKeys = new HashMap<>();
+ Set<Integer> primaryKeys = new HashSet<>();
for (int i = 0; i < 10; i++)
- primaryKeys.put(key(ignite0, PRIMARY), new TransformClosure(1));
+ primaryKeys.add(key(ignite0, PRIMARY));
log.info("TransformAll from primary.");
- cache0.transformAll(primaryKeys);
+ cache0.invokeAll(primaryKeys, new Processor(1));
for (int i = 0; i < GRID_CNT; i++) {
- for (Integer primaryKey : primaryKeys.keySet())
+ for (Integer primaryKey : primaryKeys)
checkEntry(grid(i), primaryKey, 1, false);
}
if (backups > 0) {
- Map<Integer, TransformClosure> backupKeys = new HashMap<>();
+ Set<Integer> backupKeys = new HashSet<>();
for (int i = 0; i < 10; i++)
- backupKeys.put(key(ignite0, BACKUP), new TransformClosure(2));
+ backupKeys.add(key(ignite0, BACKUP));
log.info("TransformAll from backup.");
- cache0.transformAll(backupKeys);
+ cache0.invokeAll(backupKeys, new Processor(2));
for (int i = 0; i < GRID_CNT; i++) {
- for (Integer backupKey : backupKeys.keySet())
+ for (Integer backupKey : backupKeys)
checkEntry(grid(i), backupKey, 2, false);
}
}
- Map<Integer, TransformClosure> nearKeys = new HashMap<>();
+ Set<Integer> nearKeys = new HashSet<>();
for (int i = 0; i < 30; i++)
- nearKeys.put(key(ignite0, NOT_PRIMARY_AND_BACKUP), new TransformClosure(3));
+ nearKeys.add(key(ignite0, NOT_PRIMARY_AND_BACKUP));
log.info("TransformAll from near.");
- cache0.transformAll(nearKeys);
+ cache0.invokeAll(nearKeys, new Processor(3));
for (int i = 0; i < GRID_CNT; i++) {
- for (Integer nearKey : nearKeys.keySet()) {
+ for (Integer nearKey : nearKeys) {
UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
checkEntry(grid(i), nearKey, 3, i == 0, expReaders);
@@ -387,7 +387,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
Map<Integer, Collection<UUID>> readersMap = new HashMap<>();
- for (Integer key : nearKeys.keySet())
+ for (Integer key : nearKeys)
readersMap.put(key, new HashSet<UUID>());
int val = 4;
@@ -395,22 +395,22 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
for (int i = 0; i < GRID_CNT; i++) {
delay();
- GridCache<Integer, Integer> cache = grid(i).cache(null);
+ IgniteCache<Integer, Integer> cache = grid(i).jcache(null);
- for (Integer key : nearKeys.keySet())
- nearKeys.put(key, new TransformClosure(val));
+ for (Integer key : nearKeys)
+ nearKeys.add(key);
log.info("TransformAll [grid=" + grid(i).name() + ", val=" + val + ']');
- cache.transformAll(nearKeys);
+ cache.invokeAll(nearKeys, new Processor(val));
- for (Integer key : nearKeys.keySet()) {
+ for (Integer key : nearKeys) {
if (!aff.isPrimaryOrBackup(grid(i).localNode(), key))
readersMap.get(key).add(grid(i).localNode().id());
}
for (int j = 0; j < GRID_CNT; j++) {
- for (Integer key : nearKeys.keySet()) {
+ for (Integer key : nearKeys) {
boolean primaryNode = aff.isPrimary(grid(j).localNode(), key);
Collection<UUID> readers = readersMap.get(key);
@@ -789,21 +789,23 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
}
/**
+ *
*/
- private static class TransformClosure implements IgniteClosure<Integer, Integer> {
+ private static class Processor implements EntryProcessor<Integer, Integer, Void>, Serializable {
/** */
private final Integer newVal;
/**
* @param newVal New value.
*/
- private TransformClosure(Integer newVal) {
+ private Processor(Integer newVal) {
this.newVal = newVal;
}
- /** {@inheritDoc} */
- @Override public Integer apply(Integer val) {
- return newVal;
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ e.setValue(newVal);
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
index 252ac3a..402cb24 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
@@ -10,13 +10,14 @@
package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.*;
import org.jdk8.backport.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
@@ -62,14 +63,14 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi
for (int i = rangeStart; i < rangeStart + range; i++) {
int idx = ThreadLocalRandom8.current().nextInt(gridCount());
- GridCacheProjection<Integer, Integer> cache = grid(idx).cache(null);
+ IgniteCache<Integer, Integer> cache = grid(idx).jcache(null);
cache = cache.flagsOn(GridCacheFlag.FORCE_TRANSFORM_BACKUP);
- cache.transform(i, new Transformer(i));
+ cache.invoke(i, new Transformer(i));
}
}
- catch (IgniteCheckedException e) {
+ catch (Exception e) {
throw new IgniteException(e);
}
}
@@ -102,20 +103,28 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi
/**
*
*/
- private static class Transformer implements IgniteClosure<Integer, Integer> {
+ private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable {
+ /** */
private int key;
+ /**
+ * @param key Key.
+ */
private Transformer(int key) {
this.key = key;
}
/** {@inheritDoc} */
- @Override public Integer apply(Integer old) {
+ @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer old = e.getValue();
+
if (key < 5)
System.err.println(Thread.currentThread().getName() + " <> Transforming value [key=" + key +
", val=" + old + ']');
- return old == null ? 1 : old + 1;
+ e.setValue(old == null ? 1 : old + 1);
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
index 3c56237..96ab5fb 100644
--- a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
@@ -1402,7 +1402,8 @@ public abstract class GridAbstractTest extends TestCase {
int cnt = 0;
for (Method m : GridAbstractTest.this.getClass().getMethods())
- if (m.getDeclaringClass().getName().startsWith("org.gridgain")) {
+ if (m.getDeclaringClass().getName().startsWith("org.gridgain") ||
+ m.getDeclaringClass().getName().startsWith("org.apache.ignite")) {
if (m.getName().startsWith("test") && Modifier.isPublic(m.getModifiers()))
cnt++;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
index 9fc3ff7..f40d941 100644
--- a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
@@ -61,6 +61,14 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param idx Grid index.
+ * @return Cache.
+ */
+ protected <K, V> IgniteCache<K, V> jcache(int idx) {
+ return grid(idx).jcache(null);
+ }
+
+ /**
+ * @param idx Grid index.
* @param name Cache name.
* @return Cache.
*/
@@ -275,7 +283,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param cache Cache.
- * @return Collection of keys for which given cache is primary.
+ * @return Key for which given cache is primary.
* @throws IgniteCheckedException If failed.
*/
protected Integer primaryKey(GridCacheProjection<?, ?> cache)
@@ -327,7 +335,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param cache Cache.
- * @return Collection of keys for which given cache is backup.
+ * @return Key for which given cache is backup.
* @throws IgniteCheckedException If failed.
*/
protected Integer backupKey(GridCacheProjection<?, ?> cache)
@@ -379,7 +387,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param cache Cache.
- * @return Collection of keys for which given cache is neither primary nor backup.
+ * @return Keys for which given cache is neither primary nor backup.
* @throws IgniteCheckedException If failed.
*/
protected Integer nearKey(GridCacheProjection<?, ?> cache)
@@ -472,6 +480,42 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
/**
+ * @param cache Cache.
+ * @return Collection of keys for which given cache is primary.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected Integer primaryKey(IgniteCache<?, ?> cache)
+ throws IgniteCheckedException {
+ GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate");
+
+ return primaryKey(prj);
+ }
+
+ /**
+ * @param cache Cache.
+ * @return Keys for which given cache is backup.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected Integer backupKey(IgniteCache<?, ?> cache)
+ throws IgniteCheckedException {
+ GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate");
+
+ return backupKey(prj);
+ }
+
+ /**
+ * @param cache Cache.
+ * @return Key for which given cache is neither primary nor backup.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected Integer nearKey(IgniteCache<?, ?> cache)
+ throws IgniteCheckedException {
+ GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate");
+
+ return nearKey(prj);
+ }
+
+ /**
* @param comp Compute.
* @param task Task.
* @param arg Task argument.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index e642753..d9a5e22 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -10,6 +10,7 @@
package org.gridgain.testsuites.bamboo;
import junit.framework.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.expiry.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.affinity.fair.*;
@@ -31,14 +32,24 @@ import org.gridgain.testsuites.*;
*/
public class GridDataGridTestSuite extends TestSuite {
/**
- * @return GridGain TeamCity in-memory data grid test suite.
+ * @return IgniteCache test suite.
* @throws Exception Thrown in case of the failure.
*/
public static TestSuite suite() throws Exception {
- TestSuite suite = new TestSuite("Gridgain In-Memory Data Grid Test Suite");
+ TestSuite suite = new TestSuite("IgniteCache Test Suite");
suite.addTest(IgniteCacheExpiryPolicyTestSuite.suite());
+ suite.addTestSuite(IgniteCacheAtomicInvokeTest.class);
+ suite.addTestSuite(IgniteCacheAtomicNearEnabledInvokeTest.class);
+ suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderInvokeTest.class);
+ suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.class);
+ suite.addTestSuite(IgniteCacheAtomicLocalInvokeTest.class);
+ suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
+ suite.addTestSuite(IgniteCacheTxInvokeTest.class);
+ suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
+ suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class);
+
// Affinity tests.
suite.addTestSuite(GridCachePartitionFairAffinityNodesSelfTest.class);
suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index 75cc266..23d98c1 100644
--- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -29,6 +29,8 @@ import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -46,10 +48,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
private final GridMutex mux = new GridMutex();
/** */
- private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj;
+ private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj;
/** Projection with expiry policy for finished job updates. */
- private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj;
+ private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj;
/** Map-reduce execution planner. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -96,8 +98,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @return Job meta projection.
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
- private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() {
- GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj;
+ private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() {
+ GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj;
if (prj == null) {
synchronized (mux) {
@@ -118,7 +120,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
throw new IllegalStateException(e);
}
- jobMetaPrj = prj = sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
+ jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)
+ sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
@@ -139,8 +142,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* @return Projection with expiry policy for finished job updates.
*/
- private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() {
- GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj;
+ private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() {
+ GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj;
if (prj == null) {
jobMetaCache();
@@ -430,10 +433,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)):
"Missing local state for finished task [info=" + info + ", status=" + status + ']';
- StackedClosure incrCntrs = null;
+ StackedProcessor incrCntrs = null;
if (status.state() == COMPLETED)
- incrCntrs = new IncrementCountersClosure(null, status.counters());
+ incrCntrs = new IncrementCountersProcessor(null, status.counters());
switch (info.type()) {
case SETUP: {
@@ -462,9 +465,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
case COMMIT:
case ABORT: {
- GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache();
+ GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache();
- cache.transformAsync(info.jobId(), new UpdatePhaseClosure(incrCntrs, PHASE_COMPLETE)).
+ cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
listenAsync(failsLog);
break;
@@ -480,8 +483,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param jobId Job id.
* @param c Closure of operation.
*/
- private void transform(GridHadoopJobId jobId, IgniteClosure<GridHadoopJobMetadata, GridHadoopJobMetadata> c) {
- jobMetaCache().transformAsync(jobId, c).listenAsync(failsLog);
+ private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) {
+ jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog);
}
/**
@@ -493,7 +496,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
*/
public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers,
GridHadoopProcessDescriptor desc) {
- transform(jobId, new InitializeReducersClosure(null, reducers, desc));
+ transform(jobId, new InitializeReducersProcessor(null, reducers, desc));
}
/**
@@ -601,7 +604,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
}
if (cancelSplits != null || cancelReducers != null)
- jobMetaCache().transform(meta.jobId(), new CancelJobClosure(null, new IgniteCheckedException(
+ jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException(
"One or more nodes participating in map-reduce job execution failed."), cancelSplits,
cancelReducers));
}
@@ -615,8 +618,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* @param updated Updated cache entries.
+ * @throws IgniteCheckedException If failed.
*/
- private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated) throws IgniteCheckedException {
+ private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated)
+ throws IgniteCheckedException {
UUID locNodeId = ctx.localNodeId();
for (Map.Entry<GridHadoopJobId, GridHadoopJobMetadata> entry : updated) {
@@ -637,7 +642,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
U.error(log, "Failed to process job state changed callback (will fail the job) " +
"[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e);
- transform(jobId, new CancelJobClosure(null, e));
+ transform(jobId, new CancelJobProcessor(null, e));
continue;
}
@@ -780,7 +785,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
}
if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty())
- transform(jobId, new CancelJobClosure(null, cancelMappers, cancelReducers));
+ transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers));
}
break;
@@ -1017,7 +1022,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) {
GridHadoopTaskCancelledException err = new GridHadoopTaskCancelledException("Job cancelled.");
- jobMetaCache().transform(jobId, new CancelJobClosure(null, err));
+ jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err));
}
}
finally {
@@ -1146,13 +1151,13 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param status Task status.
* @param prev Previous closure.
*/
- private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedClosure prev) {
+ private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
final GridHadoopJobId jobId = taskInfo.jobId();
if (status.state() == FAILED || status.state() == CRASHED)
- transform(jobId, new CancelJobClosure(prev, status.failCause()));
+ transform(jobId, new CancelJobProcessor(prev, status.failCause()));
else
- transform(jobId, new UpdatePhaseClosure(prev, PHASE_MAP));
+ transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP));
}
/**
@@ -1161,14 +1166,14 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
*/
private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
- final StackedClosure prev) {
+ final StackedProcessor prev) {
final GridHadoopJobId jobId = taskInfo.jobId();
boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size();
if (status.state() == FAILED || status.state() == CRASHED) {
// Fail the whole job.
- transform(jobId, new RemoveMappersClosure(prev, taskInfo.inputSplit(), status.failCause()));
+ transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause()));
return;
}
@@ -1186,7 +1191,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
}
}
- transform(jobId, new RemoveMappersClosure(prev, taskInfo.inputSplit(), err));
+ transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err));
}
};
@@ -1201,13 +1206,13 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param status Task status.
* @param prev Previous closure.
*/
- private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedClosure prev) {
+ private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
GridHadoopJobId jobId = taskInfo.jobId();
if (status.state() == FAILED || status.state() == CRASHED)
// Fail the whole job.
- transform(jobId, new RemoveReducerClosure(prev, taskInfo.taskNumber(), status.failCause()));
+ transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause()));
else
- transform(jobId, new RemoveReducerClosure(prev, taskInfo.taskNumber()));
+ transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber()));
}
/**
@@ -1216,12 +1221,12 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
*/
private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
- final StackedClosure prev) {
+ final StackedProcessor prev) {
final GridHadoopJobId jobId = taskInfo.jobId();
if (status.state() == FAILED || status.state() == CRASHED)
// Fail the whole job.
- transform(jobId, new RemoveMappersClosure(prev, currMappers, status.failCause()));
+ transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause()));
else {
ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteFuture<?>>() {
@Override public void applyx(IgniteFuture<?> f) {
@@ -1236,7 +1241,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
}
}
- transform(jobId, new RemoveMappersClosure(prev, currMappers, err));
+ transform(jobId, new RemoveMappersProcessor(prev, currMappers, err));
}
});
}
@@ -1272,7 +1277,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* Update job phase transform closure.
*/
- private static class UpdatePhaseClosure extends StackedClosure {
+ private static class UpdatePhaseProcessor extends StackedProcessor {
/** */
private static final long serialVersionUID = 0L;
@@ -1283,7 +1288,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
* @param phase Phase to update.
*/
- private UpdatePhaseClosure(@Nullable StackedClosure prev, GridHadoopJobPhase phase) {
+ private UpdatePhaseProcessor(@Nullable StackedProcessor prev, GridHadoopJobPhase phase) {
super(prev);
this.phase = phase;
@@ -1298,7 +1303,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* Remove mapper transform closure.
*/
- private static class RemoveMappersClosure extends StackedClosure {
+ private static class RemoveMappersProcessor extends StackedProcessor {
/** */
private static final long serialVersionUID = 0L;
@@ -1313,7 +1318,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param split Mapper split to remove.
* @param err Error.
*/
- private RemoveMappersClosure(@Nullable StackedClosure prev, GridHadoopInputSplit split, Throwable err) {
+ private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) {
this(prev, Collections.singletonList(split), err);
}
@@ -1321,8 +1326,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
* @param splits Mapper splits to remove.
*/
- private RemoveMappersClosure(@Nullable StackedClosure prev, Collection<GridHadoopInputSplit> splits,
- Throwable err) {
+ private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits,
+ Throwable err) {
super(prev);
this.splits = splits;
@@ -1354,7 +1359,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* Remove reducer transform closure.
*/
- private static class RemoveReducerClosure extends StackedClosure {
+ private static class RemoveReducerProcessor extends StackedProcessor {
/** */
private static final long serialVersionUID = 0L;
@@ -1368,7 +1373,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
* @param rdc Reducer to remove.
*/
- private RemoveReducerClosure(@Nullable StackedClosure prev, int rdc) {
+ private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) {
super(prev);
this.rdc = rdc;
@@ -1378,7 +1383,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
* @param rdc Reducer to remove.
*/
- private RemoveReducerClosure(@Nullable StackedClosure prev, int rdc, Throwable err) {
+ private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) {
super(prev);
this.rdc = rdc;
@@ -1403,7 +1408,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* Initialize reducers.
*/
- private static class InitializeReducersClosure extends StackedClosure {
+ private static class InitializeReducersProcessor extends StackedProcessor {
/** */
private static final long serialVersionUID = 0L;
@@ -1418,7 +1423,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param rdc Reducers to initialize.
* @param desc External process descriptor.
*/
- private InitializeReducersClosure(@Nullable StackedClosure prev, Collection<Integer> rdc,
+ private InitializeReducersProcessor(@Nullable StackedProcessor prev,
+ Collection<Integer> rdc,
GridHadoopProcessDescriptor desc) {
super(prev);
@@ -1446,7 +1452,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* Remove reducer transform closure.
*/
- private static class CancelJobClosure extends StackedClosure {
+ private static class CancelJobProcessor extends StackedProcessor {
/** */
private static final long serialVersionUID = 0L;
@@ -1463,7 +1469,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
* @param err Fail cause.
*/
- private CancelJobClosure(@Nullable StackedClosure prev, Throwable err) {
+ private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) {
this(prev, err, null, null);
}
@@ -1472,7 +1478,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param splits Splits to remove.
* @param rdc Reducers to remove.
*/
- private CancelJobClosure(@Nullable StackedClosure prev, Collection<GridHadoopInputSplit> splits,
+ private CancelJobProcessor(@Nullable StackedProcessor prev,
+ Collection<GridHadoopInputSplit> splits,
Collection<Integer> rdc) {
this(prev, null, splits, rdc);
}
@@ -1483,7 +1490,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param splits Splits to remove.
* @param rdc Reducers to remove.
*/
- private CancelJobClosure(@Nullable StackedClosure prev, Throwable err, Collection<GridHadoopInputSplit> splits,
+ private CancelJobProcessor(@Nullable StackedProcessor prev,
+ Throwable err,
+ Collection<GridHadoopInputSplit> splits,
Collection<Integer> rdc) {
super(prev);
@@ -1522,7 +1531,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* Increment counter values closure.
*/
- private static class IncrementCountersClosure extends StackedClosure {
+ private static class IncrementCountersProcessor extends StackedProcessor {
/** */
private static final long serialVersionUID = 0L;
@@ -1533,7 +1542,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
* @param prev Previous closure.
* @param counters Task counters to add into job counters.
*/
- private IncrementCountersClosure(@Nullable StackedClosure prev, GridHadoopCounters counters) {
+ private IncrementCountersProcessor(@Nullable StackedProcessor prev, GridHadoopCounters counters) {
super(prev);
assert counters != null;
@@ -1554,22 +1563,33 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
/**
* Abstract stacked closure.
*/
- private abstract static class StackedClosure implements IgniteClosure<GridHadoopJobMetadata, GridHadoopJobMetadata> {
+ private abstract static class StackedProcessor implements
+ EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable {
/** */
private static final long serialVersionUID = 0L;
/** */
- private final StackedClosure prev;
+ private final StackedProcessor prev;
/**
* @param prev Previous closure.
*/
- private StackedClosure(@Nullable StackedClosure prev) {
+ private StackedProcessor(@Nullable StackedProcessor prev) {
this.prev = prev;
}
/** {@inheritDoc} */
- @Override public final GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) {
+ @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) {
+ e.setValue(apply(e.getValue()));
+
+ return null;
+ }
+
+ /**
+ * @param meta Old value.
+ * @return New value.
+ */
+ private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) {
if (meta == null)
return null;
[3/3] incubator-ignite git commit: # ignite-44
Posted by sb...@apache.org.
# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcb30d10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcb30d10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcb30d10
Branch: refs/heads/ignite-44
Commit: bcb30d10471e2764b8b6e2928cdb50f04971b618
Parents: 928aa3d
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 25 10:33:52 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 25 17:29:09 2014 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 176 +++++++++++++
.../cache/CachePartialUpdateException.java | 36 +++
.../processors/cache/IgniteCacheProxy.java | 255 ++++++++++++++++---
.../grid/cache/GridCacheInterceptor.java | 8 +-
.../cache/GridCachePartialUpdateException.java | 1 +
.../processors/cache/GridCacheAdapter.java | 83 +++++-
.../processors/cache/GridCacheEntryEx.java | 6 +-
.../processors/cache/GridCacheMapEntry.java | 38 ++-
.../processors/cache/GridCacheProcessor.java | 16 +-
.../processors/cache/GridCacheProjectionEx.java | 26 +-
.../cache/GridCacheProjectionImpl.java | 21 +-
.../processors/cache/GridCacheProxyImpl.java | 36 ++-
.../processors/cache/GridCacheStoreManager.java | 7 +-
.../dht/atomic/GridDhtAtomicCache.java | 56 ++--
.../distributed/near/GridNearAtomicCache.java | 28 ++
.../local/atomic/GridLocalAtomicCache.java | 229 +++++++++++++++--
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../transactions/IgniteTxLocalAdapter.java | 49 +++-
.../cache/transactions/IgniteTxLocalEx.java | 2 +-
.../processors/ggfs/GridGgfsDataManager.java | 26 +-
.../processors/ggfs/GridGgfsMetaManager.java | 72 ++++--
.../cache/IgniteCacheAtomicInvokeTest.java | 47 ++++
.../cache/IgniteCacheAtomicLocalInvokeTest.java | 41 +++
...niteCacheAtomicLocalWithStoreInvokeTest.java | 22 ++
.../IgniteCacheAtomicNearEnabledInvokeTest.java | 24 ++
.../cache/IgniteCacheInvokeAbstractTest.java | 21 +-
.../cache/IgniteCacheTxLocalInvokeTest.java | 41 +++
.../IgniteCacheTxNearEnabledInvokeTest.java | 24 ++
.../IgniteCacheExpiryPolicyAbstractTest.java | 7 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 171 ++++++-------
...idCacheGetAndTransformStoreAbstractTest.java | 21 +-
.../cache/GridCacheIncrementTransformTest.java | 19 +-
.../GridCacheInterceptorAbstractSelfTest.java | 66 +++--
...ridCacheMultinodeUpdateAbstractSelfTest.java | 28 +-
...HeapMultiThreadedUpdateAbstractSelfTest.java | 21 +-
...CacheOffHeapMultiThreadedUpdateSelfTest.java | 9 +-
.../GridCacheOffHeapTieredAbstractSelfTest.java | 45 ++--
...heOffHeapTieredEvictionAbstractSelfTest.java | 32 ++-
.../GridCacheReturnValueTransferSelfTest.java | 33 ++-
.../processors/cache/GridCacheTestEntryEx.java | 3 +-
.../IgniteTxExceptionAbstractSelfTest.java | 16 +-
.../IgniteTxStoreExceptionAbstractSelfTest.java | 25 +-
.../GridCacheTransformEventSelfTest.java | 68 +++--
.../IgniteTxPreloadAbstractTest.java | 44 +++-
...heAbstractTransformWriteThroughSelfTest.java | 37 ++-
.../dht/GridCacheAtomicNearCacheSelfTest.java | 72 +++---
...GridCacheValueConsistencyAtomicSelfTest.java | 23 +-
.../testframework/junits/GridAbstractTest.java | 3 +-
.../junits/common/GridCommonAbstractTest.java | 50 +++-
.../bamboo/GridDataGridTestSuite.java | 15 +-
.../hadoop/jobtracker/GridHadoopJobTracker.java | 124 +++++----
51 files changed, 1801 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index a59573e..f51c237 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -13,6 +13,7 @@ import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
@@ -298,4 +299,179 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @return Cache size on this node.
*/
public int localSize(CachePeekMode... peekModes);
+
+ /**
+ * Stores given key-value pair in cache. If filters are provided, then entries will
+ * be stored in cache only if they pass the filter. Note that filter check is atomic,
+ * so value stored in cache is guaranteed to be consistent with the filters. If cache
+ * previously contained value for the given key, then this value is returned.
+ * In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED} caches,
+ * the value will be loaded from the primary node, which in its turn may load the value
+ * from the swap storage, and consecutively, if it's not in swap,
+ * from the underlying persistent storage. If value has to be loaded from persistent
+ * storage, {@link org.gridgain.grid.cache.store.GridCacheStore#load(IgniteTx, Object)} method will be used.
+ * <p>
+ * If the returned value is not needed, method {@link #putIf(Object, Object, IgnitePredicate)} should
+ * always be used instead of this one to avoid the overhead associated with returning of the previous value.
+ * <p>
+ * If write-through is enabled, the stored value will be persisted to {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#put(IgniteTx, Object, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key to store in cache.
+ * @param val Value to be associated with the given key.
+ * @param filter Optional filter to check prior to putting value in cache. Note
+ * that filter check is atomic with put operation.
+ * @return Previous value associated with specified key, or {@code null}
+ * if entry did not pass the filter, or if there was no mapping for the key in swap
+ * or in persistent storage.
+ * @throws NullPointerException If either key or value are {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ @Nullable public V getAndPutIf(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Stores given key-value pair in cache. If filters are provided, then entries will
+ * be stored in cache only if they pass the filter. Note that filter check is atomic,
+ * so value stored in cache is guaranteed to be consistent with the filters.
+ * <p>
+ * This method will return {@code true} if value is stored in cache and {@code false} otherwise.
+ * Unlike {@link #getAndPutIf(Object, Object, IgnitePredicate)} method, it does not return previous
+ * value and, therefore, does not have any overhead associated with returning a value. It
+ * should be used whenever return value is not required.
+ * <p>
+ * If write-through is enabled, the stored value will be persisted to {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#put(IgniteTx, Object, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key to store in cache.
+ * @param val Value to be associated with the given key.
+ * @param filter Optional filter to check prior to putting value in cache. Note
+ * that filter check is atomic with put operation.
+ * @return {@code True} if optional filter passed and value was stored in cache,
+ * {@code false} otherwise. Note that this method will return {@code true} if filter is not
+ * specified.
+ * @throws NullPointerException If either key or value are {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Removes given key mapping from cache. If cache previously contained value for the given key,
+ * then this value is returned. In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED}
+ * caches, the value will be loaded from the primary node, which in its turn may load the value
+ * from the disk-based swap storage, and consecutively, if it's not in swap,
+ * from the underlying persistent storage. If value has to be loaded from persistent
+ * storage, {@link org.gridgain.grid.cache.store.GridCacheStore#load(IgniteTx, Object)} method will be used.
+ * <p>
+ * If the returned value is not needed, method {@link #removeIf(Object, IgnitePredicate)} should
+ * always be used instead of this one to avoid the overhead associated with returning of the
+ * previous value.
+ * <p>
+ * If write-through is enabled, the value will be removed from {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#remove(IgniteTx, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key whose mapping is to be removed from cache.
+ * @param filter Optional filter to check prior to removing value form cache. Note
+ * that filter is checked atomically together with remove operation.
+ * @return Previous value associated with specified key, or {@code null}
+ * if there was no value for this key.
+ * @throws NullPointerException If key is {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Removes given key mapping from cache.
+ * <p>
+ * This method will return {@code true} if remove did occur, which means that all optionally
+ * provided filters have passed and there was something to remove, {@code false} otherwise.
+ * <p>
+ * If write-through is enabled, the value will be removed from {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#remove(IgniteTx, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key whose mapping is to be removed from cache.
+ * @param filter Optional filter to check prior to removing value form cache. Note
+ * that filter is checked atomically together with remove operation.
+ * @return {@code True} if filter passed validation and entry was removed, {@code false} otherwise.
+ * Note that if filter is not specified, this method will return {@code true}.
+ * @throws NullPointerException if the key is {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Creates projection that will operate with portable objects.
+ * <p>
+ * Projection returned by this method will force cache not to deserialize portable objects,
+ * so keys and values will be returned from cache API methods without changes. Therefore,
+ * signature of the projection can contain only following types:
+ * <ul>
+ * <li>{@link org.apache.ignite.portables.PortableObject} for portable classes</li>
+ * <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li>
+ * <li>Arrays of primitives (byte[], int[], ...)</li>
+ * <li>{@link String} and array of {@link String}s</li>
+ * <li>{@link UUID} and array of {@link UUID}s</li>
+ * <li>{@link Date} and array of {@link Date}s</li>
+ * <li>{@link java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li>
+ * <li>Enums and array of enums</li>
+ * <li>
+ * Maps, collections and array of objects (but objects inside
+ * them will still be converted if they are portable)
+ * </li>
+ * </ul>
+ * <p>
+ * For example, if you use {@link Integer} as a key and {@code Value} class as a value
+ * (which will be stored in portable format), you should acquire following projection
+ * to avoid deserialization:
+ * <pre>
+ * GridCacheProjection<Integer, GridPortableObject> prj = cache.keepPortable();
+ *
+ * // Value is not deserialized and returned in portable format.
+ * GridPortableObject po = prj.get(1);
+ * </pre>
+ * <p>
+ * Note that this method makes sense only if cache is working in portable mode
+ * ({@link org.gridgain.grid.cache.GridCacheConfiguration#isPortableEnabled()} returns {@code true}. If not,
+ * this method is no-op and will return current projection.
+ *
+ * @return Projection for portable objects.
+ */
+ public <K1, V1> IgniteCache<K1, V1> keepPortable();
+
+ /**
+ * Gets cache projection base on this one, but with the specified flags turned on.
+ * <h1 class="header">Cache Flags</h1>
+ * The resulting projection will inherit all the flags from this projection.
+ *
+ * @param flags Flags to turn on (if empty, then no-op).
+ * @return New projection based on this one, but with the specified flags turned on.
+ */
+ public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java b/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java
new file mode 100644
index 0000000..08ce72e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java
@@ -0,0 +1,36 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.cache;
+
+import org.gridgain.grid.cache.*;
+
+import javax.cache.*;
+import java.util.*;
+
+/**
+ * Exception thrown from non-transactional cache in case when update succeeded only partially.
+ * One can get list of keys for which update failed with method {@link #failedKeys()}.
+ */
+public class CachePartialUpdateException extends CacheException {
+ /**
+ * @param e Cause.
+ */
+ public CachePartialUpdateException(GridCachePartialUpdateException e) {
+ super(e.getMessage(), e);
+ }
+
+ /**
+ * Gets collection of failed keys.
+ * @return Collection of failed keys.
+ */
+ public <K> Collection<K> failedKeys() {
+ return ((GridCachePartialUpdateException)getCause()).failedKeys();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 410fb9a..f7c157f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -13,8 +13,10 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.lang.*;
+import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
@@ -30,7 +32,7 @@ import java.util.concurrent.locks.*;
/**
* Cache proxy.
*/
-public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable {
+public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements IgniteCache<K, V>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -51,10 +53,14 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
* @param ctx Context.
* @param delegate Delegate.
* @param prj Projection.
+ * @param async Async support flag.
*/
public IgniteCacheProxy(GridCacheContext<K, V> ctx,
GridCacheProjectionEx<K, V> delegate,
- @Nullable GridCacheProjectionImpl<K, V> prj) {
+ @Nullable GridCacheProjectionImpl<K, V> prj,
+ boolean async) {
+ super(async);
+
assert ctx != null;
assert delegate != null;
@@ -84,7 +90,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
try {
GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
- return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0);
+ return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync());
}
finally {
gate.leave(prev);
@@ -98,12 +104,81 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
/** {@inheritDoc} */
- @Override public void localLoadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) throws CacheException {
+ @Override public void localLoadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args)
+ throws CacheException {
// TODO IGNITE-1.
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
+ @Nullable @Override public V getAndPutIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.put(key, val, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.putx(key, val, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.remove(key, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.removex(key, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
try {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -116,7 +191,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -215,7 +290,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -257,7 +332,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -274,7 +349,44 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
+ }
+ }
+
+ /**
+ * @param keys Keys.
+ * @return Values map.
+ */
+ public Map<K, V> getAll(Collection<? extends K> keys) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.getAll(keys);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+ * Gets entry set containing internal entries.
+ *
+ * @param filter Filter.
+ * @return Entry set.
+ */
+ public Set<GridCacheEntry<K, V>> entrySetx(IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.entrySetx(filter);
+ }
+ finally {
+ gate.leave(prev);
}
}
@@ -305,7 +417,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -322,7 +434,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -339,7 +451,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -356,7 +468,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -373,7 +485,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -390,7 +502,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -407,7 +519,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -424,7 +536,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -441,7 +553,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -458,7 +570,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -475,7 +587,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -498,16 +610,34 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args).get();
+ if (isAsync()) {
+ IgniteFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args);
+
+ IgniteFuture<T> fut0 = fut.chain(new CX1<IgniteFuture<EntryProcessorResult<T>>, T>() {
+ @Override public T applyx(IgniteFuture<EntryProcessorResult<T>> fut)
+ throws IgniteCheckedException {
+ EntryProcessorResult<T> res = fut.get();
+
+ return res.get();
+ }
+ });
+
+ curFut.set(fut0);
- return res.get();
+ return null;
+ }
+ else {
+ EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args);
+
+ return res.get();
+ }
}
finally {
gate.leave(prev);
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -519,14 +649,14 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- return delegate.invokeAll(keys, entryProcessor, args).get();
+ return saveOrGet(delegate.invokeAllAsync(keys, entryProcessor, args));
}
finally {
gate.leave(prev);
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -618,20 +748,81 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public IgniteCache<K, V> enableAsync() {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ if (isAsync())
+ return this;
+
+ return new IgniteCacheProxy<>(ctx, delegate, prj, true);
+ }
+
+ /**
+ * @param e Checked exception.
+ * @return Cache exception.
+ */
+ private CacheException cacheException(IgniteCheckedException e) {
+ if (e instanceof GridCachePartialUpdateException)
+ return new CachePartialUpdateException((GridCachePartialUpdateException)e);
+
+ return new CacheException(e);
}
/** {@inheritDoc} */
- @Override public boolean isAsync() {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ @SuppressWarnings("unchecked")
+ @Override public <K1, V1> IgniteCache<K1, V1> keepPortable() {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ GridCacheProjectionImpl<K1, V1> prj0 = new GridCacheProjectionImpl<>(
+ (GridCacheProjection<K1, V1>)(prj != null ? prj : delegate),
+ (GridCacheContext<K1, V1>)ctx,
+ null,
+ null,
+ prj != null ? prj.flags() : null,
+ prj != null ? prj.subjectId() : null,
+ true,
+ prj != null ? prj.expiry() : null);
+
+ return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
+ prj0,
+ prj0,
+ isAsync());
+ }
+ finally {
+ gate.leave(prev);
+ }
}
/** {@inheritDoc} */
- @Override public <R> IgniteFuture<R> future() {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ @Override public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ Set<GridCacheFlag> res = EnumSet.noneOf(GridCacheFlag.class);
+
+ Set<GridCacheFlag> flags0 = prj !=null ? prj.flags() : null;
+
+ if (flags0 != null && !flags0.isEmpty())
+ res.addAll(flags0);
+
+ res.addAll(EnumSet.copyOf(F.asList(flags)));
+
+ GridCacheProjectionImpl<K, V> prj0 = new GridCacheProjectionImpl<>(
+ (prj != null ? prj : delegate),
+ ctx,
+ null,
+ null,
+ res,
+ prj != null ? prj.subjectId() : null,
+ true,
+ prj != null ? prj.expiry() : null);
+
+ return new IgniteCacheProxy<>(ctx,
+ prj0,
+ prj0,
+ isAsync());
+ }
+ finally {
+ gate.leave(prev);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
index cb0192c..b1030bd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
@@ -42,7 +42,7 @@ public interface GridCacheInterceptor<K, V> {
@Nullable public V onGet(K key, @Nullable V val);
/**
- * This method is called within {@link GridCacheProjection#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])}
+ * This method is called within {@link GridCacheProjection#put(Object, Object, IgnitePredicate[])}
* and similar operations before new value is stored in cache.
* <p>
* Implementations should not execute any complex logic,
@@ -56,7 +56,7 @@ public interface GridCacheInterceptor<K, V> {
* @param oldVal Old value.
* @param newVal New value.
* @return Value to be put to cache. Returning {@code null} cancels the update.
- * @see GridCacheProjection#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])
+ * @see GridCacheProjection#put(Object, Object, IgnitePredicate[])
*/
@Nullable public V onBeforePut(K key, @Nullable V oldVal, V newVal);
@@ -76,7 +76,7 @@ public interface GridCacheInterceptor<K, V> {
public void onAfterPut(K key, V val);
/**
- * This method is called within {@link GridCacheProjection#remove(Object, org.apache.ignite.lang.IgnitePredicate[])}
+ * This method is called within {@link GridCacheProjection#remove(Object, IgnitePredicate[])}
* and similar operations to provide control over returned value.
* <p>
* Implementations should not execute any complex logic,
@@ -91,7 +91,7 @@ public interface GridCacheInterceptor<K, V> {
* @return Tuple. The first value is the flag whether remove should be cancelled or not.
* The second is the value to be returned as result of {@code remove()} operation,
* may be {@code null}.
- * @see GridCacheProjection#remove(Object, org.apache.ignite.lang.IgnitePredicate[])
+ * @see GridCacheProjection#remove(Object, IgnitePredicate[])
*/
@Nullable public IgniteBiTuple<Boolean, V> onBeforeRemove(K key, @Nullable V val);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
index dd41e55..015a5a7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
@@ -49,6 +49,7 @@ public class GridCachePartialUpdateException extends IgniteCheckedException {
addSuppressed(err);
}
+ /** {@inheritDoc} */
@Override public String getMessage() {
return super.getMessage() + ": " + failedKeys;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 62daeb9..d4028be 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -1420,12 +1420,18 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
* @param reload Reload flag.
* @param tx Transaction.
* @param filter Filter.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
* @param vis Visitor.
* @return Future.
*/
- public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys, boolean reload,
- @Nullable final IgniteTxEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId,
- String taskName, final IgniteBiInClosure<K, V> vis) {
+ public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys,
+ boolean reload,
+ @Nullable final IgniteTxEx<K, V> tx,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ @Nullable UUID subjId,
+ String taskName,
+ final IgniteBiInClosure<K, V> vis) {
return ctx.closures().callLocalSafe(new GPC<Object>() {
@Nullable @Override public Object call() {
try {
@@ -2194,7 +2200,66 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(
+ @Override public <T> EntryProcessorResult<T> invoke(final K key,
+ final EntryProcessor<K, V, T> entryProcessor,
+ final Object... args)
+ throws IgniteCheckedException {
+ A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ ctx.denyOnLocalRead();
+
+ return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
+ @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter<K, V> tx)
+ throws IgniteCheckedException {
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
+ Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
+
+ IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut =
+ tx.invokeAsync(ctx, false, invokeMap, args);
+
+ Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
+
+ assert resMap != null;
+ assert resMap.size() == 1 : resMap.size();
+
+ return resMap.values().iterator().next();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys,
+ final EntryProcessor<K, V, T> entryProcessor,
+ final Object... args) throws IgniteCheckedException {
+ A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
+ ctx.denyOnLocalRead();
+
+ return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ throws IgniteCheckedException {
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
+ @Override public EntryProcessor apply(K k) {
+ return entryProcessor;
+ }
+ });
+
+ IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut =
+ tx.invokeAsync(ctx, false, invokeMap, args);
+
+ return fut.get().value();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(
final K key,
final EntryProcessor<K, V, T> entryProcessor,
final Object... args)
@@ -2208,8 +2273,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
IgniteFuture<?> fut = asyncOp(new AsyncInOp(key) {
@Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
- Map<? extends K, EntryProcessor> invokeMap =
- Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
+ Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
return tx.invokeAsync(ctx, false, invokeMap, args);
}
@@ -2238,11 +2303,11 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
final Set<? extends K> keys,
final EntryProcessor<K, V, T> entryProcessor,
final Object... args) {
- A.notNull(entryProcessor, "entryProcessor");
+ A.notNull(keys, "keys", entryProcessor, "entryProcessor");
if (keyCheck)
validateCacheKeys(keys);
@@ -2251,7 +2316,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
IgniteFuture<?> fut = asyncOp(new AsyncInOp(keys) {
@Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
- Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
@Override public EntryProcessor apply(K k) {
return entryProcessor;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 1b71eec..101427f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -450,6 +450,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* @param ver Cache version.
* @param op Operation.
* @param writeObj Value. Type depends on operation.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param writeThrough Write through flag.
* @param retval Return value flag.
* @param expiryPlc Expiry policy..
@@ -459,14 +460,15 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* @param intercept If {@code true} then calls cache interceptor.
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
- * @return Tuple containing success flag and old value.
+ * @return Tuple containing success flag and operation result.
* @throws IgniteCheckedException If update failed.
* @throws GridCacheEntryRemovedException If entry is obsolete.
*/
- public IgniteBiTuple<Boolean, V> innerUpdateLocal(
+ public IgniteBiTuple<Boolean, Object> innerUpdateLocal(
GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
+ @Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@Nullable ExpiryPolicy expiryPlc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index f14bba5..1b9ea20 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1365,10 +1365,11 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal(
+ @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal(
GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
+ @Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@Nullable ExpiryPolicy expiryPlc,
@@ -1381,7 +1382,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.isLocal() && cctx.atomic();
- V old;
+ Object opRes;
boolean res = true;
@@ -1397,7 +1398,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
unswap(true, retval);
// Possibly get old value form store.
- old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ V old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+
+ opRes = old;
GridCacheValueBytes oldBytes = valueBytesUnlocked();
@@ -1428,7 +1431,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
updateTtl(ttl);
}
- return new IgniteBiTuple<>(false, retval ? old : null);
+ return new IgniteBiTuple<>(false, (Object)(retval ? old : null));
}
}
@@ -1444,11 +1447,24 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (op == GridCacheOperation.TRANSFORM) {
transformCloClsName = writeObj.getClass().getName();
- IgniteClosure<V, V> transform = (IgniteClosure<V, V>)writeObj;
+ EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj;
- assert transform != null;
+ assert entryProcessor != null;
- updated = cctx.unwrapTemporary(transform.apply(old));
+ CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old);
+
+ try {
+ Object computed = entryProcessor.process(entry, invokeArgs);
+
+ updated = cctx.unwrapTemporary(entry.getValue());
+
+ opRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed));
+ }
+ catch (Exception e) {
+ updated = old;
+
+ opRes = new CacheInvokeResult<>(e);
+ }
}
else
updated = (V)writeObj;
@@ -1460,13 +1476,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated);
if (updated == null)
- return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(old));
+ return new IgniteBiTuple<>(false, (Object)cctx.<V>unwrapTemporary(old));
}
else {
interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old);
if (cctx.cancelRemove(interceptorRes))
- return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(interceptorRes.get2()));
+ return new IgniteBiTuple<>(false,
+ (Object)cctx.<V>unwrapTemporary(interceptorRes.get2()));
}
}
@@ -1576,7 +1593,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
}
- return new IgniteBiTuple<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old));
+ return new IgniteBiTuple<>(res,
+ (Object)(cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : opRes)));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index 0724a58..70798ff 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -1599,7 +1599,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cache == null)
throw new IllegalArgumentException("Cache is not configured: " + name);
- return new IgniteCacheProxy<>(cache.context(), cache, null);
+ return new IgniteCacheProxy<>(cache.context(), cache, null, false);
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache instance for given name.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteCacheProxy<K, V> jcache(@Nullable String name) {
+ GridCacheAdapter<K, V> cache = (GridCacheAdapter<K, V>)caches.get(name);
+
+ if (cache == null)
+ throw new IllegalArgumentException("Cache is not configured: " + name);
+
+ return new IgniteCacheProxy<>(cache.context(), cache, null, false);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
index 1a98192..8df7d10 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
@@ -399,9 +399,31 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
* @param key Key.
* @param entryProcessor Entry processor.
* @param args Arguments.
+ * @return Invoke result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <T> EntryProcessorResult<T> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException;
+
+ /**
+ * @param keys Keys.
+ * @param entryProcessor Entry processor.
+ * @param args Arguments.
+ * @return Invoke results.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException;
+
+ /**
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param args Arguments.
* @return Future.
*/
- public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args);
@@ -411,7 +433,7 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
* @param args Arguments.
* @return Future.
*/
- public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
Object... args);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index ad5cde3..62b6b72 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -789,17 +789,30 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+ throws IgniteCheckedException {
+ return cache.invoke(key, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return cache.invokeAll(keys, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
- return cache.invoke(key, entryProcessor, args);
+ return cache.invokeAsync(key, entryProcessor, args);
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
- return cache.invokeAll(keys, entryProcessor, args);
+ return cache.invokeAllAsync(keys, entryProcessor, args);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index 90aeb0b..66f8626 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -738,9 +738,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ @Override public <T> EntryProcessorResult<T> invoke(K key,
EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
+ Object... args) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -752,9 +752,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
+ Object... args) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -766,6 +766,34 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.invokeAsync(key, entryProcessor, args);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.invokeAllAsync(keys, entryProcessor, args);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFuture<Boolean> putxAsync(K key, V val,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
index a7e47b0..b6fe4be 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
@@ -180,7 +180,8 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
* @throws IgniteCheckedException If data loading failed.
*/
@SuppressWarnings({"unchecked"})
- public boolean loadAllFromStore(@Nullable IgniteTx tx, Collection<? extends K> keys,
+ public boolean loadAllFromStore(@Nullable IgniteTx tx,
+ Collection<? extends K> keys,
final IgniteBiInClosure<K, V> vis) throws IgniteCheckedException {
if (store != null) {
if (!keys.isEmpty()) {
@@ -230,6 +231,10 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
return true;
}
+ else {
+ for (K key : keys)
+ vis.apply(key, null);
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 78d92f8..0644821 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -455,7 +455,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException {
- transformAsync(key, transformer).get();
+ //transformAsync(key, transformer).get();
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -482,8 +484,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException {
- transformAllAsync(m).get();
+ @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m)
+ throws IgniteCheckedException {
+ //transformAllAsync(m).get();
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -632,8 +637,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+ throws IgniteCheckedException {
+ return invokeAsync(key, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args)
+ throws IgniteCheckedException {
+ return invokeAllAsync(keys, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
A.notNull(key, "key", entryProcessor, "entryProcessor");
@@ -671,7 +690,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
final EntryProcessor<K, V, T> entryProcessor,
Object... args) {
A.notNull(keys, "keys", entryProcessor, "entryProcessor");
@@ -701,8 +720,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* Entry point for all public API put/transform methods.
*
- * @param map Put map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
- * @param transformMap Transform map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
+ * @param map Put map. Either {@code map}, {@code invokeMap} or {@code drMap} should be passed.
+ * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code drMap} should be passed.
* @param invokeArgs Optional arguments for EntryProcessor.
* @param drPutMap DR put map.
* @param drRmvMap DR remove map.
@@ -714,7 +733,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private IgniteFuture updateAllAsync0(
@Nullable final Map<? extends K, ? extends V> map,
- @Nullable final Map<? extends K, EntryProcessor> transformMap,
+ @Nullable final Map<? extends K, EntryProcessor> invokeMap,
@Nullable Object[] invokeArgs,
@Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap,
@Nullable final Map<? extends K, GridCacheVersion> drRmvMap,
@@ -738,10 +757,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx,
this,
ctx.config().getWriteSynchronizationMode(),
- transformMap != null ? TRANSFORM : UPDATE,
- map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : drPutMap != null ?
+ invokeMap != null ? TRANSFORM : UPDATE,
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : drPutMap != null ?
drPutMap.keySet() : drRmvMap.keySet(),
- map != null ? map.values() : transformMap != null ? transformMap.values() : null,
+ map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
invokeArgs,
drPutMap != null ? drPutMap.values() : null,
drRmvMap != null ? drRmvMap.values() : null,
@@ -1213,12 +1232,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int size = req.keys().size();
Map<K, V> putMap = null;
+
Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null;
+
Collection<K> rmvKeys = null;
+
UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>();
+
List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size);
+
GridCacheOperation op = req.operation();
- Map<Object, Object> invokeResMap = op == TRANSFORM ? U.newHashMap(size) : null;
+
+ Map<K, EntryProcessorResult> invokeResMap =
+ op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null;
int firstEntryIdx = 0;
@@ -2644,7 +2670,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private boolean readersOnly;
/** */
- private Map<Object, Object> invokeRes;
+ private Map<K, EntryProcessorResult> invokeRes;
/**
* @param entry Entry.
@@ -2679,14 +2705,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param invokeRes Result for invoke operation.
*/
- private void invokeResult(Map<Object, Object> invokeRes) {
+ private void invokeResult(Map<K, EntryProcessorResult> invokeRes) {
this.invokeRes = invokeRes;
}
/**
* @return Result for invoke operation.
*/
- Map<Object, Object> invokeResults() {
+ Map<K, EntryProcessorResult> invokeResults() {
return invokeRes;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index 07e9785..5b3055a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -555,6 +555,34 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public <T> EntryProcessorResult<T> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return dht.invoke(key, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return dht.invokeAll(keys, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws EntryProcessorException {
+ return dht.invokeAsync(key, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ return dht.invokeAllAsync(keys, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
@Override public V remove(K key,
@Nullable GridCacheEntryEx<K, V> entry,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException {
[2/3] incubator-ignite git commit: # ignite-44
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index eaf0173..650f0ab 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -25,6 +25,7 @@ import org.jetbrains.annotations.*;
import sun.misc.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -106,6 +107,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (V)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
+ null,
expiryPerCall(),
true,
false,
@@ -127,6 +129,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
+ null,
expiryPerCall(),
false,
false,
@@ -145,6 +148,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
+ null,
expiryPerCall(),
false,
false,
@@ -163,7 +167,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- return updateAllAsync0(F0.asMap(key, val), null, true, false, ttl, filter);
+ return updateAllAsync0(F0.asMap(key, val),
+ null,
+ null,
+ true,
+ false,
+ filter);
}
/** {@inheritDoc} */
@@ -177,7 +186,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- return updateAllAsync0(F0.asMap(key, val), null, false, false, ttl, filter);
+ return updateAllAsync0(F0.asMap(key, val),
+ null,
+ null,
+ false,
+ false,
+ filter);
}
/** {@inheritDoc} */
@@ -242,6 +256,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (GridCacheReturn<V>)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(newVal),
+ null,
expiryPerCall(),
true,
true,
@@ -259,6 +274,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (GridCacheReturn<V>)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
true,
true,
@@ -283,7 +299,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.denyOnLocalRead();
- return updateAllAsync0(F.asMap(key, newVal), null, true, true, 0,
+ return updateAllAsync0(F.asMap(key, newVal),
+ null,
+ null,
+ true,
+ true,
ctx.equalsPeekArray(oldVal));
}
@@ -295,6 +315,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(UPDATE,
m.keySet(),
m.values(),
+ null,
expiryPerCall(),
false,
false,
@@ -307,11 +328,17 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
ctx.denyOnLocalRead();
- return updateAllAsync0(m, null, false, false, 0, filter);
+ return updateAllAsync0(m,
+ null,
+ null,
+ false,
+ false,
+ filter);
}
/** {@inheritDoc} */
@Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException {
+ /*
ctx.denyOnLocalRead();
updateAllInternal(TRANSFORM,
@@ -322,12 +349,16 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.isStoreEnabled());
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer)
throws IgniteCheckedException {
+ /*
return (R)updateAllInternal(TRANSFORM,
Collections.singleton(key),
Collections.singleton(new GridCacheTransformComputeClosure<>(transformer)),
@@ -336,6 +367,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.isStoreEnabled());
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -343,14 +377,19 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
IgniteClosure<V, V> transformer,
@Nullable GridCacheEntryEx<K, V> entry,
long ttl) {
+ /*
ctx.denyOnLocalRead();
return updateAllAsync0(null, Collections.singletonMap(key, transformer), false, false, ttl, null);
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException {
+ /*
ctx.denyOnLocalRead();
if (F.isEmpty(m))
@@ -364,16 +403,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.isStoreEnabled());
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) {
+ /*
ctx.denyOnLocalRead();
if (F.isEmpty(m))
return new GridFinishedFuture<Object>(ctx.kernalContext());
return updateAllAsync0(null, m, false, false, 0, null);
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -386,6 +432,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (V)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
true,
false,
@@ -412,6 +459,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(DELETE,
keys,
null,
+ null,
expiryPerCall(),
false,
false,
@@ -439,6 +487,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
false,
false,
@@ -467,6 +516,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
+ null,
expiryPerCall(),
false,
false,
@@ -669,31 +719,107 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, expiry, filter).get();
}
+ /** {@inheritDoc} */
+ @Override public <T> EntryProcessorResult<T> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return invokeAsync(key, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return invokeAllAsync(keys, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws EntryProcessorException {
+ A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ ctx.denyOnLocalRead();
+
+ Map<? extends K, EntryProcessor> invokeMap =
+ Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+
+ IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
+ invokeMap,
+ args,
+ true,
+ false,
+ null);
+
+ return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
+ @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut)
+ throws IgniteCheckedException {
+ Map<K, EntryProcessorResult<T>> resMap = fut.get();
+
+ assert resMap != null;
+ assert resMap.size() == 1 : resMap.size();
+
+ return resMap.values().iterator().next();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
+ Set<? extends K> keys,
+ final EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
+ ctx.denyOnLocalRead();
+
+ Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+ @Override public EntryProcessor apply(K k) {
+ return entryProcessor;
+ }
+ });
+
+ return updateAllAsync0(null,
+ invokeMap,
+ args,
+ true,
+ false,
+ null);
+ }
+
/**
* Entry point for public API update methods.
*
- * @param map Put map. Either {@code map} or {@code transformMap} should be passed.
- * @param transformMap Transform map. Either {@code map} or {@code transformMap} should be passed.
+ * @param map Put map. Either {@code map} or {@code invokeMap} should be passed.
+ * @param invokeMap Transform map. Either {@code map} or {@code invokeMap} should be passed.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
- * @param ttl Entry time-to-live.
* @param filter Cache entry filter for atomic updates.
* @return Completion future.
*/
private IgniteFuture updateAllAsync0(
@Nullable final Map<? extends K, ? extends V> map,
- @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+ @Nullable final Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable final Object[] invokeArgs,
final boolean retval,
final boolean rawRetval,
- final long ttl,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
- final GridCacheOperation op = transformMap != null ? TRANSFORM : UPDATE;
+ final GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE;
final Collection<? extends K> keys =
- map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null;
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : null;
- final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null;
+ final Collection<?> vals = map != null ? map.values() : invokeMap != null ? invokeMap.values() : null;
final boolean storeEnabled = ctx.isStoreEnabled();
@@ -704,6 +830,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return updateAllInternal(op,
keys,
vals,
+ invokeArgs,
expiry,
retval,
rawRetval,
@@ -737,6 +864,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return updateAllInternal(DELETE,
keys,
null,
+ null,
expiryPlc,
retval,
rawRetval,
@@ -747,11 +875,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
/**
- * Entry point for all public update methods (put, remove, transform).
+ * Entry point for all public update methods (put, remove, invoke).
*
* @param op Operation.
* @param keys Keys.
* @param vals Values.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param expiryPlc Expiry policy.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
@@ -764,6 +893,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
private Object updateAllInternal(GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
+ @Nullable Object[] invokeArgs,
@Nullable ExpiryPolicy expiryPlc,
boolean retval,
boolean rawRetval,
@@ -784,9 +914,15 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
UUID subjId = ctx.subjectIdPerCall(null);
if (storeEnabled && keys.size() > 1) {
- updateWithBatch(op, keys, vals, expiryPlc, ver, filter, subjId, taskName);
-
- return null;
+ return updateWithBatch(op,
+ keys,
+ vals,
+ invokeArgs,
+ expiryPlc,
+ ver,
+ filter,
+ subjId,
+ taskName);
}
Iterator<?> valsIter = vals != null ? vals.iterator() : null;
@@ -809,10 +945,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
try {
entry = entryEx(key);
- IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal(
+ IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal(
ver,
val == null ? DELETE : op,
val,
+ invokeArgs,
storeEnabled,
retval,
expiryPlc,
@@ -823,16 +960,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
taskName);
- if (res == null) {
- if (op == TRANSFORM && val instanceof GridCacheTransformComputeClosure) {
- assert retval;
+ if (op == TRANSFORM) {
+ assert t.get2() instanceof EntryProcessorResult : t.get2();
+
+ Map<K, EntryProcessorResult> computedMap;
- res = new IgniteBiTuple<>(t.get1(),
- ((GridCacheTransformComputeClosure<V, ?>)val).returnValue());
+ if (res == null) {
+ computedMap = U.newHashMap(keys.size());
+
+ res = new IgniteBiTuple<>(true, computedMap);
}
else
- res = t;
+ computedMap = (Map<K, EntryProcessorResult>)res.get2();
+
+ computedMap.put(key, (EntryProcessorResult)t.getValue());
}
+ else if (res == null)
+ res = t;
break; // While.
}
@@ -872,18 +1016,21 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
* @param op Operation.
* @param keys Keys.
* @param vals Values.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param expiryPlc Expiry policy.
* @param ver Cache version.
* @param filter Optional filter.
* @param subjId Subject ID.
* @param taskName Task name.
* @throws GridCachePartialUpdateException If update failed.
+ * @return Results map for invoke operation.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
- private void updateWithBatch(
+ private Map<K, EntryProcessorResult> updateWithBatch(
GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
+ @Nullable Object[] invokeArgs,
@Nullable ExpiryPolicy expiryPlc,
GridCacheVersion ver,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -896,7 +1043,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
int size = locked.size();
Map<K, V> putMap = null;
+
Collection<K> rmvKeys = null;
+
+ Map<K, EntryProcessorResult> invokeResMap =
+ op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null;
+
List<GridCacheEntryEx<K, V>> filtered = new ArrayList<>(size);
GridCachePartialUpdateException err = null;
@@ -933,7 +1085,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
if (op == TRANSFORM) {
- IgniteClosure<V, V> transform = (IgniteClosure<V, V>)val;
+ EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)val;
V old = entry.innerGet(null,
/*swap*/true,
@@ -944,12 +1096,30 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/**event*/true,
/**temporary*/true,
subjId,
- transform,
+ entryProcessor,
taskName,
CU.<K, V>empty(),
null);
- V updated = transform.apply(old);
+ CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old);
+
+ V updated;
+ CacheInvokeResult invokeRes;
+
+ try {
+ Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+
+ updated = ctx.unwrapTemporary(invokeEntry.getValue());
+
+ invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+ }
+ catch (Exception e) {
+ invokeRes = new CacheInvokeResult<>(e);
+
+ updated = old;
+ }
+
+ invokeResMap.put(entry.key(), invokeRes);
if (updated == null) {
if (intercept) {
@@ -1107,6 +1277,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (err != null)
throw err;
+
+ return invokeResMap;
}
finally {
unlockEntries(locked);
@@ -1179,10 +1351,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
- IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal(
+ IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal(
ver,
op,
writeVal,
+ null,
false,
false,
expiryPlc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
index 6fb77a1..bfd9359 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1194,7 +1194,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
/*event*/recordEvt,
/*temporary*/true,
/*subjId*/subjId,
- /**closure name */recordEvt ? F.first(txEntry.entryProcessors()) : null,
+ /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
resolveTaskName(),
CU.<K, V>empty(),
null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 34938d5..1680724 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1768,7 +1768,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
@Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
GridCacheContext<K, V> cacheCtx,
boolean retval,
- @Nullable Map<? extends K, EntryProcessor> map,
+ @Nullable Map<? extends K, EntryProcessor<K, V, Object>> map,
Object... invokeArgs
) {
return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx,
@@ -1829,7 +1829,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
@Nullable ExpiryPolicy expiryPlc,
boolean implicit,
@Nullable Map<? extends K, ? extends V> lookup,
- @Nullable Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap,
@Nullable Object[] invokeArgs,
boolean retval,
boolean lockOnly,
@@ -1992,7 +1992,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
break; // While.
}
- GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
txEntry = addEntry(op,
@@ -2019,7 +2019,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
txEntry.markValid();
if (old == null) {
- if (retval && !readThrough) {
+ boolean load = retval && !readThrough;
+
+ // Check for transform here to avoid map creation.
+ load |= (op == TRANSFORM && keys.size() == 1);
+
+ if (load) {
// If return value is required, then we know for sure that there is only
// one key in the keys collection.
assert keys.size() == 1;
@@ -2035,7 +2040,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
log.debug("Loaded value from remote node [key=" + k + ", val=" +
v + ']');
- ret.set(v, true);
+ if (op == TRANSFORM) {
+ IgniteTxEntry<K, V> e =
+ entry(new IgniteTxKey<>(k, cacheCtx.cacheId()));
+
+ assert e != null && e.op() == TRANSFORM : e;
+
+ addInvokeResult(e, v, ret);
+ }
+ else
+ ret.set(v, true);
}
});
@@ -2130,6 +2144,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
drVer);
enlisted.add(key);
+
+ if (txEntry.op() == TRANSFORM)
+ addInvokeResult(txEntry, txEntry.value(), ret);
}
if (!pessimistic()) {
@@ -2137,6 +2154,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (retval)
ret.set(v, true);
+ else
+ ret.success(true);
}
}
}
@@ -2155,11 +2174,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
missedForInvoke,
deserializePortables(cacheCtx),
new CI2<K, V>() {
- @Override public void apply(K k, V v) {
+ @Override public void apply(K key, V val) {
if (log.isDebugEnabled())
- log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']');
+ log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+
+ IgniteTxEntry<K, V> e = entry(new IgniteTxKey<>(key, cacheCtx.cacheId()));
- addInvokeResult(entry(new IgniteTxKey<>(k, cacheCtx.cacheId())), v, ret);
+ assert e != null && e.op() == TRANSFORM : e;
+
+ addInvokeResult(e, val, ret);
}
});
@@ -2373,17 +2396,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
private IgniteFuture putAllAsync0(
final GridCacheContext<K, V> cacheCtx,
@Nullable Map<? extends K, ? extends V> map,
- @Nullable Map<? extends K, EntryProcessor> invokeMap,
+ @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap,
@Nullable final Object[] invokeArgs,
@Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
final boolean retval,
@Nullable GridCacheEntryEx<K, V> cached,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ assert filter == null || invokeMap == null;
+
cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT);
// Cached entry may be passed only from entry wrapper.
final Map<K, V> map0;
- final Map<K, EntryProcessor> invokeMap0;
+ final Map<K, EntryProcessor<K, V, Object>> invokeMap0;
if (drMap != null) {
assert map == null;
@@ -2419,7 +2444,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
invokeMap0 = U.newHashMap(invokeMap.size());
try {
- for (Map.Entry<? extends K, EntryProcessor> e : invokeMap.entrySet()) {
+ for (Map.Entry<? extends K, EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) {
K key = (K)cacheCtx.marshalToPortable(e.getKey());
invokeMap0.put(key, e.getValue());
@@ -2434,7 +2459,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
else {
map0 = (Map<K, V>)map;
- invokeMap0 = (Map<K, EntryProcessor>)invokeMap;
+ invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
}
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
index 8a485b6..12680f3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -96,7 +96,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> {
public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
GridCacheContext<K, V> cacheCtx,
boolean retval,
- Map<? extends K, EntryProcessor> map,
+ Map<? extends K, EntryProcessor<K, V, Object>> map,
Object... invokeArgs);
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
index a21c8fc..89bcbe6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
@@ -34,6 +34,7 @@ import org.gridgain.grid.util.worker.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -1101,7 +1102,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
// No affinity key present, just concat and return.
if (colocatedKey.affinityKey() == null) {
- dataCachePrj.transform(colocatedKey, new UpdateClosure(startOff, data));
+ dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data));
return;
}
@@ -1125,16 +1126,16 @@ public class GridGgfsDataManager extends GridGgfsManager {
boolean hasVal = false;
- UpdateClosure transformClos = new UpdateClosure(startOff, data);
+ UpdateProcessor transformClos = new UpdateProcessor(startOff, data);
if (vals.get(colocatedKey) != null) {
- dataCachePrj.transform(colocatedKey, transformClos);
+ dataCachePrj.invoke(colocatedKey, transformClos);
hasVal = true;
}
if (vals.get(key) != null) {
- dataCachePrj.transform(key, transformClos);
+ dataCachePrj.invoke(key, transformClos);
hasVal = true;
}
@@ -1570,7 +1571,8 @@ public class GridGgfsDataManager extends GridGgfsManager {
* Helper closure to update data in cache.
*/
@GridInternal
- private static final class UpdateClosure implements IgniteClosure<byte[], byte[]>, Externalizable {
+ private static final class UpdateProcessor implements EntryProcessor<GridGgfsBlockKey, byte[], Void>,
+ Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -1584,7 +1586,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
* Empty constructor required for {@link Externalizable}.
*
*/
- public UpdateClosure() {
+ public UpdateProcessor() {
// No-op.
}
@@ -1594,7 +1596,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
* @param start Start position in the block to write new data from.
* @param data Data block to write into cache.
*/
- private UpdateClosure(int start, byte[] data) {
+ private UpdateProcessor(int start, byte[] data) {
assert start >= 0;
assert data != null;
assert start + data.length >= 0 : "Too much data [start=" + start + ", data.length=" + data.length + ']';
@@ -1604,7 +1606,9 @@ public class GridGgfsDataManager extends GridGgfsManager {
}
/** {@inheritDoc} */
- @Override public byte[] apply(byte[] e) {
+ @Override public Void process(MutableEntry<GridGgfsBlockKey, byte[]> entry, Object... args) {
+ byte[] e = entry.getValue();
+
final int size = data.length;
if (e == null || e.length == 0)
@@ -1621,7 +1625,9 @@ public class GridGgfsDataManager extends GridGgfsManager {
// Copy data into entry.
U.arrayCopy(data, 0, e, start, size);
- return e;
+ entry.setValue(e);
+
+ return null;
}
/** {@inheritDoc} */
@@ -1638,7 +1644,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(UpdateClosure.class, this, "start", start, "data.length", data.length);
+ return S.toString(UpdateProcessor.class, this, "start", start, "data.length", data.length);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
index eb0a728..87e09b9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.lang.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
@@ -751,7 +752,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert metaCache.get(parentId) != null;
- id2InfoPrj.transform(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false));
+ id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false));
return null;
}
@@ -868,10 +869,10 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert metaCache.get(destParentId) != null;
// Remove listing entry from the source parent listing.
- id2InfoPrj.transform(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
+ id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
// Add listing entry into the destination parent listing.
- id2InfoPrj.transform(destParentId, new UpdateListing(destFileName, srcEntry, false));
+ id2InfoPrj.invoke(destParentId, new UpdateListing(destFileName, srcEntry, false));
}
/**
@@ -987,7 +988,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
// Update a file info of the removed file with a file path,
// which will be used by delete worker for event notifications.
- id2InfoPrj.transform(fileId, new UpdatePath(path));
+ id2InfoPrj.invoke(fileId, new UpdatePath(path));
return GridGgfsFileInfo.builder(fileInfo).path(path).build();
}
@@ -1086,12 +1087,12 @@ public class GridGgfsMetaManager extends GridGgfsManager {
id2InfoPrj.put(newInfo.id(), newInfo);
// Add new info to trash listing.
- id2InfoPrj.transform(TRASH_ID, new UpdateListing(newInfo.id().toString(),
+ id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(),
new GridGgfsListingEntry(newInfo), false));
// Remove listing entries from root.
for (Map.Entry<String, GridGgfsListingEntry> entry : transferListing.entrySet())
- id2InfoPrj.transform(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
+ id2InfoPrj.invoke(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
resId = newInfo.id();
}
@@ -1228,7 +1229,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
GridGgfsListingEntry listingEntry = parentInfo.listing().get(name);
if (listingEntry != null)
- id2InfoPrj.transform(parentId, new UpdateListing(name, listingEntry, true));
+ id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true));
id2InfoPrj.remove(id);
@@ -1359,7 +1360,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert metaCache.get(parentId) != null;
- id2InfoPrj.transform(parentId, new UpdateListing(fileName, entry, false));
+ id2InfoPrj.invoke(parentId, new UpdateListing(fileName, entry, false));
}
return newInfo;
@@ -1424,7 +1425,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
assert validTxState(false);
- id2InfoPrj.transformAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0,
+ id2InfoPrj.invokeAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0,
modificationTime));
}
finally {
@@ -1659,9 +1660,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
id2InfoPrj.removex(oldId); // Remove the old one.
id2InfoPrj.putx(newInfo.id(), newInfo); // Put the new one.
- id2InfoPrj.transform(parentInfo.id(),
+ id2InfoPrj.invoke(parentInfo.id(),
new UpdateListing(path.name(), parentInfo.listing().get(path.name()), true));
- id2InfoPrj.transform(parentInfo.id(),
+ id2InfoPrj.invoke(parentInfo.id(),
new UpdateListing(path.name(), new GridGgfsListingEntry(newInfo), false));
IgniteFuture<?> delFut = ggfsCtx.data().delete(oldInfo);
@@ -2150,7 +2151,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
}
// Update the deleted file info with path information for delete worker.
- id2InfoPrj.transform(info.id(), new UpdatePath(path));
+ id2InfoPrj.invoke(info.id(), new UpdatePath(path));
return true; // No additional handling is required.
}
@@ -2606,7 +2607,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
id2InfoPrj.putx(fileId, updated);
- id2InfoPrj.transform(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime,
+ id2InfoPrj.invoke(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime,
modificationTime));
tx.commit();
@@ -2741,7 +2742,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
/**
* Updates file length information in parent listing.
*/
- private static final class UpdateListingEntry implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+ private static final class UpdateListingEntry implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -2775,8 +2776,11 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* @param accessTime Last access time.
* @param modificationTime Last modification time.
*/
- private UpdateListingEntry(IgniteUuid fileId, String fileName, long lenDelta,
- long accessTime, long modificationTime) {
+ private UpdateListingEntry(IgniteUuid fileId,
+ String fileName,
+ long lenDelta,
+ long accessTime,
+ long modificationTime) {
this.fileId = fileId;
this.fileName = fileName;
this.lenDelta = lenDelta;
@@ -2785,13 +2789,18 @@ public class GridGgfsMetaManager extends GridGgfsManager {
}
/** {@inheritDoc} */
- @Override public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) {
+ @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+ GridGgfsFileInfo fileInfo = e.getValue();
+
Map<String, GridGgfsListingEntry> listing = fileInfo.listing();
GridGgfsListingEntry entry = listing.get(fileName);
- if (entry == null || !entry.fileId().equals(fileId))
- return fileInfo;
+ if (entry == null || !entry.fileId().equals(fileId)) {
+ e.setValue(fileInfo);
+
+ return null;
+ }
entry = new GridGgfsListingEntry(entry, entry.length() + lenDelta,
accessTime == -1 ? entry.accessTime() : accessTime,
@@ -2803,7 +2812,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
// Modify listing map in-place since map is serialization-safe.
listing.put(fileName, entry);
- return new GridGgfsFileInfo(listing, fileInfo);
+ e.setValue(new GridGgfsFileInfo(listing, fileInfo));
+
+ return null;
}
/** {@inheritDoc} */
@@ -2829,7 +2840,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* Update directory listing closure.
*/
@GridInternal
- private static final class UpdateListing implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+ private static final class UpdateListing implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -2868,7 +2879,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
}
/** {@inheritDoc} */
- @Override @Nullable public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) {
+ @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+ GridGgfsFileInfo fileInfo = e.getValue();
+
assert fileInfo != null : "File info not found for the child: " + entry.fileId();
assert fileInfo.isDirectory();
@@ -2897,7 +2910,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
", oldEntry=" + oldEntry + ']');
}
- return new GridGgfsFileInfo(listing, fileInfo);
+ e.setValue(new GridGgfsFileInfo(listing, fileInfo));
+
+ return null;
}
/** {@inheritDoc} */
@@ -2924,7 +2939,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* Update path closure.
*/
@GridInternal
- private static final class UpdatePath implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+ private static final class UpdatePath implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -2943,11 +2958,16 @@ public class GridGgfsMetaManager extends GridGgfsManager {
* Default constructor (required by Externalizable).
*/
public UpdatePath() {
+ // No-op.
}
/** {@inheritDoc} */
- @Override public GridGgfsFileInfo apply(GridGgfsFileInfo info) {
- return GridGgfsFileInfo.builder(info).path(path).build();
+ @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+ GridGgfsFileInfo info = e.getValue();
+
+ e.setValue(GridGgfsFileInfo.builder(info).path(path).build());
+
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
new file mode 100644
index 0000000..6a97b19
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicInvokeTest extends IgniteCacheInvokeAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+ return CLOCK;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
new file mode 100644
index 0000000..7048f18
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicLocalInvokeTest extends IgniteCacheInvokeAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
new file mode 100644
index 0000000..2ff0468
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
@@ -0,0 +1,22 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.store.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicLocalWithStoreInvokeTest extends IgniteCacheAtomicLocalInvokeTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheStore<?, ?> cacheStore() {
+ return new TestStore();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
new file mode 100644
index 0000000..8f4d71c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicNearEnabledInvokeTest extends IgniteCacheAtomicInvokeTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return NEAR_PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
index 8731306..068476c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
@@ -39,7 +39,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
public void testInvoke() throws Exception {
// TODO IGNITE41 test with forceTransformBackups.
- invoke(null);
+ invoke(null);
if (atomicityMode() == TRANSACTIONAL) {
invoke(PESSIMISTIC);
@@ -165,19 +165,21 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0)), txMode);
- invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode);
+ if (gridCount() > 1) {
+ invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode);
- invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode);
+ invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode);
- Set<Integer> keys = new HashSet<>();
+ Set<Integer> keys = new HashSet<>();
- keys.addAll(primaryKeys(jcache(0), 3, 0));
- keys.addAll(primaryKeys(jcache(1), 3, 0));
- keys.addAll(primaryKeys(jcache(2), 3, 0));
+ keys.addAll(primaryKeys(jcache(0), 3, 0));
+ keys.addAll(primaryKeys(jcache(1), 3, 0));
+ keys.addAll(primaryKeys(jcache(2), 3, 0));
- invokeAll(cache, keys, txMode);
+ invokeAll(cache, keys, txMode);
+ }
- keys = new HashSet<>();
+ Set<Integer> keys = new HashSet<>();
for (int i = 0; i < 1000; i++)
keys.add(i);
@@ -415,7 +417,6 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e,
Object... arguments) throws EntryProcessorException {
- System.out.println(Thread.currentThread() + " compute, old=" + e.getValue());
if (e.exists()) {
Integer val = e.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
new file mode 100644
index 0000000..20576ab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxLocalInvokeTest extends IgniteCacheInvokeAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
new file mode 100644
index 0000000..ffc50ff
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxNearEnabledInvokeTest extends IgniteCacheTxInvokeTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return NEAR_PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index a57da71..0d3b54f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -363,12 +363,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
createUpdatePutAll(null);
if (atomicityMode() == TRANSACTIONAL) {
- IgniteTxConcurrency[] txModes;
-
- if (cacheMode() == LOCAL)
- txModes= new IgniteTxConcurrency[]{PESSIMISTIC};
- else
- txModes= new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC};
+ IgniteTxConcurrency[] txModes = new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC};
for (IgniteTxConcurrency tx : txModes) {
for (final Integer key : keys()) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index fd3751c..9d56c7f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -891,6 +891,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
+
+ cache.remove("key1");
+ cache.put("key2", 1);
+ cache.put("key3", 3);
}
Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR);
@@ -901,9 +905,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache(i).peek("key3"));
}
- assertEquals("1", res.get("key1").get());
- assertEquals("2", res.get("key2").get());
- assertEquals("4", res.get("key3").get());
+ assertEquals("null", res.get("key1").get());
+ assertEquals("1", res.get("key2").get());
+ assertEquals("3", res.get("key3").get());
assertEquals(3, res.size());
@@ -928,9 +932,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testTransformAllWithNulls() throws Exception {
- final GridCacheProjection<String, Integer> cache = cache();
+ final IgniteCache<String, Integer> cache = jcache();
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invokeAll(null, INCR_PROCESSOR);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
- cache.transformAll(null); // This should be no-op.
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invokeAll(F.asSet("key1"), null);
+
+ return null;
+ }
+ }, NullPointerException.class, null);
{
Map<String, Integer> m = new HashMap<>(2);
@@ -944,38 +962,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
{
- Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2);
-
- tm.put("key1", INCR_PROCESSOR);
- tm.put(null, INCR_PROCESSOR);
-
- // WARN: F.asMap() doesn't work here, because it will throw NPE.
-
- cache.transformAll(tm);
- }
-
- {
- Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2);
-
- tm.put("key1", INCR_PROCESSOR);
- tm.put("key2", null);
-
- // WARN: F.asMap() doesn't work here, because it will throw NPE.
-
- cache.transformAll(tm);
- }
-
- cache.transformAll(null, INCR_PROCESSOR); // This should be no-op.
-
- {
- Set<String> ts = new HashSet<>(3);
+ Set<String> keys = new HashSet<>(2);
- ts.add("key1");
- ts.add(null);
+ keys.add("key1");
+ keys.add(null);
// WARN: F.asSet() doesn't work here, because it will throw NPE.
- cache.transformAll(ts, INCR_PROCESSOR);
+ cache.invokeAll(keys, INCR_PROCESSOR);
}
}
@@ -1014,17 +1008,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
private void checkTransformSequential0(boolean startVal, IgniteTxConcurrency concurrency)
throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ IgniteCache<String, Integer> cache = jcache();
- IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null;
+ IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
try {
if (startVal)
cache.put("key", 2);
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
if (tx != null)
tx.commit();
@@ -1063,18 +1057,18 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
private void checkTransformAfterRemove(IgniteTxConcurrency concurrency) throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ IgniteCache<String, Integer> cache = jcache();
cache.put("key", 4);
- IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null;
+ IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
try {
cache.remove("key");
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
- cache.transform("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
if (tx != null)
tx.commit();
@@ -1128,20 +1122,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @param isolation Isolation.
* @throws Exception If failed.
*/
- private void checkTransformReturnValue(boolean put, IgniteTxConcurrency concurrency,
- IgniteTxIsolation isolation) throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ private void checkTransformReturnValue(boolean put,
+ IgniteTxConcurrency concurrency,
+ IgniteTxIsolation isolation)
+ throws Exception
+ {
+ IgniteCache<String, Integer> cache = jcache();
if (!put)
cache.put("key", 1);
- IgniteTx tx = txEnabled() ? cache.txStart(concurrency, isolation) : null;
+ IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, isolation) : null;
try {
if (put)
cache.put("key", 1);
- cache.transform("key", INCR_PROCESSOR);
+ cache.invoke("key", INCR_PROCESSOR);
assertEquals((Integer)2, cache.get("key"));
@@ -1211,33 +1208,25 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception If failed.
*/
- public void testTransformEntry() throws Exception {
- GridCacheEntry<String, Integer> entry = cache().entry("test");
+ public void testInvokeAsync() throws Exception {
+ IgniteCache<String, Integer> cache = jcache();
- entry.setValue(1);
+ cache.put("key2", 1);
+ cache.put("key3", 3);
- // Make user entry capture cache entry.
- entry.version();
+ cache = cache.enableAsync();
- assertEquals((Integer)1, entry.getValue());
+ assertNull(cache.invoke("key1", INCR_PROCESSOR));
- entry.transform(INCR_PROCESSOR);
+ IgniteFuture<?> fut0 = cache.future();
- assertEquals((Integer)2, entry.getValue());
- }
+ assertNull(cache.invoke("key2", INCR_PROCESSOR));
- /**
- * @throws Exception If failed.
- */
- public void testTransformAsync() throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
+ IgniteFuture<?> fut1 = cache.future();
- cache.put("key2", 1);
- cache.put("key3", 3);
+ assertNull(cache.invoke("key3", RMV_PROCESSOR));
- IgniteFuture<?> fut0 = cache.transformAsync("key1", INCR_PROCESSOR);
- IgniteFuture<?> fut1 = cache.transformAsync("key2", INCR_PROCESSOR);
- IgniteFuture<?> fut2 = cache.transformAsync("key3", RMV_PROCESSOR);
+ IgniteFuture<?> fut2 = cache.future();
fut0.get();
fut1.get();
@@ -1254,46 +1243,54 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception If failed.
*/
- public void testTransformCompute() throws Exception {
- GridCacheProjection<String, Integer> cache = cache();
-
- IgniteClosure<Integer, IgniteBiTuple<Integer, String>> c;
-
- c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() {
- @Override public IgniteBiTuple<Integer, String> apply(Integer val) {
- return val == null ? new IgniteBiTuple<>(0, "null") : new IgniteBiTuple<>(val + 1, String.valueOf(val));
- }
- };
+ public void testInvoke() throws Exception {
+ final IgniteCache<String, Integer> cache = jcache();
- assertEquals("null", cache.transformAndCompute("k0", c));
+ assertEquals("null", cache.invoke("k0", INCR_PROCESSOR));
- assertEquals((Integer)0, cache.get("k0"));
+ assertEquals((Integer)1, cache.get("k0"));
- assertEquals("0", cache.transformAndCompute("k0", c));
+ assertEquals("1", cache.invoke("k0", INCR_PROCESSOR));
- assertEquals((Integer)1, cache.get("k0"));
+ assertEquals((Integer)2, cache.get("k0"));
cache.put("k1", 1);
- assertEquals("1", cache.transformAndCompute("k1", c));
+ assertEquals("1", cache.invoke("k1", INCR_PROCESSOR));
assertEquals((Integer)2, cache.get("k1"));
- assertEquals("2", cache.transformAndCompute("k1", c));
+ assertEquals("2", cache.invoke("k1", INCR_PROCESSOR));
assertEquals((Integer)3, cache.get("k1"));
- c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() {
- @Override public IgniteBiTuple<Integer, String> apply(Integer integer) {
- return new IgniteBiTuple<>(null, null);
+ EntryProcessor<String, Integer, Integer> c = new EntryProcessor<String, Integer, Integer>() {
+ @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+ e.remove();
+
+ return null;
}
};
- assertNull(cache.transformAndCompute("k1", c));
+ assertNull(cache.invoke("k1", c));
assertNull(cache.get("k1"));
for (int i = 0; i < gridCount(); i++)
assertNull(cache(i).peek("k1"));
+
+ final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
+ @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+ throw new EntryProcessorException("Test entry processor exception.");
+ }
+ };
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.invoke("k1", errProcessor);
+
+ return null;
+ }
+ }, EntryProcessorException.class, "Test entry processor exception.");
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
index 8a929bf..c204e00 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
@@ -17,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.gridgain.testframework.junits.common.*;
+import javax.cache.processor.*;
+import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -108,19 +111,21 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm
startGrid(1);
startGrid(2);
- final IgniteClosure<String, String> trans = new TransformClosure();
+ final Processor entryProcessor = new Processor();
IgniteFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
- GridCache<Integer, String> c = cache(ThreadLocalRandom.current().nextInt(3));
+ IgniteCache<Integer, String> c = jcache(ThreadLocalRandom.current().nextInt(3));
while (!finish.get() && !Thread.currentThread().isInterrupted()) {
c.get(ThreadLocalRandom.current().nextInt(100));
+
c.put(ThreadLocalRandom.current().nextInt(100), "s");
- c.transform(
+
+ c.invoke(
ThreadLocalRandom.current().nextInt(100),
- trans);
+ entryProcessor);
}
return null;
@@ -147,10 +152,12 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm
/**
*
*/
- private static class TransformClosure implements IgniteClosure<String, String> {
+ private static class Processor implements EntryProcessor<Integer, String, Void>, Serializable {
/** {@inheritDoc} */
- @Override public String apply(String s) {
- return "str";
+ @Override public Void process(MutableEntry<Integer, String> e, Object... args) {
+ e.setValue("str");
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
index 44b8618..0e8602c 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
@@ -10,16 +10,17 @@
package org.gridgain.grid.kernal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.*;
import org.gridgain.testframework.junits.common.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -162,7 +163,7 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
ignite = restarts ? grids.getAndSet(idx, null) : grid(idx);
}
- GridCache <String, TestObject> cache = ignite.cache(null);
+ IgniteCache<String, TestObject> cache = ignite.jcache(null);
assertNotNull(cache);
@@ -173,11 +174,11 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
while (true) {
try {
- cache.transform("key", new Transformer());
+ cache.invoke("key", new Processor());
break;
}
- catch (GridCachePartialUpdateException ignored) {
+ catch (CachePartialUpdateException ignored) {
// Need to re-check if update actually succeeded.
TestObject updated = cache.get("key");
@@ -210,12 +211,16 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
}
/** */
- private static class Transformer implements C1<TestObject, TestObject> {
+ private static class Processor implements EntryProcessor<String, TestObject, Void>, Serializable {
/** {@inheritDoc} */
- @Override public TestObject apply(TestObject obj) {
+ @Override public Void process(MutableEntry<String, TestObject> e, Object... args) {
+ TestObject obj = e.getValue();
+
assert obj != null;
- return new TestObject(obj.val + 1);
+ e.setValue(new TestObject(obj.val + 1));
+
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
index b42406d..66abdc6 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.cache;
+import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
@@ -18,6 +19,7 @@ import org.gridgain.grid.util.typedef.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -1172,27 +1174,28 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
private void cacheUpdate(int grid, boolean rmv, Operation op, String key, final Integer val,
@Nullable final Integer expOld, @Nullable final Integer expRmvRet)
throws Exception {
- GridCache<String, Integer> cache = cache(grid);
+ IgniteCache<String, Integer> cache = jcache(grid);
if (rmv) {
assertNull(val);
switch (op) {
case UPDATE: {
- assertEquals(expRmvRet, cache.remove(key));
+ assertEquals(expRmvRet, cache.getAndRemove(key));
break;
}
case UPDATEX: {
- cache.removex(key);
+ cache.remove(key);
break;
}
case UPDATE_FILTER: {
- Object old = cache.remove(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() {
- @Override public boolean apply(GridCacheEntry<String, Integer> entry) {
+ Object old = cache.getAndRemoveIf(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() {
+ @Override
+ public boolean apply(GridCacheEntry<String, Integer> entry) {
return true;
}
});
@@ -1203,10 +1206,15 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- cache.transform(key, new IgniteClosure<Integer, Integer>() {
- @Nullable @Override public Integer apply(Integer old) {
+ cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
+ @Override
+ public Void process(MutableEntry<String, Integer> e, Object... args) {
+ Integer old = e.getValue();
+
assertEquals(expOld, old);
+ e.remove();
+
return null;
}
});
@@ -1221,20 +1229,21 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
else {
switch (op) {
case UPDATE: {
- assertEquals(expOld, cache.put(key, val));
+ assertEquals(expOld, cache.getAndPut(key, val));
break;
}
case UPDATEX: {
- cache.putx(key, val);
+ cache.put(key, val);
break;
}
case UPDATE_FILTER: {
- Object old = cache.put(key, val, new P1<GridCacheEntry<String, Integer>>() {
- @Override public boolean apply(GridCacheEntry<String, Integer> entry) {
+ Object old = cache.getAndPutIf(key, val, new P1<GridCacheEntry<String, Integer>>() {
+ @Override
+ public boolean apply(GridCacheEntry<String, Integer> entry) {
return true;
}
});
@@ -1245,11 +1254,16 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- cache.transform(key, new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer old) {
+ cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
+ @Override
+ public Void process(MutableEntry<String, Integer> e, Object... args) {
+ Integer old = e.getValue();
+
assertEquals(expOld, old);
- return val;
+ e.setValue(val);
+
+ return null;
}
});
@@ -1294,7 +1308,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
@SuppressWarnings("unchecked")
private void cacheBatchUpdate(int grid, boolean rmv, Operation op, final Map<String, Integer> map)
throws Exception {
- GridCache<String, Integer> cache = cache(grid);
+ IgniteCache<String, Integer> cache = jcache(grid);
if (rmv) {
switch (op) {
@@ -1305,8 +1319,10 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- cache.transformAll(map.keySet(), new IgniteClosure<Integer, Integer>() {
- @Nullable @Override public Integer apply(Integer old) {
+ cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ e.remove();
+
return null;
}
});
@@ -1327,17 +1343,13 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
}
case TRANSFORM: {
- Map<String, IgniteClosure<Integer, Integer>> m = new HashMap<>();
-
- for (final String key : map.keySet()) {
- m.put(key, new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer old) {
- return map.get(key);
- }
- });
- }
+ cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
+ @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+ e.setValue(map.get(e.getKey()));
- cache.transformAll(m);
+ return null;
+ }
+ });
break;
}