You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/12/09 12:24:06 UTC
[24/51] [abbrv] ignite git commit: IGNITE-2083 EntryProcessor is
called twice on primary node in transactional cache
IGNITE-2083 EntryProcessor is called twice on primary node in transactional cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a14d643
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a14d643
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a14d643
Branch: refs/heads/ignite-843-rc2
Commit: 9a14d6432932fc1a1fdf2ddd77dea920382efe8c
Parents: c10b112
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 7 15:05:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 7 15:05:09 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 51 +-
.../cache/transactions/IgniteTxAdapter.java | 5 +
.../cache/transactions/IgniteTxEntry.java | 8 +-
.../IgniteCacheEntryProcessorCallTest.java | 497 +++++++++++++++++++
...idCachePartitionedHitsAndMissesSelfTest.java | 4 +-
.../testframework/junits/GridAbstractTest.java | 7 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
7 files changed, 558 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 9f1f8a1..3829e28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -337,6 +337,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
cacheCtx.config().isLoadPreviousValue() &&
!txEntry.skipStore();
+ boolean evt = retVal || txEntry.op() == TRANSFORM;
+
+ EntryProcessor entryProc = null;
+
+ if (evt && txEntry.op() == TRANSFORM)
+ entryProc = F.first(txEntry.entryProcessors()).get1();
+
CacheObject val = cached.innerGet(
tx,
/*swap*/true,
@@ -344,11 +351,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/*fail fast*/false,
/*unmarshal*/true,
/*metrics*/retVal,
- /*event*/retVal,
+ /*event*/evt,
/*tmp*/false,
- null,
- null,
- null,
+ tx.subjectId(),
+ entryProc,
+ tx.resolveTaskName(),
null,
txEntry.keepBinary());
@@ -364,11 +371,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
Object procRes = null;
Exception err = null;
+ boolean modified = false;
+
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
- try {
- CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
- txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary());
+ CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
+ txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary());
+ try {
EntryProcessor<Object, Object, Object> processor = t.get1();
procRes = processor.process(invokeEntry, t.get2());
@@ -380,9 +389,27 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
break;
}
+
+ modified |= invokeEntry.modified();
}
- txEntry.entryProcessorCalculatedValue(val);
+ if (modified)
+ val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
+
+ GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP;
+
+ if (op == NOOP) {
+ if (expiry != null) {
+ long ttl = CU.toTtl(expiry.getExpiryForAccess());
+
+ txEntry.ttl(ttl);
+
+ if (ttl == CU.TTL_ZERO)
+ op = DELETE;
+ }
+ }
+
+ txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? null : val));
if (retVal) {
if (err != null || procRes != null)
@@ -1301,10 +1328,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
entry.cached().partition());
if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
- CacheObject procVal = entry.entryProcessorCalculatedValue();
+ T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+
+ assert procVal != null : entry;
- entry.op(procVal == null ? DELETE : UPDATE);
- entry.value(procVal, true, false);
+ entry.op(procVal.get1());
+ entry.value(procVal.get2(), true, false);
entry.entryProcessors(null);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 3065ac2..53f4f56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1233,6 +1233,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
if (F.isEmpty(txEntry.entryProcessors()))
return F.t(txEntry.op(), txEntry.value());
else {
+ T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue();
+
+ if (calcVal != null)
+ return calcVal;
+
boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() :
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index fba1513..2c6c3df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -105,7 +105,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** Transient field for calculated entry processor value. */
@GridDirectTransient
- private CacheObject entryProcessorCalcVal;
+ private T2<GridCacheOperation, CacheObject> entryProcessorCalcVal;
/** Transform closure bytes. */
@GridToStringExclude
@@ -888,14 +888,16 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/**
* @return Entry processor calculated value.
*/
- public CacheObject entryProcessorCalculatedValue() {
+ public T2<GridCacheOperation, CacheObject> entryProcessorCalculatedValue() {
return entryProcessorCalcVal;
}
/**
* @param entryProcessorCalcVal Entry processor calculated value.
*/
- public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+ public void entryProcessorCalculatedValue(T2<GridCacheOperation, CacheObject> entryProcessorCalcVal) {
+ assert entryProcessorCalcVal != null;
+
this.entryProcessorCalcVal = entryProcessorCalcVal;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java
new file mode 100644
index 0000000..5163d96
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorCallTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class IgniteCacheEntryProcessorCallTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ static final AtomicInteger callCnt = new AtomicInteger();
+
+ /** */
+ private static final int SRV_CNT = 4;
+
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private boolean client;
+
+ /** */
+ private static final int OP_UPDATE = 1;
+
+ /** */
+ private static final int OP_REMOVE = 2;
+
+ /** */
+ private static final int OP_GET = 3;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(SRV_CNT);
+
+ client = true;
+
+ Ignite client = startGrid(SRV_CNT);
+
+ assertTrue(client.configuration().isClientMode());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEntryProcessorCall() throws Exception {
+ {
+ CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>();
+ ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ checkEntryProcessorCallCount(ccfg, 1);
+ }
+
+ {
+ CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>();
+ ccfg.setBackups(0);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ checkEntryProcessorCallCount(ccfg, 1);
+ }
+
+ {
+ CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>();
+ ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+
+ checkEntryProcessorCallCount(ccfg, 2);
+ }
+
+ {
+ CacheConfiguration<Integer, TestValue> ccfg = new CacheConfiguration<>();
+ ccfg.setBackups(0);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+
+ checkEntryProcessorCallCount(ccfg, 1);
+ }
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param expCallCnt Expected entry processor calls count.
+ * @throws Exception If failed.
+ */
+ private void checkEntryProcessorCallCount(CacheConfiguration<Integer, TestValue> ccfg,
+ int expCallCnt) throws Exception {
+ Ignite client1 = ignite(SRV_CNT);
+
+ IgniteCache<Integer, TestValue> clientCache1 = client1.createCache(ccfg);
+
+ IgniteCache<Integer, TestValue> srvCache = ignite(0).cache(ccfg.getName());
+
+ awaitPartitionMapExchange();
+
+ int key = 0;
+
+ checkEntryProcessCall(key++, clientCache1, null, null, expCallCnt);
+
+ if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+ checkEntryProcessCall(key++, clientCache1, OPTIMISTIC, REPEATABLE_READ, expCallCnt + 1);
+ checkEntryProcessCall(key++, clientCache1, PESSIMISTIC, REPEATABLE_READ, expCallCnt + 1);
+ checkEntryProcessCall(key++, clientCache1, OPTIMISTIC, SERIALIZABLE, expCallCnt + 1);
+ }
+
+ for (int i = 100; i < 110; i++) {
+ checkEntryProcessCall(key++, srvCache, null, null, expCallCnt);
+
+ if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+ checkEntryProcessCall(key++, srvCache, OPTIMISTIC, REPEATABLE_READ, expCallCnt + 1);
+ checkEntryProcessCall(key++, srvCache, PESSIMISTIC, REPEATABLE_READ, expCallCnt + 1);
+ checkEntryProcessCall(key++, srvCache, OPTIMISTIC, SERIALIZABLE, expCallCnt + 1);
+ }
+ }
+
+ for (int i = 0; i < NODES; i++)
+ ignite(i).destroyCache(ccfg.getName());
+ }
+
+ /**
+ *
+ * @param key Key.
+ * @param cache Cache.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @param expCallCnt Expected entry processor calls count.
+ */
+ private void checkEntryProcessCall(Integer key,
+ IgniteCache<Integer, TestValue> cache,
+ @Nullable TransactionConcurrency concurrency,
+ @Nullable TransactionIsolation isolation,
+ int expCallCnt) {
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ ClusterNode primary = ignite.affinity(cache.getName()).mapKeyToNode(key);
+
+ assertNotNull(primary);
+
+ log.info("Check call [key=" + key +
+ ", primary=" + primary.attribute(ATTR_GRID_NAME) +
+ ", concurrency=" + concurrency +
+ ", isolation=" + isolation + "]");
+
+ Transaction tx;
+ TestReturnValue retVal;
+
+ log.info("Invoke: " + key);
+
+ // Update.
+ callCnt.set(0);
+
+ tx = startTx(cache, concurrency, isolation);
+
+ retVal = cache.invoke(key, new TestEntryProcessor(OP_UPDATE), new TestValue(Integer.MIN_VALUE));
+
+ if (tx != null)
+ tx.commit();
+
+ assertEquals(expCallCnt, callCnt.get());
+
+ checkReturnValue(retVal, "null");
+ checkCacheValue(cache.getName(), key, new TestValue(0));
+
+ log.info("Invoke: " + key);
+
+ // Get.
+ callCnt.set(0);
+
+ tx = startTx(cache, concurrency, isolation);
+
+ retVal = cache.invoke(key, new TestEntryProcessor(OP_GET), new TestValue(Integer.MIN_VALUE));
+
+ if (tx != null)
+ tx.commit();
+
+ assertEquals(expCallCnt, callCnt.get());
+
+ checkReturnValue(retVal, "0");
+ checkCacheValue(cache.getName(), key, new TestValue(0));
+
+ log.info("Invoke: " + key);
+
+ // Update.
+ callCnt.set(0);
+
+ tx = startTx(cache, concurrency, isolation);
+
+ retVal = cache.invoke(key, new TestEntryProcessor(OP_UPDATE), new TestValue(Integer.MIN_VALUE));
+
+ if (tx != null)
+ tx.commit();
+
+ assertEquals(expCallCnt, callCnt.get());
+
+ checkReturnValue(retVal, "0");
+ checkCacheValue(cache.getName(), key, new TestValue(1));
+
+ log.info("Invoke: " + key);
+
+ // Remove.
+ callCnt.set(0);
+
+ tx = startTx(cache, concurrency, isolation);
+
+ retVal = cache.invoke(key, new TestEntryProcessor(OP_REMOVE), new TestValue(Integer.MIN_VALUE));
+
+ if (tx != null)
+ tx.commit();
+
+ assertEquals(expCallCnt, callCnt.get());
+
+ checkReturnValue(retVal, "1");
+ checkCacheValue(cache.getName(), key, null);
+ }
+
+ /**
+ * @param retVal Return value.
+ * @param expVal Expected value.
+ */
+ private void checkReturnValue(TestReturnValue retVal, String expVal) {
+ assertNotNull(retVal);
+
+ TestValue arg = (TestValue)retVal.argument();
+ assertNotNull(arg);
+ assertEquals(Integer.MIN_VALUE, (Object)arg.value());
+
+ assertEquals(expVal, retVal.value());
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ * @param expVal Expected value.
+ */
+ private void checkCacheValue(String cacheName, Integer key, TestValue expVal) {
+ for (int i = 0; i < NODES; i++) {
+ Ignite ignite = ignite(i);
+
+ IgniteCache<Integer, TestValue> cache = ignite.cache(cacheName);
+
+ assertEquals(expVal, cache.get(key));
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @return Started transaction.
+ */
+ @Nullable private Transaction startTx(IgniteCache<Integer, TestValue> cache,
+ @Nullable TransactionConcurrency concurrency,
+ @Nullable TransactionIsolation isolation) {
+ if (concurrency != null) {
+ assert isolation != null;
+
+ return cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation);
+ }
+
+ return null;
+ }
+
+ /**
+ *
+ */
+ static class TestEntryProcessor implements EntryProcessor<Integer, TestValue, TestReturnValue> {
+ /** */
+ private int op;
+
+ /**
+ * @param op Operation.
+ */
+ public TestEntryProcessor(int op) {
+ this.op = op;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TestReturnValue process(MutableEntry<Integer, TestValue> entry,
+ Object... args) {
+ Ignite ignite = entry.unwrap(Ignite.class);
+
+ ignite.log().info("TestEntryProcessor called [op=" + op + ", entry=" + entry + ']');
+
+ callCnt.incrementAndGet();
+
+ assertEquals(1, args.length);
+
+ TestReturnValue retVal;
+
+ TestValue val = entry.getValue();
+
+ if (val == null)
+ retVal = new TestReturnValue("null", args[0]);
+ else
+ retVal = new TestReturnValue(String.valueOf(val.value()), args[0]);
+
+ switch (op) {
+ case OP_GET:
+ return retVal;
+
+ case OP_UPDATE: {
+ if (val == null)
+ val = new TestValue(0);
+ else
+ val = new TestValue(val.val + 1);
+
+ entry.setValue(val);
+
+ break;
+ }
+
+ case OP_REMOVE:
+ entry.remove();
+
+ break;
+
+ default:
+ assert false;
+ }
+
+ return retVal;
+ }
+ }
+
+ /**
+ *
+ */
+ static class TestValue {
+ /** */
+ private Integer val;
+
+ /**
+ * @param val Value.
+ */
+ public TestValue(Integer val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value.
+ */
+ public Integer value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestValue testVal = (TestValue) o;
+
+ return val.equals(testVal.val);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestValue.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class TestReturnValue {
+ /** */
+ private String val;
+
+ /** */
+ private Object arg;
+
+ /**
+ * @param val Value.
+ * @param arg Entry processor argument.
+ */
+ public TestReturnValue(String val, Object arg) {
+ this.val = val;
+ this.arg = arg;
+ }
+
+ /**
+ * @return Value.
+ */
+ public String value() {
+ return val;
+ }
+
+ /**
+ * @return Entry processor argument.
+ */
+ public Object argument() {
+ return arg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestReturnValue testVal = (TestReturnValue) o;
+
+ return val.equals(testVal.val);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestReturnValue.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
index a2ae2e1..02eb9d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
@@ -121,8 +121,8 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
}
// Check that invoke and loader updated metrics
- assertEquals(CNT, hits);
- assertEquals(CNT, misses);
+ assertEquals(CNT / 2, hits);
+ assertEquals(CNT / 2, misses);
}
finally {
stopAllGrids();
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 95661cb..eaf63d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.portable.BinaryMarshaller;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.GridClassLoaderCache;
import org.apache.ignite.internal.portable.BinaryEnumCache;
@@ -119,6 +120,9 @@ public abstract class GridAbstractTest extends TestCase {
/** Null name for execution map. */
private static final String NULL_NAME = UUID.randomUUID().toString();
+ /** */
+ private static final boolean BINARY_MARSHALLER = false;
+
/** Ip finder for TCP discovery. */
public static final TcpDiscoveryIpFinder LOCAL_IP_FINDER = new TcpDiscoveryVmIpFinder(false) {{
setAddresses(Collections.singleton("127.0.0.1:47500..47509"));
@@ -155,6 +159,9 @@ public abstract class GridAbstractTest extends TestCase {
System.setProperty(IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "10000");
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
+ if (BINARY_MARSHALLER)
+ GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
+
Thread timer = new Thread(new GridTestClockTimer(), "ignite-clock-for-tests");
timer.setDaemon(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a14d643/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index ca31c28..7e45470 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerEager
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxLocalTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxReplicatedTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerTxTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorCallTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheManyAsyncOperationsTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheNearLockValueSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTransactionalStopBusySelfTest;
@@ -167,6 +168,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheAtomicLocalInvokeTest.class);
suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
suite.addTestSuite(IgniteCacheTxInvokeTest.class);
+ suite.addTestSuite(IgniteCacheEntryProcessorCallTest.class);
suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class);
suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class);