You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/04 16:03:45 UTC

[37/50] [abbrv] ignite git commit: IGNITE-9540: MVCC: support IgniteCache.invoke method family. This closes #4832. This closes #4881.

http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index af74996..4d1145c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -41,12 +41,15 @@ import java.util.stream.Collectors;
 import javax.cache.Cache;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.TouchedExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ScanQuery;
@@ -283,6 +286,56 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTx3() throws Exception {
+        checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
+            @Override public void apply(IgniteCache<Integer, Integer> cache) {
+                try {
+                    IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                    List<Integer> keys = testKeys(cache);
+
+                    for (Integer key : keys) {
+                        log.info("Test key: " + key);
+
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            Integer val = cache.get(key);
+
+                            assertNull(val); // The key is expected to be absent before the invoke.
+
+                            Integer res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
+                                @Override public Integer process(MutableEntry<Integer, Integer> entry,
+                                    Object... arguments) throws EntryProcessorException {
+
+                                    entry.setValue(key); // Store the key itself as the value.
+
+                                    return -key; // The negated key is the invoke result asserted below.
+                                }
+                            });
+
+                            assertEquals(Integer.valueOf(-key), res);
+
+                            val = (Integer)checkAndGet(true, cache, key, GET, SCAN); // Read inside the open tx.
+
+                            assertEquals(key, val);
+
+                            tx.commit();
+                        }
+
+                        Integer val = (Integer)checkAndGet(false, cache, key, SCAN, GET); // Re-read after the tx.
+
+                        assertEquals(key, val);
+                    }
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e); // Wrap: apply() declares no checked exceptions.
+                }
+            }
+        });
+    }
+
+    /**
      * @param c Closure to run.
      * @throws Exception If failed.
      */
@@ -3055,6 +3108,34 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
             assertEquals(size, cache.size());
         }
 
+        // Check rollback create.
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                final Integer key = i;
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, i);
+
+                    tx.rollback();
+                }
+
+                assertEquals(size, cache.size());
+            }
+        }
+
+        // Check rollback update.
+        for (int i = 0; i < KEYS; i++) {
+            final Integer key = i;
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key, -1);
+
+                tx.rollback();
+            }
+
+            assertEquals(size, cache.size());
+        }
+
         // Check rollback remove.
         for (int i = 0; i < KEYS; i++) {
             final Integer key = i;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
index 76f8013..dcd46ff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
@@ -263,33 +263,6 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche
             return arg.getClass();
     }
 
