You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/23 16:39:50 UTC
ignite git commit: IGNITE-6083 Null value may be passed to the entry
processor on existing key
Repository: ignite
Updated Branches:
refs/heads/master 2b5d2fc5f -> c1453dda3
IGNITE-6083 Null value may be passed to the entry processor on existing key
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1453dda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1453dda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1453dda
Branch: refs/heads/master
Commit: c1453dda3f8343921b3d4b201e1434d7dc79cd0b
Parents: 2b5d2fc
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Apr 23 19:39:01 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Apr 23 19:39:01 2018 +0300
----------------------------------------------------------------------
.../cache/distributed/near/GridNearTxLocal.java | 3 +
.../transactions/IgniteTxLocalAdapter.java | 2 +
...teCacheEntryProcessorSequentialCallTest.java | 311 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
4 files changed, 318 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1453dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 9c0b220..16653e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1350,6 +1350,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
keepBinary,
CU.isNearEnabled(cacheCtx));
+ if (op == TRANSFORM && txEntry.value() == null && old != null)
+ txEntry.value(cacheCtx.toCacheObject(old), false, false);
+
if (enlisted != null)
enlisted.add(cacheKey);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1453dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 54b7ae2..61650cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1152,6 +1152,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (txEntry.op() == TRANSFORM) {
if (computeInvoke) {
+ txEntry.readValue(v);
+
GridCacheVersion ver;
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1453dda/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java b/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java
new file mode 100644
index 0000000..592449d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+
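+/** Tests that an entry processor invoked repeatedly on an existing key in one transaction gets a non-null value.
+ */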
+public class IgniteCacheEntryProcessorSequentialCallTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} Calls {@code startGrids(2)}; the tests access the grids as {@code ignite(0)} and {@code ignite(1)}. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(2);
+ }
+
+ /** {@inheritDoc} Releases the grids of this test class through {@code stopAllGrids()}. */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} Registers a PARTITIONED, TRANSACTIONAL cache named "cache" with zero backups on the node. */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CacheConfiguration cacheCfg = new CacheConfiguration("cache");
+
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cacheCfg.setMaxConcurrentAsyncOperations(0);
+ cacheCfg.setBackups(0);
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ return cfg;
+ }
+
+ /**
+ * Runs the primary-node scenario, then the near-node scenario, in an OPTIMISTIC / SERIALIZABLE transaction.
+ */
+ public void testOptimisticSerializableTxInvokeSequentialCall() throws Exception {
+ transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+ }
+
+ /**
+ * Runs the primary-node scenario, then the near-node scenario, in an OPTIMISTIC / REPEATABLE_READ transaction.
+ */
+ public void testOptimisticRepeatableReadTxInvokeSequentialCall() throws Exception {
+ transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+
+ transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ }
+
+ /**
+ * Runs the primary-node scenario, then the near-node scenario, in an OPTIMISTIC / READ_COMMITTED transaction.
+ */
+ public void testOptimisticReadCommittedTxInvokeSequentialCall() throws Exception {
+ transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+
+ transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ }
+
+ /**
+ * Runs the primary-node scenario, then the near-node scenario, in a PESSIMISTIC / SERIALIZABLE transaction.
+ */
+ public void testPessimisticSerializableTxInvokeSequentialCall() throws Exception {
+ transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+ }
+
+ /**
+ * Runs the primary-node scenario, then the near-node scenario, in a PESSIMISTIC / REPEATABLE_READ transaction.
+ */
+ public void testPessimisticRepeatableReadTxInvokeSequentialCall() throws Exception {
+ transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+
+ transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ }
+
+ /**
+ * Runs the primary-node scenario, then the near-node scenario, in a PESSIMISTIC / READ_COMMITTED transaction.
+ */
+ public void testPessimisticReadCommittedTxInvokeSequentialCall() throws Exception {
+ transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+
+ transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ }
+
+ /**
+ * Puts a value, then invokes the entry processor twice on that key in one transaction started on the key's primary node.
+ * In this test the entry processor gets the value from the local node.
+ *
+ * @param transactionConcurrency Transaction concurrency.
+ * @param transactionIsolation Transaction isolation.
+ */
+ public void transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency transactionConcurrency,
+ TransactionIsolation transactionIsolation) throws Exception {
+ TestKey key = new TestKey(1L);
+ TestValue val = new TestValue();
+ val.value("1");
+
+ Ignite primaryIgnite;
+
+ if (ignite(0).affinity("cache").isPrimary(ignite(0).cluster().localNode(), key))
+ primaryIgnite = ignite(0);
+ else
+ primaryIgnite = ignite(1);
+
+ IgniteCache<TestKey, TestValue> cache = primaryIgnite.cache("cache");
+
+ cache.put(key, val);
+
+ NotNullCacheEntryProcessor cacheEntryProcessor = new NotNullCacheEntryProcessor();
+
+ try (Transaction transaction = primaryIgnite.transactions().txStart(transactionConcurrency,
+ transactionIsolation)) {
+ // Both invocations must see the value put above; the processor asserts that it is non-null.
+ cache.invoke(key, cacheEntryProcessor);
+ cache.invoke(key, cacheEntryProcessor);
+
+ transaction.commit();
+ }
+
+ cache.remove(key);
+ }
+
+ /**
+ * Puts a value on the primary node, then invokes the entry processor twice on that key in one transaction started on the other node.
+ * In this test the entry processor fetches the value from a remote node.
+ *
+ * @param transactionConcurrency Transaction concurrency.
+ * @param transactionIsolation Transaction isolation.
+ */
+ public void transactionInvokeSequentialCallOnNearNode(TransactionConcurrency transactionConcurrency,
+ TransactionIsolation transactionIsolation) throws Exception {
+ TestKey key = new TestKey(1L);
+ TestValue val = new TestValue();
+ val.value("1");
+
+ Ignite nearIgnite;
+ Ignite primaryIgnite;
+
+ if (ignite(0).affinity("cache").isPrimary(ignite(0).cluster().localNode(), key)) {
+ primaryIgnite = ignite(0);
+
+ nearIgnite = ignite(1);
+ }
+ else {
+ primaryIgnite = ignite(1);
+
+ nearIgnite = ignite(0);
+ }
+
+ primaryIgnite.cache("cache").put(key, val);
+
+ IgniteCache<TestKey, TestValue> nearCache = nearIgnite.cache("cache");
+
+ NotNullCacheEntryProcessor cacheEntryProcessor = new NotNullCacheEntryProcessor();
+
+ try (Transaction transaction = nearIgnite.transactions().txStart(transactionConcurrency,
+ transactionIsolation)) {
+ // Both invocations must see the value put on the primary node; the processor asserts that it is non-null.
+ nearCache.invoke(key, cacheEntryProcessor);
+ nearCache.invoke(key, cacheEntryProcessor);
+
+ transaction.commit();
+ }
+
+ primaryIgnite.cache("cache").remove(key);
+ }
+
+ /**
+ * Test for sequential entry processor invocation. During the transaction the value is changed externally, which
+ * leads to an optimistic conflict exception on commit. The transaction is closed and the key removed even on failure.
+ */
+ public void testTxInvokeSequentialOptimisticConflict() throws Exception {
+ TestKey key = new TestKey(1L);
+ IgniteCache<TestKey, TestValue> cache = ignite(0).cache("cache");
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ cache.put(key, new TestValue("1"));
+
+ multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ fail();
+ }
+
+ cache.put(key, new TestValue("2"));
+ }
+ }, 1);
+ // try-with-resources closes the transaction even when an assertion below fails, so it cannot leak into other tests.
+ try (Transaction tx = ignite(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
+ cache.invoke(key, new NotNullCacheEntryProcessor());
+
+ latch.countDown();
+ // Give the concurrent put released by the latch time to finish before the second invoke.
+ Thread.sleep(1_000);
+
+ cache.invoke(key, new NotNullCacheEntryProcessor());
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.commit();
+ return null;
+ }
+ }, TransactionOptimisticException.class);
+ }
+ finally {
+ cache.remove(key);
+ }
+ }
+
+ /**
+ * Cache entry processor that asserts the entry it is invoked on has a non-null value; always returns {@code null}.
+ */
+ public static class NotNullCacheEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<TestKey, TestValue> entry, Object... arguments) throws EntryProcessorException {
+ assertNotNull(entry.getValue());
+
+ return null;
+ }
+ }
+
+ /**
+ * Cache key wrapping a single {@code Long}.
+ */
+ public static class TestKey {
+ /** Value. */
+ private final Long val;
+
+ /**
+ * @param val Value.
+ */
+ public TestKey(Long val) {
+ this.val = val;
+ }
+ }
+
+ /**
+ * Mutable cache value wrapping a single {@code String}.
+ */
+ public static class TestValue {
+ /** Value. */
+ private String val;
+
+ /**
+ * Default constructor.
+ */
+ public TestValue() {
+ }
+
+ /**
+ * @param val Value.
+ */
+ public TestValue(String val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value.
+ */
+ public String value() {
+ return val;
+ }
+
+ /**
+ * @param val New value.
+ */
+ public void value(String val) {
+ this.val = val;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1453dda/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 0612615..1bf65e0 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import java.util.Set;
import junit.framework.TestSuite;
+import org.apache.ignite.cache.IgniteCacheEntryProcessorSequentialCallTest;
import org.apache.ignite.cache.IgniteWarmupClosureSelfTest;
import org.apache.ignite.cache.store.CacheStoreReadFromBackupTest;
import org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest;
@@ -191,6 +192,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class);
suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class);
+ suite.addTestSuite(IgniteCacheEntryProcessorSequentialCallTest.class);
// TODO GG-11148: include test when implemented.
// Test fails due to incorrect handling of CacheConfiguration#getCopyOnRead() and