You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/04 16:03:45 UTC
[37/50] [abbrv] ignite git commit: IGNITE-9540: MVCC: support
IgniteCache.invoke method family. This closes #4832. This closes #4881.
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index af74996..4d1145c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -41,12 +41,15 @@ import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.expiry.Duration;
import javax.cache.expiry.TouchedExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ScanQuery;
@@ -283,6 +286,56 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticTx3() throws Exception {
+ checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
+ @Override public void apply(IgniteCache<Integer, Integer> cache) {
+ try {
+ IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ Integer res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
+ @Override public Integer process(MutableEntry<Integer, Integer> entry,
+ Object... arguments) throws EntryProcessorException {
+
+ entry.setValue(key);
+
+ return -key;
+ }
+ });
+
+ assertEquals(Integer.valueOf(-key), res);
+
+ val = (Integer)checkAndGet(true, cache, key, GET, SCAN);
+
+ assertEquals(key, val);
+
+ tx.commit();
+ }
+
+ Integer val = (Integer)checkAndGet(false, cache, key, SCAN, GET);
+
+ assertEquals(key, val);
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
* @param c Closure to run.
* @throws Exception If failed.
*/
@@ -3055,6 +3108,34 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
assertEquals(size, cache.size());
}
+ // Check rollback create.
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.rollback();
+ }
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ // Check rollback update.
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, -1);
+
+ tx.rollback();
+ }
+
+ assertEquals(size, cache.size());
+ }
+
// Check rollback remove.
for (int i = 0; i < KEYS; i++) {
final Integer key = i;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
index 76f8013..dcd46ff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
@@ -263,33 +263,6 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche
return arg.getClass();
}
- /**
- * Test that attempting to perform a cache PUT operation from within an SQL transaction fails.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void testCacheOperationsFromSqlTransaction() {
- checkCacheOperationThrows("invoke", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invoke", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAsync", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAsync", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAll", Collections.singletonMap(1, CACHE_ENTRY_PROC), X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAll", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAll", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAllAsync", Collections.singletonMap(1, CACHE_ENTRY_PROC),
- X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
- }
-
/** */
private final static EntryProcessor<Integer, Integer, Object> ENTRY_PROC =
new EntryProcessor<Integer, Integer, Object>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
index 4ea53e0..bcbfbc2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
@@ -151,7 +151,7 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT
* @throws Exception If failed.
*/
private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-9540");
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
final int VALS = 100;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
index 46aeaa1..5ec96e4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@@ -26,12 +28,22 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.testframework.GridTestUtils;
@@ -41,6 +53,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.INVOKE;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
@@ -103,6 +116,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testRepeatableReadIsolationInvoke() throws Exception {
+ checkOperations(GET, GET, WriteMode.INVOKE, true);
+ checkOperations(GET, GET, WriteMode.INVOKE, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testRepeatableReadIsolationSqlPut() throws Exception {
checkOperations(SQL, SQL, PUT, true);
checkOperations(SQL, SQL, PUT, false);
@@ -111,6 +132,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testRepeatableReadIsolationSqlInvoke() throws Exception {
+ checkOperations(SQL, SQL, WriteMode.INVOKE, true);
+ checkOperations(SQL, SQL, WriteMode.INVOKE, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testRepeatableReadIsolationSqlDml() throws Exception {
checkOperations(SQL, SQL, DML, true);
checkOperations(SQL, SQL, DML, false);
@@ -130,6 +159,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
public void testRepeatableReadIsolationMixedPut() throws Exception {
checkOperations(SQL, GET, PUT, false);
checkOperations(SQL, GET, PUT, true);
+ checkOperations(SQL, GET, WriteMode.INVOKE, false);
+ checkOperations(SQL, GET, WriteMode.INVOKE, true);
}
/**
@@ -138,6 +169,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
public void testRepeatableReadIsolationMixedPut2() throws Exception {
checkOperations(GET, SQL, PUT, false);
checkOperations(GET, SQL, PUT, true);
+ checkOperations(GET, SQL, WriteMode.INVOKE, false);
+ checkOperations(GET, SQL, WriteMode.INVOKE, true);
}
/**
@@ -162,15 +195,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
public void testOperationConsistency() throws Exception {
checkOperationsConsistency(PUT, false);
checkOperationsConsistency(DML, false);
+ checkOperationsConsistency(WriteMode.INVOKE, false);
checkOperationsConsistency(PUT, true);
checkOperationsConsistency(DML, true);
+ checkOperationsConsistency(WriteMode.INVOKE, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeConsistency() throws Exception {
+ Ignite node = grid(/*requestFromClient ? nodesCount() - 1 :*/ 0);
+
+ TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
+
+ final Set<Integer> keys1 = new HashSet<>(3);
+ final Set<Integer> keys2 = new HashSet<>(3);
+
+ Set<Integer> allKeys = generateKeySet(cache.cache, keys1, keys2);
+
+ final Map<Integer, MvccTestAccount> map1 = keys1.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ final Map<Integer, MvccTestAccount> map2 = keys2.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ assertEquals(0, cache.cache.size());
+
+ updateEntries(cache, map1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ updateEntries(cache, map1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ getEntries(cache, allKeys, INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ updateEntries(cache, map2, WriteMode.INVOKE);
+ assertEquals(6, cache.cache.size());
+
+ getEntries(cache, keys2, INVOKE);
+ assertEquals(6, cache.cache.size());
+
+ removeEntries(cache, keys1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ removeEntries(cache, keys1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ getEntries(cache, allKeys, INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ updateEntries(cache, map1, WriteMode.INVOKE);
+ assertEquals(6, cache.cache.size());
+
+ removeEntries(cache, allKeys, WriteMode.INVOKE);
+ assertEquals(0, cache.cache.size());
+
+ getEntries(cache, allKeys, INVOKE);
+ assertEquals(0, cache.cache.size());
}
/**
* Checks SQL and CacheAPI operation isolation consistency.
*
* @param readModeBefore read mode used before value updated.
- * @param readModeBefore read mode used after value updated.
+ * @param readModeAfter read mode used after value updated.
* @param writeMode write mode used for update.
* @throws Exception If failed.
*/
@@ -206,20 +296,23 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
@Override public Void call() throws Exception {
updateStart.await();
+ assertEquals(initialMap.size(), cache2.cache.size());
+
try (Transaction tx = txs2.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ tx.timeout(TX_TIMEOUT);
updateEntries(cache2, updateMap, writeMode);
removeEntries(cache2, keysForRemove, writeMode);
- checkContains(cache2, true, updateMap.keySet());
- checkContains(cache2, false, keysForRemove);
-
assertEquals(updateMap, cache2.cache.getAll(allKeys));
tx.commit();
}
+ finally {
+ updateFinish.countDown();
+ }
- updateFinish.countDown();
+ assertEquals(updateMap.size(), cache2.cache.size());
return null;
}
@@ -270,7 +363,7 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
* @return All keys.
* @throws IgniteCheckedException If failed.
*/
- protected Set<Integer> generateKeySet(IgniteCache<Object, Object> cache, Set<Integer> keySet1,
+ protected Set<Integer> generateKeySet(IgniteCache<?, ?> cache, Set<Integer> keySet1,
Set<Integer> keySet2) throws IgniteCheckedException {
LinkedHashSet<Integer> allKeys = new LinkedHashSet<>();
@@ -302,50 +395,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
- final Set<Integer> keysForUpdate = new HashSet<>(3);
- final Set<Integer> keysForRemove = new HashSet<>(3);
- final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
+ final Set<Integer> keysForUpdate = new HashSet<>(3);
+ final Set<Integer> keysForRemove = new HashSet<>(3);
- int updCnt = 1;
+ final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
- final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
- Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+ try {
+ int updCnt = 1;
+
+ final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ updateEntries(cache, initialVals, writeMode);
+
+ assertEquals(initialVals.size(), cache.cache.size());
+
+ IgniteTransactions txs = node.transactions();
+
+ Map<Integer, MvccTestAccount> updatedVals = null;
- cache.cache.putAll(initialVals);
+ try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
+ Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
+ Map<Integer, MvccTestAccount> vals3 = getEntries(cache, allKeys, ReadMode.INVOKE);
- IgniteTransactions txs = node.transactions();
+ assertEquals(initialVals, vals1);
+ assertEquals(initialVals, vals2);
+ assertEquals(initialVals, vals3);
- Map<Integer, MvccTestAccount> updatedVals = null;
+ assertEquals(initialVals.size(), cache.cache.size());
- try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
- Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
- Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
+ for (ReadMode readMode : new ReadMode[] {GET, SQL, INVOKE}) {
+ int updCnt0 = ++updCnt;
- assertEquals(initialVals, vals1);
- assertEquals(initialVals, vals2);
+ updatedVals = allKeys.stream().collect(Collectors.toMap(Function.identity(),
+ k -> new MvccTestAccount(k, updCnt0)));
- for (ReadMode readMode : new ReadMode[] {GET, SQL}) {
- int updCnt0 = ++updCnt;
+ updateEntries(cache, updatedVals, writeMode);
+ assertEquals(allKeys.size(), cache.cache.size());
- updatedVals = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(),
- k -> new MvccTestAccount(k, updCnt0)));
+ removeEntries(cache, keysForRemove, writeMode);
- updateEntries(cache, updatedVals, writeMode);
- removeEntries(cache, keysForRemove, writeMode);
+ for (Integer key : keysForRemove)
+ updatedVals.remove(key);
- assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
+ assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
+ }
+
+ tx.commit();
}
- tx.commit();
- }
+ try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ assertEquals(updatedVals, getEntries(cache, allKeys, GET));
+ assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
+ assertEquals(updatedVals, getEntries(cache, allKeys, INVOKE));
- try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
- assertEquals(updatedVals, getEntries(cache, allKeys, GET));
- assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
+ tx.commit();
+ }
- tx.commit();
+ assertEquals(updatedVals.size(), cache.cache.size());
+ }
+ finally {
+ cache.cache.removeAll(keysForUpdate);
}
+
+ assertEquals(0, cache.cache.size());
}
/**
@@ -365,6 +480,18 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
return cache.cache.getAll(keys);
case SQL:
return getAllSql(cache);
+ case INVOKE: {
+ Map<Integer, MvccTestAccount> res = new HashMap<>();
+
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor<>();
+
+ Map<Integer, EntryProcessorResult<MvccTestAccount>> invokeRes = cache.cache.invokeAll(keys, ep);
+
+ for (Map.Entry<Integer, EntryProcessorResult<MvccTestAccount>> e : invokeRes.entrySet())
+ res.put(e.getKey(), e.getValue().get());
+
+ return res;
+ }
default:
fail();
}
@@ -395,6 +522,19 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
break;
}
+ case INVOKE: {
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep =
+ new GetAndPutEntryProcessor<Integer, MvccTestAccount>(){
+ /** {@inheritDoc} */
+ @Override MvccTestAccount newValForKey(Integer key) {
+ return entries.get(key);
+ }
+ };
+
+ cache.cache.invokeAll(entries.keySet(), ep);
+
+ break;
+ }
default:
fail();
}
@@ -423,6 +563,13 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
break;
}
+ case INVOKE: {
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>();
+
+ cache.cache.invokeAll(keys, ep);
+
+ break;
+ }
default:
fail();
}
@@ -438,4 +585,52 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, Set<Integer> keys) {
assertEquals(expected, cache.cache.containsKeys(keys));
}
+
+ /**
+ * Applies get operation.
+ */
+ static class GetEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
+ /** {@inheritDoc} */
+ @Override public V process(MutableEntry<K, V> entry,
+ Object... arguments) throws EntryProcessorException {
+ return entry.getValue();
+ }
+ }
+
+ /**
+ * Applies remove operation.
+ */
+ static class RemoveEntryProcessor<K, V, R> implements CacheEntryProcessor<K, V, R> {
+ /** {@inheritDoc} */
+ @Override public R process(MutableEntry<K, V> entry,
+ Object... arguments) throws EntryProcessorException {
+ entry.remove();
+
+ return null;
+ }
+ }
+
+ /**
+ * Applies get and put operation.
+ */
+ static class GetAndPutEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
+ /** {@inheritDoc} */
+ @Override public V process(MutableEntry<K, V> entry,
+ Object... args) throws EntryProcessorException {
+ V newVal = !F.isEmpty(args) ? (V)args[0] : newValForKey(entry.getKey());
+
+ V oldVal = entry.getValue();
+ entry.setValue(newVal);
+
+ return oldVal;
+ }
+
+ /**
+ * @param key Key.
+ * @return New value.
+ */
+ V newValForKey(K key){
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
index c782f98..618d910 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
@@ -22,9 +22,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -55,8 +58,24 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
return res;
}
+
case SQL:
return getAllSql(cache);
+
+ case INVOKE: {
+ Map<Integer, MvccTestAccount> res = new HashMap<>();
+
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor();
+
+ for (Integer key : keys) {
+ MvccTestAccount val = cache.cache.invoke(key, ep);
+
+ if(val != null)
+ res.put(key, val);
+ }
+
+ return res;
+ }
default:
fail();
}
@@ -65,7 +84,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
}
/** {@inheritDoc} */
- protected void updateEntries(
+ @Override protected void updateEntries(
TestCache<Integer, MvccTestAccount> cache,
Map<Integer, MvccTestAccount> entries,
WriteMode writeMode) {
@@ -79,6 +98,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
break;
}
+
case DML: {
for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) {
if (e.getValue() == null)
@@ -88,13 +108,23 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
}
break;
}
+
+ case INVOKE: {
+ GetAndPutEntryProcessor<Integer, MvccTestAccount> ep = new GetAndPutEntryProcessor<>();
+
+ for (final Map.Entry<Integer, MvccTestAccount> e : entries.entrySet())
+ cache.cache.invoke(e.getKey(), ep, e.getValue());
+
+ break;
+ }
+
default:
fail();
}
}
/** {@inheritDoc} */
- protected void removeEntries(
+ @Override protected void removeEntries(
TestCache<Integer, MvccTestAccount> cache,
Set<Integer> keys,
WriteMode writeMode) {
@@ -111,6 +141,14 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
break;
}
+ case INVOKE: {
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>();
+
+ for (Integer key : keys)
+ cache.cache.invoke(key, ep);
+
+ break;
+ }
default:
fail();
}