-    /**
-     * Test that attempting to perform a cache PUT operation from within an SQL transaction fails.
-     */
-    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public void testCacheOperationsFromSqlTransaction() {
-        checkCacheOperationThrows("invoke", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invoke", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAsync", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAsync", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAll", Collections.singletonMap(1, CACHE_ENTRY_PROC), X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAll", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAll", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAllAsync", Collections.singletonMap(1, CACHE_ENTRY_PROC),
-            X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
-        checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-    }
-
     /** */
     private final static EntryProcessor<Integer, Integer, Object> ENTRY_PROC =
         new EntryProcessor<Integer, Integer, Object>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
index 4ea53e0..bcbfbc2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
@@ -151,7 +151,7 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT
      * @throws Exception If failed.
      */
     private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-9540");
+        fail("https://issues.apache.org/jira/browse/IGNITE-9470");
 
         final int VALS = 100;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
index 46aeaa1..5ec96e4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -26,12 +28,22 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -41,6 +53,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.INVOKE;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
@@ -103,6 +116,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testRepeatableReadIsolationInvoke() throws Exception {
+        checkOperations(GET, GET, WriteMode.INVOKE, true);
+        checkOperations(GET, GET, WriteMode.INVOKE, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testRepeatableReadIsolationSqlPut() throws Exception {
         checkOperations(SQL, SQL, PUT, true);
         checkOperations(SQL, SQL, PUT, false);
@@ -111,6 +132,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testRepeatableReadIsolationSqlInvoke() throws Exception {
+        checkOperations(SQL, SQL, WriteMode.INVOKE, true);
+        checkOperations(SQL, SQL, WriteMode.INVOKE, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testRepeatableReadIsolationSqlDml() throws Exception {
         checkOperations(SQL, SQL, DML, true);
         checkOperations(SQL, SQL, DML, false);
@@ -130,6 +159,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
     public void testRepeatableReadIsolationMixedPut() throws Exception {
         checkOperations(SQL, GET, PUT, false);
         checkOperations(SQL, GET, PUT, true);
+        checkOperations(SQL, GET, WriteMode.INVOKE, false);
+        checkOperations(SQL, GET, WriteMode.INVOKE, true);
     }
 
     /**
@@ -138,6 +169,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
     public void testRepeatableReadIsolationMixedPut2() throws Exception {
         checkOperations(GET, SQL, PUT, false);
         checkOperations(GET, SQL, PUT, true);
+        checkOperations(GET, SQL, WriteMode.INVOKE, false);
+        checkOperations(GET, SQL, WriteMode.INVOKE, true);
     }
 
     /**
@@ -162,15 +195,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
     public void testOperationConsistency() throws Exception {
         checkOperationsConsistency(PUT, false);
         checkOperationsConsistency(DML, false);
+        checkOperationsConsistency(WriteMode.INVOKE, false);
         checkOperationsConsistency(PUT, true);
         checkOperationsConsistency(DML, true);
+        checkOperationsConsistency(WriteMode.INVOKE, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeConsistency() throws Exception {
+        Ignite node = grid(/*requestFromClient ? nodesCount() - 1 :*/ 0);
+
+        TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
+
+        final Set<Integer> keys1 = new HashSet<>(3);
+        final Set<Integer> keys2 = new HashSet<>(3);
+
+        Set<Integer> allKeys = generateKeySet(cache.cache, keys1, keys2);
+
+        final Map<Integer, MvccTestAccount> map1 = keys1.stream().collect(
+            Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+        final Map<Integer, MvccTestAccount> map2 = keys2.stream().collect(
+            Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+        assertEquals(0, cache.cache.size()); // Precondition: the cache is empty.
+
+        updateEntries(cache, map1, WriteMode.INVOKE);
+        assertEquals(3, cache.cache.size()); // map1 entries created via INVOKE.
+
+        updateEntries(cache, map1, WriteMode.INVOKE);
+        assertEquals(3, cache.cache.size()); // Re-writing the same keys must not change the size.
+
+        getEntries(cache, allKeys, INVOKE);
+        assertEquals(3, cache.cache.size()); // INVOKE reads must not change the size.
+
+        updateEntries(cache, map2, WriteMode.INVOKE);
+        assertEquals(6, cache.cache.size()); // map2 entries added.
+
+        getEntries(cache, keys2, INVOKE);
+        assertEquals(6, cache.cache.size());
+
+        removeEntries(cache, keys1, WriteMode.INVOKE);
+        assertEquals(3, cache.cache.size()); // keys1 entries removed via INVOKE.
+
+        removeEntries(cache, keys1, WriteMode.INVOKE);
+        assertEquals(3, cache.cache.size()); // Repeating the remove must not change the size.
+
+        getEntries(cache, allKeys, INVOKE);
+        assertEquals(3, cache.cache.size());
+
+        updateEntries(cache, map1, WriteMode.INVOKE);
+        assertEquals(6, cache.cache.size()); // keys1 entries re-created.
+
+        removeEntries(cache, allKeys, WriteMode.INVOKE);
+        assertEquals(0, cache.cache.size()); // Everything removed.
+
+        getEntries(cache, allKeys, INVOKE);
+        assertEquals(0, cache.cache.size()); // INVOKE reads over removed keys must leave the cache empty.
     }
 
     /**
      * Checks SQL and CacheAPI operation isolation consistency.
      *
      * @param readModeBefore read mode used before value updated.
-     * @param readModeBefore read mode used after value updated.
+     * @param readModeAfter read mode used after value updated.
      * @param writeMode write mode used for update.
      * @throws Exception If failed.
      */
@@ -206,20 +296,23 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
             @Override public Void call() throws Exception {
                 updateStart.await();
 
+                assertEquals(initialMap.size(), cache2.cache.size());
+
                 try (Transaction tx = txs2.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+                    tx.timeout(TX_TIMEOUT);
 
                     updateEntries(cache2, updateMap, writeMode);
                     removeEntries(cache2, keysForRemove, writeMode);
 
-                    checkContains(cache2, true, updateMap.keySet());
-                    checkContains(cache2, false, keysForRemove);
-
                     assertEquals(updateMap, cache2.cache.getAll(allKeys));
 
                     tx.commit();
                 }
+                finally {
+                    updateFinish.countDown();
+                }
 
-                updateFinish.countDown();
+                assertEquals(updateMap.size(), cache2.cache.size());
 
                 return null;
             }
@@ -270,7 +363,7 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
      * @return All keys.
      * @throws IgniteCheckedException If failed.
      */
-    protected Set<Integer> generateKeySet(IgniteCache<Object, Object> cache, Set<Integer> keySet1,
+    protected Set<Integer> generateKeySet(IgniteCache<?, ?> cache, Set<Integer> keySet1,
         Set<Integer> keySet2) throws IgniteCheckedException {
         LinkedHashSet<Integer> allKeys = new LinkedHashSet<>();
 
@@ -302,50 +395,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
 
         TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
 
-        final Set<Integer> keysForUpdate = new HashSet<>(3);
-        final Set<Integer> keysForRemove = new HashSet<>(3);
 
-        final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
+            final Set<Integer> keysForUpdate = new HashSet<>(3);
+            final Set<Integer> keysForRemove = new HashSet<>(3);
 
-        int updCnt = 1;
+            final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
 
-        final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
-            Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+        try {
+            int updCnt = 1;
+
+            final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
+                Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+            updateEntries(cache, initialVals, writeMode);
+
+            assertEquals(initialVals.size(), cache.cache.size());
+
+            IgniteTransactions txs = node.transactions();
+
+            Map<Integer, MvccTestAccount> updatedVals = null;
 
-        cache.cache.putAll(initialVals);
+            try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+                Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
+                Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
+                Map<Integer, MvccTestAccount> vals3 = getEntries(cache, allKeys, ReadMode.INVOKE);
 
-        IgniteTransactions txs = node.transactions();
+                assertEquals(initialVals, vals1);
+                assertEquals(initialVals, vals2);
+                assertEquals(initialVals, vals3);
 
-        Map<Integer, MvccTestAccount> updatedVals = null;
+                assertEquals(initialVals.size(), cache.cache.size());
 
-        try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
-            Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
-            Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
+                for (ReadMode readMode : new ReadMode[] {GET, SQL, INVOKE}) {
+                    int updCnt0 = ++updCnt;
 
-            assertEquals(initialVals, vals1);
-            assertEquals(initialVals, vals2);
+                    updatedVals = allKeys.stream().collect(Collectors.toMap(Function.identity(),
+                        k -> new MvccTestAccount(k, updCnt0)));
 
-            for (ReadMode readMode : new ReadMode[] {GET, SQL}) {
-                int updCnt0 = ++updCnt;
+                    updateEntries(cache, updatedVals, writeMode);
+                    assertEquals(allKeys.size(), cache.cache.size());
 
-                updatedVals = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(),
-                    k -> new MvccTestAccount(k, updCnt0)));
+                    removeEntries(cache, keysForRemove, writeMode);
 
-                updateEntries(cache, updatedVals, writeMode);
-                removeEntries(cache, keysForRemove, writeMode);
+                    for (Integer key : keysForRemove)
+                        updatedVals.remove(key);
 
-                assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
+                    assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
+                }
+
+                tx.commit();
             }
 
-            tx.commit();
-        }
+            try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+                assertEquals(updatedVals, getEntries(cache, allKeys, GET));
+                assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
+                assertEquals(updatedVals, getEntries(cache, allKeys, INVOKE));
 
-        try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
-            assertEquals(updatedVals, getEntries(cache, allKeys, GET));
-            assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
+                tx.commit();
+            }
 
-            tx.commit();
+            assertEquals(updatedVals.size(), cache.cache.size());
+        }
+        finally {
+            cache.cache.removeAll(keysForUpdate);
         }
+
+        assertEquals(0, cache.cache.size());
     }
 
     /**
@@ -365,6 +480,18 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
                 return cache.cache.getAll(keys);
             case SQL:
                 return getAllSql(cache);
+            case INVOKE: {
+                Map<Integer, MvccTestAccount> res = new HashMap<>();
+
+                CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor<>();
+
+                Map<Integer, EntryProcessorResult<MvccTestAccount>> invokeRes = cache.cache.invokeAll(keys, ep);
+
+                for (Map.Entry<Integer, EntryProcessorResult<MvccTestAccount>> e : invokeRes.entrySet())
+                    res.put(e.getKey(), e.getValue().get());
+
+                return res;
+            }
             default:
                 fail();
         }
@@ -395,6 +522,19 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
 
                 break;
             }
+            case INVOKE: {
+                CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep =
+                    new GetAndPutEntryProcessor<Integer, MvccTestAccount>(){
+                    /** {@inheritDoc} */
+                    @Override MvccTestAccount newValForKey(Integer key) {
+                        return entries.get(key);
+                    }
+                };
+
+                cache.cache.invokeAll(entries.keySet(), ep);
+
+                break;
+            }
             default:
                 fail();
         }
@@ -423,6 +563,13 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
 
                 break;
             }
