You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/10/03 12:50:21 UTC
[1/2] ignite git commit: IGNITE-9540: MVCC: support
IgniteCache.invoke method family. This closes #4832. This closes #4881.
Repository: ignite
Updated Branches:
refs/heads/master 71836d95a -> dab050acc
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index af74996..4d1145c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -41,12 +41,15 @@ import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.expiry.Duration;
import javax.cache.expiry.TouchedExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ScanQuery;
@@ -283,6 +286,56 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticTx3() throws Exception {
+ checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
+ @Override public void apply(IgniteCache<Integer, Integer> cache) {
+ try {
+ IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ Integer res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
+ @Override public Integer process(MutableEntry<Integer, Integer> entry,
+ Object... arguments) throws EntryProcessorException {
+
+ entry.setValue(key);
+
+ return -key;
+ }
+ });
+
+ assertEquals(Integer.valueOf(-key), res);
+
+ val = (Integer)checkAndGet(true, cache, key, GET, SCAN);
+
+ assertEquals(key, val);
+
+ tx.commit();
+ }
+
+ Integer val = (Integer)checkAndGet(false, cache, key, SCAN, GET);
+
+ assertEquals(key, val);
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
* @param c Closure to run.
* @throws Exception If failed.
*/
@@ -3055,6 +3108,34 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
assertEquals(size, cache.size());
}
+ // Check rollback create.
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.rollback();
+ }
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ // Check rollback update.
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, -1);
+
+ tx.rollback();
+ }
+
+ assertEquals(size, cache.size());
+ }
+
// Check rollback remove.
for (int i = 0; i < KEYS; i++) {
final Integer key = i;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
index 76f8013..dcd46ff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
@@ -263,33 +263,6 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche
return arg.getClass();
}
- /**
- * Test that attempting to perform a cache PUT operation from within an SQL transaction fails.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void testCacheOperationsFromSqlTransaction() {
- checkCacheOperationThrows("invoke", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invoke", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAsync", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAsync", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAll", Collections.singletonMap(1, CACHE_ENTRY_PROC), X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAll", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAll", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAllAsync", Collections.singletonMap(1, CACHE_ENTRY_PROC),
- X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
-
- checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
- }
-
/** */
private final static EntryProcessor<Integer, Integer, Object> ENTRY_PROC =
new EntryProcessor<Integer, Integer, Object>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
index 4ea53e0..bcbfbc2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
@@ -151,7 +151,7 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT
* @throws Exception If failed.
*/
private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-9540");
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
final int VALS = 100;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
index 46aeaa1..5ec96e4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@@ -26,12 +28,22 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.testframework.GridTestUtils;
@@ -41,6 +53,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.INVOKE;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
@@ -103,6 +116,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testRepeatableReadIsolationInvoke() throws Exception {
+ checkOperations(GET, GET, WriteMode.INVOKE, true);
+ checkOperations(GET, GET, WriteMode.INVOKE, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testRepeatableReadIsolationSqlPut() throws Exception {
checkOperations(SQL, SQL, PUT, true);
checkOperations(SQL, SQL, PUT, false);
@@ -111,6 +132,14 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testRepeatableReadIsolationSqlInvoke() throws Exception {
+ checkOperations(SQL, SQL, WriteMode.INVOKE, true);
+ checkOperations(SQL, SQL, WriteMode.INVOKE, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testRepeatableReadIsolationSqlDml() throws Exception {
checkOperations(SQL, SQL, DML, true);
checkOperations(SQL, SQL, DML, false);
@@ -130,6 +159,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
public void testRepeatableReadIsolationMixedPut() throws Exception {
checkOperations(SQL, GET, PUT, false);
checkOperations(SQL, GET, PUT, true);
+ checkOperations(SQL, GET, WriteMode.INVOKE, false);
+ checkOperations(SQL, GET, WriteMode.INVOKE, true);
}
/**
@@ -138,6 +169,8 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
public void testRepeatableReadIsolationMixedPut2() throws Exception {
checkOperations(GET, SQL, PUT, false);
checkOperations(GET, SQL, PUT, true);
+ checkOperations(GET, SQL, WriteMode.INVOKE, false);
+ checkOperations(GET, SQL, WriteMode.INVOKE, true);
}
/**
@@ -162,15 +195,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
public void testOperationConsistency() throws Exception {
checkOperationsConsistency(PUT, false);
checkOperationsConsistency(DML, false);
+ checkOperationsConsistency(WriteMode.INVOKE, false);
checkOperationsConsistency(PUT, true);
checkOperationsConsistency(DML, true);
+ checkOperationsConsistency(WriteMode.INVOKE, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeConsistency() throws Exception {
+ Ignite node = grid(/*requestFromClient ? nodesCount() - 1 :*/ 0);
+
+ TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
+
+ final Set<Integer> keys1 = new HashSet<>(3);
+ final Set<Integer> keys2 = new HashSet<>(3);
+
+ Set<Integer> allKeys = generateKeySet(cache.cache, keys1, keys2);
+
+ final Map<Integer, MvccTestAccount> map1 = keys1.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ final Map<Integer, MvccTestAccount> map2 = keys2.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ assertEquals(0, cache.cache.size());
+
+ updateEntries(cache, map1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ updateEntries(cache, map1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ getEntries(cache, allKeys, INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ updateEntries(cache, map2, WriteMode.INVOKE);
+ assertEquals(6, cache.cache.size());
+
+ getEntries(cache, keys2, INVOKE);
+ assertEquals(6, cache.cache.size());
+
+ removeEntries(cache, keys1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ removeEntries(cache, keys1, WriteMode.INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ getEntries(cache, allKeys, INVOKE);
+ assertEquals(3, cache.cache.size());
+
+ updateEntries(cache, map1, WriteMode.INVOKE);
+ assertEquals(6, cache.cache.size());
+
+ removeEntries(cache, allKeys, WriteMode.INVOKE);
+ assertEquals(0, cache.cache.size());
+
+ getEntries(cache, allKeys, INVOKE);
+ assertEquals(0, cache.cache.size());
}
/**
* Checks SQL and CacheAPI operation isolation consistency.
*
* @param readModeBefore read mode used before value updated.
- * @param readModeBefore read mode used after value updated.
+ * @param readModeAfter read mode used after value updated.
* @param writeMode write mode used for update.
* @throws Exception If failed.
*/
@@ -206,20 +296,23 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
@Override public Void call() throws Exception {
updateStart.await();
+ assertEquals(initialMap.size(), cache2.cache.size());
+
try (Transaction tx = txs2.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ tx.timeout(TX_TIMEOUT);
updateEntries(cache2, updateMap, writeMode);
removeEntries(cache2, keysForRemove, writeMode);
- checkContains(cache2, true, updateMap.keySet());
- checkContains(cache2, false, keysForRemove);
-
assertEquals(updateMap, cache2.cache.getAll(allKeys));
tx.commit();
}
+ finally {
+ updateFinish.countDown();
+ }
- updateFinish.countDown();
+ assertEquals(updateMap.size(), cache2.cache.size());
return null;
}
@@ -270,7 +363,7 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
* @return All keys.
* @throws IgniteCheckedException If failed.
*/
- protected Set<Integer> generateKeySet(IgniteCache<Object, Object> cache, Set<Integer> keySet1,
+ protected Set<Integer> generateKeySet(IgniteCache<?, ?> cache, Set<Integer> keySet1,
Set<Integer> keySet2) throws IgniteCheckedException {
LinkedHashSet<Integer> allKeys = new LinkedHashSet<>();
@@ -302,50 +395,72 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
- final Set<Integer> keysForUpdate = new HashSet<>(3);
- final Set<Integer> keysForRemove = new HashSet<>(3);
- final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
+ final Set<Integer> keysForUpdate = new HashSet<>(3);
+ final Set<Integer> keysForRemove = new HashSet<>(3);
- int updCnt = 1;
+ final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
- final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
- Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+ try {
+ int updCnt = 1;
+
+ final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ updateEntries(cache, initialVals, writeMode);
+
+ assertEquals(initialVals.size(), cache.cache.size());
+
+ IgniteTransactions txs = node.transactions();
+
+ Map<Integer, MvccTestAccount> updatedVals = null;
- cache.cache.putAll(initialVals);
+ try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
+ Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
+ Map<Integer, MvccTestAccount> vals3 = getEntries(cache, allKeys, ReadMode.INVOKE);
- IgniteTransactions txs = node.transactions();
+ assertEquals(initialVals, vals1);
+ assertEquals(initialVals, vals2);
+ assertEquals(initialVals, vals3);
- Map<Integer, MvccTestAccount> updatedVals = null;
+ assertEquals(initialVals.size(), cache.cache.size());
- try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
- Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
- Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
+ for (ReadMode readMode : new ReadMode[] {GET, SQL, INVOKE}) {
+ int updCnt0 = ++updCnt;
- assertEquals(initialVals, vals1);
- assertEquals(initialVals, vals2);
+ updatedVals = allKeys.stream().collect(Collectors.toMap(Function.identity(),
+ k -> new MvccTestAccount(k, updCnt0)));
- for (ReadMode readMode : new ReadMode[] {GET, SQL}) {
- int updCnt0 = ++updCnt;
+ updateEntries(cache, updatedVals, writeMode);
+ assertEquals(allKeys.size(), cache.cache.size());
- updatedVals = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(),
- k -> new MvccTestAccount(k, updCnt0)));
+ removeEntries(cache, keysForRemove, writeMode);
- updateEntries(cache, updatedVals, writeMode);
- removeEntries(cache, keysForRemove, writeMode);
+ for (Integer key : keysForRemove)
+ updatedVals.remove(key);
- assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
+ assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
+ }
+
+ tx.commit();
}
- tx.commit();
- }
+ try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ assertEquals(updatedVals, getEntries(cache, allKeys, GET));
+ assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
+ assertEquals(updatedVals, getEntries(cache, allKeys, INVOKE));
- try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
- assertEquals(updatedVals, getEntries(cache, allKeys, GET));
- assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
+ tx.commit();
+ }
- tx.commit();
+ assertEquals(updatedVals.size(), cache.cache.size());
+ }
+ finally {
+ cache.cache.removeAll(keysForUpdate);
}
+
+ assertEquals(0, cache.cache.size());
}
/**
@@ -365,6 +480,18 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
return cache.cache.getAll(keys);
case SQL:
return getAllSql(cache);
+ case INVOKE: {
+ Map<Integer, MvccTestAccount> res = new HashMap<>();
+
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor<>();
+
+ Map<Integer, EntryProcessorResult<MvccTestAccount>> invokeRes = cache.cache.invokeAll(keys, ep);
+
+ for (Map.Entry<Integer, EntryProcessorResult<MvccTestAccount>> e : invokeRes.entrySet())
+ res.put(e.getKey(), e.getValue().get());
+
+ return res;
+ }
default:
fail();
}
@@ -395,6 +522,19 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
break;
}
+ case INVOKE: {
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep =
+ new GetAndPutEntryProcessor<Integer, MvccTestAccount>(){
+ /** {@inheritDoc} */
+ @Override MvccTestAccount newValForKey(Integer key) {
+ return entries.get(key);
+ }
+ };
+
+ cache.cache.invokeAll(entries.keySet(), ep);
+
+ break;
+ }
default:
fail();
}
@@ -423,6 +563,13 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
break;
}
+ case INVOKE: {
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>();
+
+ cache.cache.invokeAll(keys, ep);
+
+ break;
+ }
default:
fail();
}
@@ -438,4 +585,52 @@ public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, Set<Integer> keys) {
assertEquals(expected, cache.cache.containsKeys(keys));
}
+
+ /**
+ * Applies get operation.
+ */
+ static class GetEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
+ /** {@inheritDoc} */
+ @Override public V process(MutableEntry<K, V> entry,
+ Object... arguments) throws EntryProcessorException {
+ return entry.getValue();
+ }
+ }
+
+ /**
+ * Applies remove operation.
+ */
+ static class RemoveEntryProcessor<K, V, R> implements CacheEntryProcessor<K, V, R> {
+ /** {@inheritDoc} */
+ @Override public R process(MutableEntry<K, V> entry,
+ Object... arguments) throws EntryProcessorException {
+ entry.remove();
+
+ return null;
+ }
+ }
+
+ /**
+ * Applies get and put operation.
+ */
+ static class GetAndPutEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
+ /** {@inheritDoc} */
+ @Override public V process(MutableEntry<K, V> entry,
+ Object... args) throws EntryProcessorException {
+ V newVal = !F.isEmpty(args) ? (V)args[0] : newValForKey(entry.getKey());
+
+ V oldVal = entry.getValue();
+ entry.setValue(newVal);
+
+ return oldVal;
+ }
+
+ /**
+ * @param key Key.
+ * @return New value.
+ */
+ V newValForKey(K key){
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
index c782f98..618d910 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
@@ -22,9 +22,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -55,8 +58,24 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
return res;
}
+
case SQL:
return getAllSql(cache);
+
+ case INVOKE: {
+ Map<Integer, MvccTestAccount> res = new HashMap<>();
+
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor();
+
+ for (Integer key : keys) {
+ MvccTestAccount val = cache.cache.invoke(key, ep);
+
+ if(val != null)
+ res.put(key, val);
+ }
+
+ return res;
+ }
default:
fail();
}
@@ -65,7 +84,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
}
/** {@inheritDoc} */
- protected void updateEntries(
+ @Override protected void updateEntries(
TestCache<Integer, MvccTestAccount> cache,
Map<Integer, MvccTestAccount> entries,
WriteMode writeMode) {
@@ -79,6 +98,7 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
break;
}
+
case DML: {
for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) {
if (e.getValue() == null)
@@ -88,13 +108,23 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
}
break;
}
+
+ case INVOKE: {
+ GetAndPutEntryProcessor<Integer, MvccTestAccount> ep = new GetAndPutEntryProcessor<>();
+
+ for (final Map.Entry<Integer, MvccTestAccount> e : entries.entrySet())
+ cache.cache.invoke(e.getKey(), ep, e.getValue());
+
+ break;
+ }
+
default:
fail();
}
}
/** {@inheritDoc} */
- protected void removeEntries(
+ @Override protected void removeEntries(
TestCache<Integer, MvccTestAccount> cache,
Set<Integer> keys,
WriteMode writeMode) {
@@ -111,6 +141,14 @@ public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsT
break;
}
+ case INVOKE: {
+ CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>();
+
+ for (Integer key : keys)
+ cache.cache.invoke(key, ep);
+
+ break;
+ }
default:
fail();
}
[2/2] ignite git commit: IGNITE-9540: MVCC: support
IgniteCache.invoke method family. This closes #4832. This closes #4881.
Posted by vo...@apache.org.
IGNITE-9540: MVCC: support IgniteCache.invoke method family. This closes #4832. This closes #4881.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dab050ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dab050ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dab050ac
Branch: refs/heads/master
Commit: dab050acc31bf74f7c159c1cb9c5a8faa966f4f7
Parents: 71836d9
Author: AMRepo <an...@gmail.com>
Authored: Wed Oct 3 15:50:07 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Oct 3 15:50:07 2018 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 1 +
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/CacheInvokeEntry.java | 45 +++-
.../processors/cache/GridCacheEntryEx.java | 10 +
.../processors/cache/GridCacheMapEntry.java | 108 +++++++-
.../cache/GridCacheUpdateTxResult.java | 27 +-
.../cache/IgniteCacheOffheapManager.java | 11 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 104 +++++++-
.../dht/GridDhtTxAbstractEnlistFuture.java | 35 ++-
.../distributed/dht/GridDhtTxEnlistFuture.java | 22 +-
.../dht/GridDhtTxQueryEnlistRequest.java | 8 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 18 +-
.../cache/distributed/dht/GridInvokeValue.java | 186 +++++++++++++
.../near/GridNearTxEnlistFuture.java | 19 +-
.../near/GridNearTxEnlistRequest.java | 35 ++-
.../cache/distributed/near/GridNearTxLocal.java | 55 ++--
.../persistence/GridCacheOffheapManager.java | 5 +-
.../cache/tree/mvcc/data/MvccUpdateDataRow.java | 29 ++-
.../cache/tree/mvcc/data/MvccUpdateResult.java | 7 +
.../cache/tree/mvcc/data/ResultType.java | 4 +-
.../processors/query/EnlistOperation.java | 11 +-
.../cache/GridCacheAbstractMetricsSelfTest.java | 46 ++--
.../processors/cache/GridCacheTestEntryEx.java | 8 +-
.../cache/mvcc/CacheMvccAbstractTest.java | 10 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 81 ++++++
...sactionsCommandsWithMvccEnabledSelfTest.java | 27 --
.../mvcc/CacheMvccSqlQueriesAbstractTest.java | 2 +-
.../mvcc/MvccRepeatableReadBulkOpsTest.java | 261 ++++++++++++++++---
.../mvcc/MvccRepeatableReadOperationsTest.java | 42 ++-
29 files changed, 1047 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index bcb9ef4..2f7e6c0 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.util.IgniteUtils;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 389d8c0..54efb47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -54,8 +54,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -79,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQuer
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
@@ -100,9 +99,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -1078,6 +1079,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 161:
+ msg = new GridInvokeValue();
+
+ break;
+
// [-3..119] [124..129] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
index 2526146..dddc735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
@@ -96,13 +96,30 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta
/** {@inheritDoc} */
@Override public void remove() {
+ if (!entry.isMvcc()) {
+ if (op == Operation.CREATE)
+ op = Operation.NONE;
+ else
+ op = Operation.REMOVE;
+ }
+ else {
+ if (op == Operation.CREATE) {
+ assert !hadVal;
+
+ op = Operation.NONE;
+ }
+ else if (exists()) {
+ assert hadVal;
+
+ op = Operation.REMOVE;
+ }
+
+ if (hadVal && oldVal == null)
+ oldVal = val;
+ }
+
val = null;
valObj = null;
-
- if (op == Operation.CREATE)
- op = Operation.NONE;
- else
- op = Operation.REMOVE;
}
/** {@inheritDoc} */
@@ -110,7 +127,12 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta
if (val == null)
throw new NullPointerException();
- this.oldVal = this.val;
+ if (!entry.isMvcc())
+ this.oldVal = this.val;
+ else {
+ if (hadVal && oldVal == null)
+ this.oldVal = this.val;
+ }
this.val = val;
@@ -118,6 +140,15 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta
}
/**
+ * Entry processor operation.
+ *
+ * @return Operation.
+ */
+ public Operation op() {
+ return op;
+ }
+
+ /**
* @return Return origin value, before modification.
*/
public V oldVal() {
@@ -160,7 +191,7 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta
/**
*
*/
- private static enum Operation {
+ public static enum Operation {
/** */
NONE,
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 2e96a9c..eb49c79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.UUID;
import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.eviction.EvictableEntry;
@@ -80,6 +81,11 @@ public interface GridCacheEntryEx {
public boolean isLocal();
/**
+ * @return {@code True} if this is n entry from MVCC cache.
+ */
+ public boolean isMvcc();
+
+ /**
* @return {@code False} if entry belongs to cache map, {@code true} if this entry was created in colocated
* cache and node is not primary for this key.
*/
@@ -346,6 +352,8 @@ public interface GridCacheEntryEx {
* @param tx Cache transaction.
* @param affNodeId Partitioned node iD.
* @param val Value to set.
+ * @param entryProc Entry processor.
+ * @param invokeArgs Entry processor invoke arguments.
* @param ttl0 TTL.
* @param topVer Topology version.
* @param mvccVer Mvcc version.
@@ -363,6 +371,8 @@ public interface GridCacheEntryEx {
@Nullable IgniteInternalTx tx,
UUID affNodeId,
CacheObject val,
+ EntryProcessor entryProc,
+ Object[] invokeArgs,
long ttl0,
AffinityTopologyVersion topVer,
MvccSnapshot mvccVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f58a3dc..1a04bd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -280,6 +280,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
+ @Override public boolean isMvcc() {
+ return cctx.mvccEnabled();
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isNear() {
return false;
}
@@ -1042,6 +1047,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
IgniteInternalTx tx,
UUID affNodeId,
CacheObject val,
+ EntryProcessor entryProc,
+ Object[] invokeArgs,
long ttl0,
AffinityTopologyVersion topVer,
MvccSnapshot mvccVer,
@@ -1054,6 +1061,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
final boolean valid = valid(tx.topologyVersion());
+ final boolean invoke = entryProc != null;
+
final GridCacheVersion newVer;
WALPointer logPtr = null;
@@ -1087,10 +1096,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Detach value before index update.
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
- assert val != null;
+ assert val != null || invoke;
- res = cctx.offheap().mvccUpdate(
- this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, retVal);
+ res = cctx.offheap().mvccUpdate(this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate,
+ filter, retVal, entryProc, invokeArgs);
assert res != null;
@@ -1103,7 +1112,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res.resultType() == ResultType.VERSION_MISMATCH)
throw new IgniteSQLException("Mvcc version mismatch.", CONCURRENT_UPDATE);
- else if (res.resultType() == ResultType.FILTERED || (noCreate && res.resultType() == ResultType.PREV_NULL))
+ else if (res.resultType() == ResultType.FILTERED) {
+ GridCacheUpdateTxResult updRes = new GridCacheUpdateTxResult(invoke);
+
+ assert !invoke || res.invokeResult() != null;
+
+ if(invoke) // No-op invoke happened.
+ updRes.invokeResult(res.invokeResult());
+
+ return updRes;
+ }
+ else if(noCreate && !invoke && res.resultType() == ResultType.PREV_NULL)
return new GridCacheUpdateTxResult(false);
else if (res.resultType() == ResultType.LOCKED) {
unlockEntry();
@@ -1115,7 +1134,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer,
- op, needHistory, noCreate, filter, retVal, resFut));
+ op, needHistory, noCreate, filter, retVal, resFut, entryProc, invokeArgs));
return new GridCacheUpdateTxResult(false, resFut);
}
@@ -1143,13 +1162,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
counters.incrementUpdateCounter(cctx.cacheId(), partition());
}
+ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) {
+ TxCounters counters = tx.txCounters(true);
+
+ if (res.isOwnValueOverridden()) {
+ if (res.isKeyAbsentBefore()) // Do not count own update removal.
+ counters.decrementUpdateCounter(cctx.cacheId(), partition());
+ }
+ else
+ counters.incrementUpdateCounter(cctx.cacheId(), partition());
+
+ counters.accumulateSizeDelta(cctx.cacheId(), partition(), -1);
+ }
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) {
logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
cctx.cacheId(),
key,
val,
- res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
+ res.resultType() == ResultType.PREV_NULL ? CREATE :
+ (res.resultType() == ResultType.REMOVED_NOT_NULL) ? DELETE : UPDATE,
tx.nearXidVersion(),
newVer,
expireTime,
@@ -1184,6 +1216,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updRes.prevValue(oldRow.value());
}
+ if(invoke) {
+ assert res.invokeResult() != null;
+
+ updRes.invokeResult(res.invokeResult());
+ }
+
updRes.mvccHistory(res.history());
return updRes;
@@ -5237,15 +5275,21 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** */
private GridCacheOperation op;
+ /** Entry processor. */
+ private final EntryProcessor entryProc;
+
+ /** Invoke arguments. */
+ private final Object[] invokeArgs;
+
+ /** Filter. */
+ private final CacheEntryPredicate filter;
+
/** */
private final boolean needHistory;
/** */
private final boolean noCreate;
- /** Filter. */
- private final CacheEntryPredicate filter;
-
/** Need previous value flag.*/
private final boolean needVal;
@@ -5262,7 +5306,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean noCreate,
CacheEntryPredicate filter,
boolean needVal,
- GridFutureAdapter<GridCacheUpdateTxResult> resFut) {
+ GridFutureAdapter<GridCacheUpdateTxResult> resFut,
+ EntryProcessor entryProc,
+ Object[] invokeArgs) {
this.tx = tx;
this.entry = entry;
this.affNodeId = affNodeId;
@@ -5276,6 +5322,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
this.filter = filter;
this.needVal = needVal;
this.resFut = resFut;
+ this.entryProc = entryProc;
+ this.invokeArgs = invokeArgs;
}
/** {@inheritDoc} */
@@ -5286,6 +5334,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheContext cctx = entry.context();
GridCacheVersion newVer = tx.writeVersion();
+ final boolean invoke = entryProc != null;
+
MvccUpdateResult res;
try {
@@ -5322,8 +5372,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.shared().database().checkpointReadLock();
try {
- res = cctx.offheap().mvccUpdate(
- entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, needVal);
+ res = cctx.offheap().mvccUpdate(entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory,
+ noCreate, filter, needVal, entryProc, invokeArgs);
}
finally {
cctx.shared().database().checkpointReadUnlock();
@@ -5343,6 +5393,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return;
}
+ else if (res.resultType() == ResultType.FILTERED) {
+ GridCacheUpdateTxResult updRes = new GridCacheUpdateTxResult(invoke);
+
+ if (invoke) { // No-op invoke happened.
+ assert res.invokeResult() != null;
+
+ updRes.invokeResult(res.invokeResult());
+ }
+
+ resFut.onDone(updRes);
+
+ return;
+ }
else if (op == CREATE && tx.local() && (res.resultType() == ResultType.PREV_NOT_NULL ||
res.resultType() == ResultType.VERSION_FOUND)) {
resFut.onDone(new IgniteSQLException("Duplicate key during INSERT [key=" + entry.key() + ']',
@@ -5371,13 +5434,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
counters.incrementUpdateCounter(cctx.cacheId(), entry.partition());
}
+ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) {
+ TxCounters counters = tx.txCounters(true);
+
+ if (res.isOwnValueOverridden()) {
+ if (res.isKeyAbsentBefore()) // Do not count own update removal.
+ counters.decrementUpdateCounter(cctx.cacheId(), entry.partition());
+ }
+ else
+ counters.incrementUpdateCounter(cctx.cacheId(), entry.partition());
+
+ counters.accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1);
+ }
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
cctx.cacheId(),
entry.key(),
val,
- res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
+ res.resultType() == ResultType.PREV_NULL ? CREATE :
+ (res.resultType() == ResultType.REMOVED_NOT_NULL) ? DELETE : UPDATE,
tx.nearXidVersion(),
newVer,
expireTime,
@@ -5408,6 +5484,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr)
: new GridCacheUpdateTxResult(false, logPtr);
+ if(invoke) {
+ assert res.invokeResult() != null;
+
+ updRes.invokeResult(res.invokeResult());
+ }
+
updRes.mvccHistory(res.history());
resFut.onDone(updRes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index 4543dfd..d2a2870 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
* Cache entry transactional update result.
*/
public class GridCacheUpdateTxResult {
- /** Success flag.*/
+ /** Success flag. */
private final boolean success;
/** Partition update counter. */
@@ -51,6 +51,9 @@ public class GridCacheUpdateTxResult {
/** Previous value. */
private CacheObject prevVal;
+ /** Invoke result. */
+ private CacheInvokeResult invokeRes;
+
/**
* Constructor.
*
@@ -146,7 +149,6 @@ public class GridCacheUpdateTxResult {
}
/**
- *
* @return Mvcc history rows.
*/
@Nullable public List<MvccLinkAwareSearchRow> mvccHistory() {
@@ -154,7 +156,6 @@ public class GridCacheUpdateTxResult {
}
/**
- *
* @param mvccHistory Mvcc history rows.
*/
public void mvccHistory(List<MvccLinkAwareSearchRow> mvccHistory) {
@@ -162,21 +163,33 @@ public class GridCacheUpdateTxResult {
}
/**
- *
* @return Previous value.
*/
- @Nullable public CacheObject prevValue() {
+ @Nullable public CacheObject prevValue() {
return prevVal;
}
/**
- *
* @param prevVal Previous value.
*/
- public void prevValue( @Nullable CacheObject prevVal) {
+ public void prevValue(@Nullable CacheObject prevVal) {
this.prevVal = prevVal;
}
+ /**
+ * @param result Entry processor invoke result.
+ */
+ public void invokeResult(CacheInvokeResult result) {
+ invokeRes = result;
+ }
+
+ /**
+ * @return Invoke result.
+ */
+ public CacheInvokeResult invokeResult() {
+ return invokeRes;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheUpdateTxResult.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index f576cc5..c9c2430 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.List;
import java.util.Map;
import javax.cache.Cache;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
@@ -278,6 +279,8 @@ public interface IgniteCacheOffheapManager {
* @param noCreate Flag indicating that row should not be created if absent.
* @param filter Filter.
* @param retVal Flag to return previous value.
+ * @param entryProc Entry processor.
+ * @param invokeArgs Entry processor invoke arguments.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
@@ -291,7 +294,9 @@ public interface IgniteCacheOffheapManager {
boolean needHistory,
boolean noCreate,
@Nullable CacheEntryPredicate filter,
- boolean retVal) throws IgniteCheckedException;
+ boolean retVal,
+ EntryProcessor entryProc,
+ Object[] invokeArgs) throws IgniteCheckedException;
/**
* @param entry Entry.
@@ -797,6 +802,8 @@ public interface IgniteCacheOffheapManager {
* @param expireTime Expire time.
* @param mvccSnapshot MVCC snapshot.
* @param filter Filter.
+ * @param entryProc Entry processor.
+ * @param invokeArgs Entry processor invoke arguments.
* @param primary {@code True} if update is executed on primary node.
* @param needHistory Flag to collect history.
* @param noCreate Flag indicating that row should not be created if absent.
@@ -812,6 +819,8 @@ public interface IgniteCacheOffheapManager {
long expireTime,
MvccSnapshot mvccSnapshot,
@Nullable CacheEntryPredicate filter,
+ EntryProcessor entryProc,
+ Object[] invokeArgs,
boolean primary,
boolean needHistory,
boolean noCreate,
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e0b9c06..a968737 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -42,12 +43,13 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdat
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateTxStateHintRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
@@ -516,7 +518,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
boolean needHistory,
boolean noCreate,
@Nullable CacheEntryPredicate filter,
- boolean retVal) throws IgniteCheckedException {
+ boolean retVal,
+ EntryProcessor entryProc,
+ Object[] invokeArgs) throws IgniteCheckedException {
if (entry.detached() || entry.isNear())
return null;
@@ -529,6 +533,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
expireTime,
mvccSnapshot,
filter,
+ entryProc,
+ invokeArgs,
primary,
needHistory,
noCreate,
@@ -1857,6 +1863,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
long expireTime,
MvccSnapshot mvccSnapshot,
@Nullable CacheEntryPredicate filter,
+ EntryProcessor entryProc,
+ Object[] invokeArgs,
boolean primary,
boolean needHistory,
boolean noCreate,
@@ -1874,7 +1882,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// Make sure value bytes initialized.
key.valueBytes(coCtx);
- val.valueBytes(coCtx);
+
+ if(val != null)
+ val.valueBytes(coCtx);
MvccUpdateDataRow updateRow = new MvccUpdateDataRow(
cctx,
@@ -1891,7 +1901,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
needHistory,
// we follow fast update visit flow here if row cannot be created by current operation
noCreate,
- retVal);
+ retVal || entryProc != null);
assert cctx.shared().database().checkpointLockIsHeldByThread();
@@ -1920,12 +1930,44 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert oldRow != null && oldRow.link() != 0 : oldRow;
oldRow.key(key);
-
- rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot);
}
else
assert res == ResultType.PREV_NULL;
+ if (entryProc != null) {
+ CacheInvokeEntry.Operation op = applyEntryProcessor(cctx, key, ver, entryProc, invokeArgs, updateRow, oldRow);
+
+ if (op == CacheInvokeEntry.Operation.NONE) {
+ if (res == ResultType.PREV_NOT_NULL)
+ updateRow.value(oldRow.value()); // Restore prev. value.
+
+ updateRow.resultType(ResultType.FILTERED);
+
+ cleanup(cctx, updateRow.cleanupRows());
+
+ return updateRow;
+ }
+
+ // Mark old version as removed.
+ if (res == ResultType.PREV_NOT_NULL) {
+ rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot);
+
+ if (op == CacheInvokeEntry.Operation.REMOVE) {
+ updateRow.resultType(ResultType.REMOVED_NOT_NULL);
+
+ cleanup(cctx, updateRow.cleanupRows());
+
+ clearPendingEntries(cctx, oldRow);
+
+ return updateRow; // Won't create new version on remove.
+ }
+ }
+ else
+ assert op != CacheInvokeEntry.Operation.REMOVE;
+ }
+ else if (oldRow != null)
+ rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot);
+
if (!grp.storeCacheIdInDataPage() && updateRow.cacheId() != CU.UNDEFINED_CACHE_ID) {
updateRow.cacheId(CU.UNDEFINED_CACHE_ID);
@@ -1967,6 +2009,54 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
}
+ /**
+ *
+ * @param cctx Cache context.
+ * @param key entry key.
+ * @param ver Entry version.
+ * @param entryProc Entry processor.
+ * @param invokeArgs Entry processor invoke arguments.
+ * @param updateRow Row for update.
+ * @param oldRow Old row.
+ * @return Entry processor operation.
+ */
+ @SuppressWarnings("unchecked")
+ private CacheInvokeEntry.Operation applyEntryProcessor(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver,
+ EntryProcessor entryProc, Object[] invokeArgs, MvccUpdateDataRow updateRow,
+ CacheDataRow oldRow) {
+ Object procRes = null;
+ Exception err = null;
+
+ CacheObject oldVal = oldRow == null ? null : oldRow.value();
+
+ CacheInvokeEntry invokeEntry = new CacheInvokeEntry<>(key, oldVal, ver, cctx.keepBinary(),
+ new GridDhtDetachedCacheEntry(cctx, key));
+
+ try {
+ procRes = entryProc.process(invokeEntry, invokeArgs);
+
+ if(invokeEntry.modified() && invokeEntry.op() != CacheInvokeEntry.Operation.REMOVE) {
+ Object val = invokeEntry.getValue(true);
+
+ CacheObject val0 = cctx.toCacheObject(val);
+
+ val0.prepareForCache(cctx.cacheObjectContext());
+
+ updateRow.value(val0);
+ }
+ }
+ catch (Exception e) {
+ err = e;
+ }
+
+ CacheInvokeResult invokeRes = err == null ? CacheInvokeResult.fromResult(procRes) :
+ CacheInvokeResult.fromError(err);
+
+ updateRow.invokeResult(invokeRes);
+
+ return invokeEntry.op();
+ }
+
/** {@inheritDoc} */
@Override public MvccUpdateResult mvccRemove(GridCacheContext cctx,
KeyCacheObject key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 64f966d..647a801 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
@@ -406,7 +407,23 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
assert !entry.detached();
- CacheObject val = op.isDeleteOrLock() ? null : cctx.toCacheObject(((IgniteBiTuple)cur).getValue());
+ CacheObject val = op.isDeleteOrLock() || op.isInvoke()
+ ? null : cctx.toCacheObject(((IgniteBiTuple)cur).getValue());
+
+ GridInvokeValue invokeVal = null;
+ EntryProcessor entryProc = null;
+ Object[] invokeArgs = null;
+
+ if(op.isInvoke()) {
+ assert needResult();
+
+ invokeVal = (GridInvokeValue)((IgniteBiTuple)cur).getValue();
+
+ entryProc = invokeVal.entryProcessor();
+ invokeArgs = invokeVal.invokeArgs();
+ }
+
+ assert entryProc != null || !op.isInvoke();
tx.markQueryEnlisted(mvccSnapshot);
@@ -430,12 +447,15 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
break;
case INSERT:
+ case TRANSFORM:
case UPSERT:
case UPDATE:
res = entry.mvccSet(
tx,
cctx.localNodeId(),
val,
+ entryProc,
+ invokeArgs,
0,
topVer,
mvccSnapshot,
@@ -471,11 +491,12 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
IgniteInternalFuture<GridCacheUpdateTxResult> updateFut = res.updateFuture();
+ final Message val0 = invokeVal != null ? invokeVal : val;
+
if (updateFut != null) {
if (updateFut.isDone())
res = updateFut.get();
else {
- CacheObject val0 = val;
GridDhtCacheEntry entry0 = entry;
it.beforeDetach();
@@ -498,7 +519,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
}
}
- processEntry(entry, op, res, val);
+ processEntry(entry, op, res, val0);
}
if (!hasNext0()) {
@@ -595,7 +616,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* @throws IgniteCheckedException If failed.
*/
private void processEntry(GridDhtCacheEntry entry, EnlistOperation op,
- GridCacheUpdateTxResult updRes, CacheObject val) throws IgniteCheckedException {
+ GridCacheUpdateTxResult updRes, Message val) throws IgniteCheckedException {
checkCompleted();
assert updRes != null && updRes.updateFuture() == null;
@@ -621,8 +642,9 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* @param key Key.
* @param val Value.
* @param hist History rows.
+ * @param cacheId Cache Id.
*/
- private void addToBatch(KeyCacheObject key, CacheObject val, List<MvccLinkAwareSearchRow> hist,
+ private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearchRow> hist,
int cacheId) throws IgniteCheckedException {
List<ClusterNode> backups = backupNodes(key);
@@ -1098,7 +1120,8 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* @param val Value or preload entries collection.
*/
public void add(KeyCacheObject key, Message val) {
- assert val == null || val instanceof CacheObject || val instanceof CacheEntryInfoCollection;
+ assert val == null || val instanceof GridInvokeValue || val instanceof CacheObject
+ || val instanceof CacheEntryInfoCollection;
if (keys == null)
keys = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
index 58d6b15..7719638 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
@@ -114,10 +115,23 @@ public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<G
/** {@inheritDoc} */
@Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult txRes) {
- if (needRes && txRes.success())
- res.set(cctx, txRes.prevValue(), txRes.success(), true);
- else
- res.success(txRes.success());
+ assert txRes.invokeResult() == null || needRes;
+
+ res.success(txRes.success());
+
+ if(txRes.invokeResult() != null)
+ res.invokeResult(true);
+
+ if (needRes && txRes.success()) {
+ CacheInvokeResult invokeRes = txRes.invokeResult();
+
+ if (invokeRes != null) {
+ if(invokeRes.result() != null || invokeRes.error() != null)
+ res.addEntryProcessResult(cctx, key, null, invokeRes.result(), invokeRes.error(), cctx.keepBinary());
+ }
+ else
+ res.set(cctx, txRes.prevValue(), txRes.success(), true);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
index a1bc26b..b3aa56d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -173,7 +174,8 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- CacheObjectContext objCtx = ctx.cacheContext(cacheId).cacheObjectContext();
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+ CacheObjectContext objCtx = cctx.cacheObjectContext();
if (keys != null) {
for (int i = 0; i < keys.size(); i++) {
@@ -193,6 +195,8 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
entryVal.prepareMarshal(objCtx);
}
}
+ else if (val instanceof GridInvokeValue)
+ ((GridInvokeValue)val).prepareMarshal(cctx);
}
}
}
@@ -221,6 +225,8 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
entryVal.finishUnmarshal(objCtx, ldr);
}
}
+ else if (val instanceof GridInvokeValue)
+ ((GridInvokeValue)val).finishUnmarshal(ctx, ldr);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 9883f6d..1f5f5a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -415,15 +415,28 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
try {
CacheObject val = null;
+ EntryProcessor entryProc = null;
+ Object[] invokeArgs = null;
Message val0 = vals != null ? vals.get(i) : null;
CacheEntryInfoCollection entries =
val0 instanceof CacheEntryInfoCollection ? (CacheEntryInfoCollection)val0 : null;
- if (entries == null && !op.isDeleteOrLock())
+ if (entries == null && !op.isDeleteOrLock() && !op.isInvoke())
val = (val0 instanceof CacheObject) ? (CacheObject)val0 : null;
+ if(entries == null && op.isInvoke()) {
+ assert val0 instanceof GridInvokeValue;
+
+ GridInvokeValue invokeVal = (GridInvokeValue)val0;
+
+ entryProc = invokeVal.entryProcessor();
+ invokeArgs = invokeVal.invokeArgs();
+ }
+
+ assert entryProc != null || !op.isInvoke();
+
GridDhtCacheEntry entry = dht.entryExx(key, topologyVersion());
GridCacheUpdateTxResult updRes;
@@ -447,12 +460,15 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
break;
case INSERT:
+ case TRANSFORM:
case UPSERT:
case UPDATE:
updRes = entry.mvccSet(
this,
ctx.localNodeId(),
val,
+ entryProc,
+ invokeArgs,
0,
topologyVersion(),
snapshot,
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridInvokeValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridInvokeValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridInvokeValue.java
new file mode 100644
index 0000000..b88df4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridInvokeValue.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.nio.ByteBuffer;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class GridInvokeValue implements Message {
+ /** */
+ private static final long serialVersionUID = 1L;
+
+ /** Optional arguments for entry processor. */
+ @GridDirectTransient
+ private Object[] invokeArgs;
+
+ /** Entry processor arguments bytes. */
+ private byte[] invokeArgsBytes;
+
+ /** Entry processors. */
+ @GridDirectTransient
+ private EntryProcessor<Object, Object, Object> entryProcessor;
+
+ /** Entry processors bytes. */
+ private byte[] entryProcessorBytes;
+
+ /**
+ * Constructor.
+ */
+ public GridInvokeValue() {
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Entry processor invoke arguments.
+ */
+ public GridInvokeValue(EntryProcessor<Object, Object, Object> entryProcessor, Object[] invokeArgs) {
+ this.invokeArgs = invokeArgs;
+ this.entryProcessor = entryProcessor;
+ }
+
+ /**
+ * @return Invoke arguments.
+ */
+ public Object[] invokeArgs() {
+ return invokeArgs;
+ }
+
+ /**
+ * @return Entry processor.
+ */
+ public EntryProcessor<Object, Object, Object> entryProcessor() {
+ return entryProcessor;
+ }
+
+ /**
+ * Marshalls invoke value.
+ *
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException {
+ if (entryProcessor != null && entryProcessorBytes == null) {
+ entryProcessorBytes = CU.marshal(ctx, entryProcessor);
+ }
+
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = CU.marshal(ctx, invokeArgs);
+ }
+
+ /**
+ * Unmarshalls invoke value.
+ *
+ * @param ctx Cache context.
+ * @param ldr Class loader.
+ * @throws IgniteCheckedException If un-marshalling failed.
+ */
+ public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ if (entryProcessorBytes != null && entryProcessor == null)
+ entryProcessor = U.unmarshal(ctx, entryProcessorBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+ if (invokeArgs == null)
+ invokeArgs = U.unmarshal(ctx, invokeArgsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeByteArray("invokeArgsBytes", invokeArgsBytes))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ invokeArgsBytes = reader.readByteArray("invokeArgsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridInvokeValue.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 161;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index 8d85bd9..208d4bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -338,7 +338,11 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
keys.add(cctx.toCacheKeyObject(row));
else {
keys.add(cctx.toCacheKeyObject(((IgniteBiTuple)row).getKey()));
- vals.add(cctx.toCacheObject(((IgniteBiTuple)row).getValue()));
+
+ if (op.isInvoke())
+ vals.add((Message)((IgniteBiTuple)row).getValue());
+ else
+ vals.add(cctx.toCacheObject(((IgniteBiTuple)row).getValue()));
}
}
@@ -583,9 +587,18 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
assert res != null;
- this.res = res.result();
+ if (res.result().invokeResult()) {
+ if(this.res == null)
+ this.res = new GridCacheReturn(true, true);
+
+ this.res.success(this.res.success() && err == null && res.result().success());
+
+ this.res.mergeEntryProcessResults(res.result());
+ }
+ else
+ this.res = res.result();
- assert this.res != null && (this.res.emptyResult() || needRes || !this.res.success());
+ assert this.res != null && (this.res.emptyResult() || needRes || this.res.invokeResult() || !this.res.success());
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
index 1d87023..e71de89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.EnlistOperation;
@@ -38,6 +39,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -95,7 +97,7 @@ public class GridNearTxEnlistRequest extends GridCacheIdMessage {
/** Serialized rows values. */
@GridToStringExclude
- private CacheObject[] values;
+ private Message[] values;
/** Enlist operation. */
private EnlistOperation op;
@@ -286,7 +288,7 @@ public class GridNearTxEnlistRequest extends GridCacheIdMessage {
boolean keysOnly = op.isDeleteOrLock();
- values = keysOnly ? null : new CacheObject[keys.length];
+ values = keysOnly ? null : new Message[keys.length];
for (Object row : rows) {
Object key, val = null;
@@ -309,13 +311,24 @@ public class GridNearTxEnlistRequest extends GridCacheIdMessage {
keys[i] = key0;
if (!keysOnly) {
- CacheObject val0 = cctx.toCacheObject(val);
+ if (op.isInvoke()) {
+ GridInvokeValue val0 = (GridInvokeValue)val;
- assert val0 != null;
+ assert val0 != null;
- val0.prepareMarshal(objCtx);
+ val0.prepareMarshal(cctx);
- values[i] = val0;
+ values[i] = val0;
+ }
+ else {
+ CacheObject val0 = cctx.toCacheObject(val);
+
+ assert val0 != null;
+
+ val0.prepareMarshal(objCtx);
+
+ values[i] = val0;
+ }
}
i++;
@@ -341,8 +354,12 @@ public class GridNearTxEnlistRequest extends GridCacheIdMessage {
if (op.isDeleteOrLock())
rows.add(keys[i]);
else {
- if (values[i] != null)
- values[i].finishUnmarshal(objCtx, ldr);
+ if (values[i] != null) {
+ if(op.isInvoke())
+ ((GridInvokeValue)values[i]).finishUnmarshal(ctx, ldr);
+ else
+ ((CacheObject)values[i]).finishUnmarshal(objCtx, ldr);
+ }
rows.add(new IgniteBiTuple<>(keys[i], values[i]));
}
@@ -608,7 +625,7 @@ public class GridNearTxEnlistRequest extends GridCacheIdMessage {
reader.incrementState();
case 18:
- values = reader.readObjectArray("values", MessageCollectionItemType.MSG, CacheObject.class);
+ values = reader.readObjectArray("values", MessageCollectionItemType.MSG, Message.class);
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 9493510..111f5d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
@@ -99,6 +100,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -736,10 +738,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
try {
validateTxMode(cacheCtx);
- // TODO: IGNITE-9540: Fix invoke/invokeAll.
- if(invokeMap != null)
- MvccUtils.verifyMvccOperationSupport(cacheCtx, "invoke/invokeAll");
-
if (mvccSnapshot == null) {
MvccUtils.mvccTracker(cacheCtx, this);
@@ -752,16 +750,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
return new GridFinishedFuture(e);
}
- // Cached entry may be passed only from entry wrapper.
- final Map<?, ?> map0 = map;
- final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
-
if (log.isDebugEnabled())
- log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+ log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map + ", retval=" + retval + "]");
- assert map0 != null || invokeMap0 != null;
+ assert map != null || invokeMap != null;
- if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+ if (F.isEmpty(map) && F.isEmpty(invokeMap)) {
if (implicit())
try {
commit();
@@ -773,14 +767,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
return new GridFinishedFuture<>(new GridCacheReturn(true, false));
}
- try {
- // Set transform flag for transaction.
- if (invokeMap != null)
- transform = true;
+ // Set transform flag for operation.
+ boolean transform = invokeMap != null;
- Set<?> keys = map0 != null ? map0.keySet() : invokeMap0.keySet();
+ try {
+ Set<?> keys = map != null ? map.keySet() : invokeMap.keySet();
- final Map<KeyCacheObject, CacheObject> enlisted = new HashMap<>(keys.size());
+ final Map<KeyCacheObject, Message> enlisted = new HashMap<>(keys.size());
for (Object key : keys) {
if (isRollbackOnly())
@@ -792,7 +785,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
throw new NullPointerException("Null key.");
}
- Object val = map0 == null ? null : map0.get(key);
+ Object val = map == null ? null : map.get(key);
EntryProcessor entryProcessor = transform ? invokeMap.get(key) : null;
if (val == null && entryProcessor == null) {
@@ -802,25 +795,27 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
- enlisted.put(cacheKey, cacheVal);
+ if (transform)
+ enlisted.put(cacheKey, new GridInvokeValue(entryProcessor, invokeArgs));
+ else
+ enlisted.put(cacheKey, cacheCtx.toCacheObject(val));
}
- return updateAsync(cacheCtx, new UpdateSourceIterator<IgniteBiTuple<KeyCacheObject, CacheObject>>() {
+ return updateAsync(cacheCtx, new UpdateSourceIterator<IgniteBiTuple<KeyCacheObject, Message>>() {
- private Iterator<Map.Entry<KeyCacheObject, CacheObject>> it = enlisted.entrySet().iterator();
+ private Iterator<Map.Entry<KeyCacheObject, Message>> it = enlisted.entrySet().iterator();
@Override public EnlistOperation operation() {
- return EnlistOperation.UPSERT;
+ return transform ? EnlistOperation.TRANSFORM : EnlistOperation.UPSERT;
}
@Override public boolean hasNextX() throws IgniteCheckedException {
return it.hasNext();
}
- @Override public IgniteBiTuple<KeyCacheObject, CacheObject> nextX() throws IgniteCheckedException {
- Map.Entry<KeyCacheObject, CacheObject> next = it.next();
+ @Override public IgniteBiTuple<KeyCacheObject, Message> nextX() throws IgniteCheckedException {
+ Map.Entry<KeyCacheObject, Message> next = it.next();
return new IgniteBiTuple<>(next.getKey(), next.getValue());
}
@@ -2120,7 +2115,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
mvccSnapshot.incrementOperationCounter();
- return new GridCacheReturn(cacheCtx, true, keepBinary, futRes.value(), futRes.success());
+ Object val = futRes.value();
+
+ if (futRes.invokeResult()) {
+ assert val instanceof Map;
+
+ val = cacheCtx.unwrapInvokeResult((Map)val, keepBinary);
+ }
+
+ return new GridCacheReturn(cacheCtx, true, keepBinary, val, futRes.success());
}
}));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index d704abd..801703b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.failure.FailureContext;
@@ -1798,13 +1799,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
long expireTime,
MvccSnapshot mvccVer,
CacheEntryPredicate filter,
+ EntryProcessor entryProc,
+ Object[] invokeArgs,
boolean primary,
boolean needHistory,
boolean noCreate,
boolean retVal) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, primary,
+ return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, entryProc, invokeArgs, primary,
needHistory, noCreate, retVal);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
index 2a0b582..23711a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -138,6 +139,9 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
@GridToStringExclude
private CacheEntryPredicate filter;
+ /** */
+ private CacheInvokeResult invokeRes;
+
/**
* @param cctx Cache context.
* @param key Key.
@@ -207,7 +211,8 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
@Override public int visit(BPlusTree<CacheSearchRow, CacheDataRow> tree,
BPlusIO<CacheSearchRow> io,
long pageAddr,
- int idx, IgniteWriteAheadLogManager wal)
+ int idx,
+ IgniteWriteAheadLogManager wal)
throws IgniteCheckedException {
unsetFlags(DIRTY);
@@ -557,6 +562,23 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
}
/** */
+ public void value(CacheObject val0) {
+ val = val0;
+ }
+
+ /** */
+ public void invokeResult(CacheInvokeResult invokeRes) {
+ this.invokeRes = invokeRes;
+ }
+
+ /**
+ * @return Invoke result.
+ */
+ @Override public CacheInvokeResult invokeResult(){
+ return invokeRes;
+ }
+
+ /** */
private boolean isFlagsSet(int flags) {
return (state & flags) == flags;
}
@@ -571,6 +593,11 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
return state &= (~flags);
}
+ /** */
+ public void resultType(ResultType type) {
+ res = type;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MvccUpdateDataRow.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateResult.java
index d76a6e8..a8f5bb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateResult.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.tree.mvcc.data;
import java.util.List;
+import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
@@ -50,4 +51,10 @@ public interface MvccUpdateResult {
* @return Flag whether tx has overridden it's own update.
*/
public boolean isOwnValueOverridden();
+
+ /**
+ *
+ * @return Entry processor invoke result.
+ */
+ CacheInvokeResult invokeResult();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
index 16e7e1e..d863684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
@@ -32,5 +32,7 @@ public enum ResultType {
/** */
VERSION_MISMATCH,
/** */
- FILTERED
+ FILTERED,
+ /** */
+ REMOVED_NOT_NULL,
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java
index fdb6f1e..631bf18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java
@@ -46,7 +46,11 @@ public enum EnlistOperation {
* This operation locks existing entry protecting it from updates by other transactions
* or does notrhing if entry does not exist.
*/
- LOCK(null);
+ LOCK(null),
+ /**
+ * This operation applies entry transformer.
+ */
+ TRANSFORM(GridCacheOperation.UPDATE);
/** */
private final GridCacheOperation cacheOp;
@@ -68,6 +72,11 @@ public enum EnlistOperation {
return this == DELETE || this == LOCK;
}
+ /** */
+ public boolean isInvoke() {
+ return this == TRANSFORM;
+ }
+
/**
* Indicates that an operation cannot create new row.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
index eb4d2d5..3c0f001 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
@@ -1066,29 +1066,29 @@ public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstract
assertEquals(1, cache0.localMetrics().getEntryProcessorRemovals());
- if (emptyCache) {
- assertEquals(1, cache0.localMetrics().getEntryProcessorMisses());
-
- assertEquals(100f, cache0.localMetrics().getEntryProcessorMissPercentage());
- assertEquals(0f, cache0.localMetrics().getEntryProcessorHitPercentage());
- }
- else {
- assertEquals(1, cache0.localMetrics().getEntryProcessorHits());
-
- assertEquals(0f, cache0.localMetrics().getEntryProcessorMissPercentage());
- assertEquals(100f, cache0.localMetrics().getEntryProcessorHitPercentage());
- }
-
- for (int i = 1; i < gridCount(); i++) {
- Ignite ignite = ignite(i);
-
- IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
-
- if (affinity(cache).isPrimaryOrBackup(ignite.cluster().localNode(), key))
- assertEquals(1, cache.localMetrics().getEntryProcessorRemovals());
- }
-
- assertEquals(1, cache0.localMetrics().getEntryProcessorInvocations());
+// if (emptyCache) {
+// assertEquals(1, cache0.localMetrics().getEntryProcessorMisses());
+//
+// assertEquals(100f, cache0.localMetrics().getEntryProcessorMissPercentage());
+// assertEquals(0f, cache0.localMetrics().getEntryProcessorHitPercentage());
+// }
+// else {
+// assertEquals(1, cache0.localMetrics().getEntryProcessorHits());
+//
+// assertEquals(0f, cache0.localMetrics().getEntryProcessorMissPercentage());
+// assertEquals(100f, cache0.localMetrics().getEntryProcessorHitPercentage());
+// }
+//
+// for (int i = 1; i < gridCount(); i++) {
+// Ignite ignite = ignite(i);
+//
+// IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+//
+// if (affinity(cache).isPrimaryOrBackup(ignite.cluster().localNode(), key))
+// assertEquals(1, cache.localMetrics().getEntryProcessorRemovals());
+// }
+//
+// assertEquals(1, cache0.localMetrics().getEntryProcessorInvocations());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 1a3c8d7..26d1b94 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.UUID;
import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.eviction.EvictableEntry;
@@ -107,6 +108,11 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** {@inheritDoc} */
+ @Override public boolean isMvcc() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean detached() {
return false;
}
@@ -482,7 +488,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
/** {@inheritDoc} */
@Override public GridCacheUpdateTxResult mvccSet(@Nullable IgniteInternalTx tx, UUID affNodeId, CacheObject val,
- long ttl0, AffinityTopologyVersion topVer, MvccSnapshot mvccVer,
+ EntryProcessor entryProc, Object[] invokeArgs, long ttl0, AffinityTopologyVersion topVer, MvccSnapshot mvccVer,
GridCacheOperation op, boolean needHistory, boolean noCreate, CacheEntryPredicate filter, boolean retVal)
throws IgniteCheckedException, GridCacheEntryRemovedException {
rawPut(val, ttl);
http://git-wip-us.apache.org/repos/asf/ignite/blob/dab050ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index c191849..d75b8e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -2172,7 +2172,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
SQL,
/** */
- SQL_SUM
+ SQL_SUM,
+
+ /** */
+ INVOKE
}
/**
@@ -2183,7 +2186,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
DML,
/** */
- PUT
+ PUT,
+
+ /** */
+ INVOKE
}
/**