+            case INVOKE: {
+                CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>();
+
+                cache.cache.invokeAll(keys, ep);
+
+                break;
+            }
             default:
                 fail();
         }
@@ -438,4 +585,52 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
     protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, Set<Integer> keys) {
         assertEquals(expected, cache.cache.containsKeys(keys));
     }
+
+    /**
+     * Entry processor that returns the entry's current value and leaves the entry unmodified.
+     */
+    static class GetEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
+        /** {@inheritDoc} */
+        @Override public V process(MutableEntry<K, V> entry,
+            Object... arguments) throws EntryProcessorException {
+            return entry.getValue();
+        }
+    }
+
+    /**
+     * Entry processor that removes the entry; its result is always {@code null}.
+     */
+    static class RemoveEntryProcessor<K, V, R> implements CacheEntryProcessor<K, V, R> {
+        /** {@inheritDoc} */
+        @Override public R process(MutableEntry<K, V> entry,
+            Object... arguments) throws EntryProcessorException {
+            entry.remove();
+
+            return null;
+        }
+    }
+
+    /**
+     * Entry processor that sets a new value and returns the value the entry held before.
+     */
+    static class GetAndPutEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
+        /** {@inheritDoc} */
+        @Override public V process(MutableEntry<K, V> entry,
+            Object... args) throws EntryProcessorException {
+            V newVal = !F.isEmpty(args) ? (V)args[0] : newValForKey(entry.getKey()); // args[0] if given, else hook.
+
+            V oldVal = entry.getValue(); // Captured before the overwrite below.
+            entry.setValue(newVal);
+
+            return oldVal;
+        }
+
+        /**
+         * @param key Entry key; consulted only when no invoke argument is passed to {@code process}.
+         * @return New value for the key; this base implementation always throws.
+         */
+        V newValForKey(K key){
+            throw new UnsupportedOperationException();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
index c782f98..618d910 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
@@ -22,9 +22,12 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -55,8 +58,24 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
 
                 return res;
             }
+
             case SQL:
                 return getAllSql(cache);
+
+            case INVOKE: {
+                Map<Integer, MvccTestAccount> res = new HashMap<>();
+
+                CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor();
+
+                for (Integer key : keys) {
+                    MvccTestAccount val = cache.cache.invoke(key, ep);
+
+                    if(val != null)
+                        res.put(key, val);
+                }
+
+                return res;
+            }
             default:
                 fail();
         }
@@ -65,7 +84,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
     }
 
     /** {@inheritDoc} */
-    protected void updateEntries(
+    @Override protected void updateEntries(
         TestCache<Integer, MvccTestAccount> cache,
         Map<Integer, MvccTestAccount> entries,
         WriteMode writeMode) {
@@ -79,6 +98,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
 
                 break;
             }
+
             case DML: {
                 for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) {
                     if (e.getValue() == null)
@@ -88,13 +108,23 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
                 }
                 break;
             }
+
+            case INVOKE: {
+                GetAndPutEntryProcessor<Integer, MvccTestAccount> ep = new GetAndPutEntryProcessor<>();
+
+                for (final Map.Entry<Integer, MvccTestAccount> e : entries.entrySet())
+                    cache.cache.invoke(e.getKey(), ep, e.getValue());
+
+                break;
+            }
+
             default:
                 fail();
         }
     }
 
     /** {@inheritDoc} */
-    protected void removeEntries(
+    @Override protected void removeEntries(
         TestCache<Integer, MvccTestAccount> cache,
         Set<Integer> keys,
         WriteMode writeMode) {
@@ -111,6 +141,14 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
 
                 break;
             }
+            case INVOKE: {
+                CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>();
+
+                for (Integer key : keys)
+                    cache.cache.invoke(key, ep);
+
+                break;
+            }
             default:
                 fail();
         }