You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/01/25 17:41:31 UTC
[1/2] ignite git commit: 2224
Repository: ignite
Updated Branches:
refs/heads/ignite-2224 1f5b2021c -> ad54ec254
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntrySeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntrySeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntrySeltTest.java
new file mode 100644
index 0000000..76d7940
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntrySeltTest.java
@@ -0,0 +1,189 @@
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Tests {@link IgniteCache#getEntry(Object)} and {@link IgniteCache#getEntries(Set)} on near, partitioned,
+ * local and replicated caches, each in default and {@code TRANSACTIONAL} atomicity mode, both through the
+ * regular cache view and through {@link IgniteCache#withKeepBinary()}.
+ */
+public class CacheGetEntrySeltTest extends GridCacheAbstractSelfTest {
+    /** Number of keys put into every cache under test. */
+    private static final int KEYS_CNT = 10_000;
+
+    /** Number of keys requested by a single {@code getEntries} call. */
+    private static final int BATCH_SIZE = 10;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Drops any marshaller set by the parent configuration; presumably this makes the node fall back to
+        // the default (binary) marshaller that withKeepBinary() relies on - TODO confirm.
+        cfg.setMarshaller(null);
+
+        // Keep the cache configured by the parent class and add one cache per mode under test.
+        cfg.setCacheConfiguration(
+            cfg.getCacheConfiguration()[0],
+            namedCache("near", CacheMode.PARTITIONED, false, true),
+            namedCache("nearT", CacheMode.PARTITIONED, true, true),
+            namedCache("partitioned", CacheMode.PARTITIONED, false, false),
+            namedCache("partitionedT", CacheMode.PARTITIONED, true, false),
+            namedCache("local", CacheMode.LOCAL, false, false),
+            namedCache("localT", CacheMode.LOCAL, true, false),
+            namedCache("replicated", CacheMode.REPLICATED, false, false),
+            namedCache("replicatedT", CacheMode.REPLICATED, true, false));
+
+        return cfg;
+    }
+
+    /**
+     * Creates a cache configuration for one of the caches under test.
+     *
+     * @param name Cache name.
+     * @param mode Cache mode.
+     * @param tx If {@code true} the cache uses {@code TRANSACTIONAL} atomicity mode, otherwise the mode is left
+     *      at its default.
+     * @param near If {@code true} a near cache configuration is attached.
+     * @return Cache configuration.
+     */
+    private static CacheConfiguration namedCache(String name, CacheMode mode, boolean tx, boolean near) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(mode);
+
+        if (tx)
+            ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        ccfg.setName(name);
+
+        if (near)
+            ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // Long.MAX_VALUE effectively disables the test timeout.
+        return Long.MAX_VALUE;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetEntry() throws Exception {
+        test0("near");
+        test0("nearT");
+        test0("partitioned");
+        test0("partitionedT");
+        test0("local");
+        test0("localT");
+        test0("replicated");
+        test0("replicatedT");
+    }
+
+    /**
+     * Fills the cache with {@link #KEYS_CNT} values and verifies {@code getEntry} and {@code getEntries},
+     * first through the regular cache view and then through the keep-binary view.
+     *
+     * @param name Name of the cache to test.
+     */
+    private void test0(String name) {
+        IgniteCache<Integer, TestValue> cache = grid(0).cache(name);
+
+        // Put.
+        for (int i = 0; i < KEYS_CNT; ++i)
+            cache.put(i, new TestValue(i));
+
+        // getEntry regular.
+        for (int i = 0; i < KEYS_CNT; ++i) {
+            CacheEntry<Integer, TestValue> e = cache.getEntry(i);
+
+            assertEquals(i, e.getValue().val);
+
+            assertNotNull(e.version());
+        }
+
+        // getEntries regular.
+        for (int start = 0; start < KEYS_CNT; start += BATCH_SIZE) {
+            Set<Integer> keys = keyBatch(start);
+
+            Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(keys);
+
+            // Every requested key was put above, so nothing may be missing from the result.
+            assertEquals(keys.size(), es.size());
+
+            for (CacheEntry<Integer, TestValue> e : es) {
+                assertTrue(keys.contains(e.getKey()));
+
+                assertEquals(e.getKey(), (Integer)e.getValue().val);
+
+                assertNotNull(e.version());
+            }
+        }
+
+        IgniteCache<Integer, BinaryObject> cacheB = grid(0).cache(name).withKeepBinary();
+
+        // getEntry withKeepBinary.
+        for (int i = 0; i < KEYS_CNT; ++i) {
+            CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
+
+            assertEquals(i, ((TestValue)e.getValue().deserialize()).val);
+
+            assertNotNull(e.version());
+        }
+
+        // getEntries withKeepBinary.
+        for (int start = 0; start < KEYS_CNT; start += BATCH_SIZE) {
+            Set<Integer> keys = keyBatch(start);
+
+            Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(keys);
+
+            assertEquals(keys.size(), es.size());
+
+            for (CacheEntry<Integer, BinaryObject> e : es) {
+                TestValue tv = e.getValue().deserialize();
+
+                assertTrue(keys.contains(e.getKey()));
+
+                assertEquals(e.getKey(), (Integer)tv.val);
+
+                assertNotNull(e.version());
+            }
+        }
+    }
+
+    /**
+     * Builds one batch of consecutive keys, never going past the last key that was put.
+     *
+     * @param start First key of the batch (inclusive).
+     * @return Keys {@code start .. start + BATCH_SIZE - 1}, capped at {@code KEYS_CNT - 1}.
+     */
+    private static Set<Integer> keyBatch(int start) {
+        Set<Integer> keys = new HashSet<>();
+
+        int end = Math.min(start + BATCH_SIZE, KEYS_CNT);
+
+        for (int k = start; k < end; k++)
+            keys.add(k);
+
+        return keys;
+    }
+
+    /**
+     * Serializable value wrapping a single {@code int}.
+     */
+    private static class TestValue implements Serializable {
+        /** Wrapped value. */
+        private int val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public int value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 1e0071e..3f4f06c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -53,6 +53,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CachePeekMode;
@@ -556,6 +557,30 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception In case of error.
*/
+ public void testGetEntry() throws Exception {
+     IgniteCache<String, Integer> c = jcache();
+
+     // Two keys are stored; a third key is never put.
+     c.put("key1", 1);
+     c.put("key2", 2);
+
+     CacheEntry<String, Integer> first = c.getEntry("key1");
+     CacheEntry<String, Integer> second = c.getEntry("key2");
+     CacheEntry<String, Integer> absent = c.getEntry("wrongKey");
+
+     // Stored keys: value, key and a non-null version are expected.
+     assert first.getValue() == 1;
+     assert first.getKey().equals("key1");
+     assert first.version() != null;
+
+     assert second.getValue() == 2;
+     assert second.getKey().equals("key2");
+     assert second.version() != null;
+
+     // Unknown key: no entry at all.
+     assert absent == null;
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
public void testGetAsync() throws Exception {
IgniteCache<String, Integer> cache = jcache();
@@ -664,6 +689,122 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception In case of error.
*/
+ public void testGetEntries() throws Exception {
+     Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;
+
+     final IgniteCache<String, Integer> cache = jcache();
+
+     try {
+         cache.put("key1", 1);
+         cache.put("key2", 2);
+
+         if (tx != null)
+             tx.commit();
+     }
+     finally {
+         if (tx != null)
+             tx.close();
+     }
+
+     // A null key set must be rejected by getEntries itself (previously this exercised getAll by mistake).
+     GridTestUtils.assertThrows(log, new Callable<Void>() {
+         @Override public Void call() throws Exception {
+             cache.getEntries(null).isEmpty();
+
+             return null;
+         }
+     }, NullPointerException.class, null);
+
+     assert cache.getEntries(Collections.<String>emptySet()).isEmpty();
+
+     // The same request is issued twice; both results must contain exactly the two stored keys.
+     checkKey1AndKey2Entries(cache.getEntries(ImmutableSet.of("key1", "key2", "key9999")), "c1");
+
+     checkKey1AndKey2Entries(cache.getEntries(ImmutableSet.of("key1", "key2", "key9999")), "c2");
+
+     // Now do the same checks but within transaction.
+     if (txShouldBeUsed()) {
+         try (Transaction tx0 = transactions().txStart()) {
+             assert cache.getEntries(Collections.<String>emptySet()).isEmpty();
+
+             checkKey1AndKey2Entries(cache.getEntries(ImmutableSet.of("key1", "key2", "key9999")), "c1");
+
+             checkKey1AndKey2Entries(cache.getEntries(ImmutableSet.of("key1", "key2", "key9999")), "c2");
+
+             tx0.commit();
+         }
+     }
+ }
+
+ /**
+  * Checks that the given result holds exactly two entries: {@code key1 -> 1} and {@code key2 -> 2}.
+  *
+  * @param entries Entries returned by {@code getEntries}.
+  * @param label Name of the result used in the log message.
+  */
+ private void checkKey1AndKey2Entries(Collection<CacheEntry<String, Integer>> entries, String label) {
+     info("Retrieved " + label + ": " + entries);
+
+     assert 2 == entries.size() : "Invalid collection: " + entries;
+
+     boolean b1 = false;
+     boolean b2 = false;
+
+     for (CacheEntry<String, Integer> e : entries) {
+         if (e.getKey().equals("key1") && e.getValue().equals(1))
+             b1 = true;
+
+         if (e.getKey().equals("key2") && e.getValue().equals(2))
+             b2 = true;
+     }
+
+     assertTrue(b1 && b2);
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
public void testGetAllWithNulls() throws Exception {
final IgniteCache<String, Integer> cache = jcache();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
index c57869f..99203b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -29,6 +30,7 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -257,6 +259,112 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
/**
* @throws Exception If failed.
*/
+ public void testGetEntry() throws Exception {
+ // Run the getEntry interceptor scenario for a key returned by primaryKey(0).
+ testGetEntry(primaryKey(0));
+
+ // NOTE(review): afterTest() is called between the two runs, presumably to reset cache and
+ // interceptor state before the scenario is repeated - confirm.
+ afterTest();
+
+ // The backup-key run is skipped for LOCAL cache mode.
+ if (cacheMode() != LOCAL)
+ testGetEntry(backupKey(0));
+ }
+
+ /**
+ * Checks that {@code getEntry} passes its result through the configured get-interceptor: first while the
+ * key has no value in the cache, then after a value was stored, and finally through the async API.
+ * The steps share the state of the {@code interceptor} field and must run in this order.
+ *
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void testGetEntry(String key) throws Exception {
+ // Try when value is not in cache.
+
+ interceptor.retInterceptor = new NullGetInterceptor();
+
+ log.info("Get 1.");
+
+ IgniteCache<String, Integer> cache = jcache(0);
+
+ // Note: a non-null entry with a null value is expected here, not a null entry.
+ assertEquals(null, cache.getEntry(key).getValue());
+
+ assertEquals(1, interceptor.invokeCnt.get());
+
+ assertEquals(0, interceptor.getMap.size());
+
+ interceptor.reset();
+
+ interceptor.retInterceptor = new OneGetInterceptor();
+
+ log.info("Get 2.");
+
+ // The entry value is expected to be 1 although nothing is stored for the key.
+ assertEquals((Integer)1, cache.getEntry(key).getValue());
+
+ assertEquals(1, interceptor.invokeCnt.get());
+
+ assertEquals(0, interceptor.getMap.size());
+
+ interceptor.reset();
+
+ // Disable interceptor and update cache.
+
+ interceptor.disabled = true;
+
+ cache.put(key, 100);
+
+ interceptor.disabled = false;
+
+ // Try when value is in cache.
+
+ interceptor.retInterceptor = new NullGetInterceptor();
+
+ log.info("Get 3.");
+
+ assertEquals(null, cache.getEntry(key).getValue());
+
+ assertEquals(1, interceptor.invokeCnt.get());
+
+ // NOTE(review): getMap presumably records the values handed to the interceptor; here it must hold
+ // the stored value 100 for the key - confirm against the interceptor implementation.
+ assertEquals(1, interceptor.getMap.size());
+
+ assertEquals(100, interceptor.getMap.get(key));
+
+ // The stored value must stay 100 whatever the interceptor returned.
+ checkCacheValue(key, 100);
+
+ interceptor.reset();
+
+ interceptor.retInterceptor = new GetIncrementInterceptor();
+
+ log.info("Get 4.");
+
+ // Expected entry value is the stored value plus one (100 -> 101).
+ assertEquals((Integer)101, cache.getEntry(key).getValue());
+
+ assertEquals(1, interceptor.invokeCnt.get());
+
+ assertEquals(1, interceptor.getMap.size());
+
+ assertEquals(100, interceptor.getMap.get(key));
+
+ checkCacheValue(key, 100);
+
+ interceptor.reset();
+
+ interceptor.retInterceptor = new GetIncrementInterceptor();
+
+ log.info("GetAsync 1.");
+
+ IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+ // With the async view the result is taken from future() rather than from the call itself.
+ cacheAsync.getEntry(key);
+
+ assertEquals((Integer)101, cacheAsync.<CacheEntry<String, Integer>>future().get().getValue());
+
+ assertEquals(1, interceptor.invokeCnt.get());
+
+ assertEquals(1, interceptor.getMap.size());
+
+ assertEquals(100, interceptor.getMap.get(key));
+
+ checkCacheValue(key, 100);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testGetAll() throws Exception {
Set<String> keys = new LinkedHashSet<>();
@@ -350,6 +458,90 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
/**
* @throws Exception If failed.
*/
+ public void testGetEntries() throws Exception {
+ // Request keys "0" .. "999" in insertion order.
+ Set<String> keys = new LinkedHashSet<>();
+
+ for (int i = 0; i < 1000; i++)
+ keys.add(String.valueOf(i));
+
+ interceptor.retInterceptor = new NullGetInterceptor();
+
+ IgniteCache<String, Integer> cache = jcache(0);
+
+ IgniteCache<String, Integer> cacheAsync = cache.withAsync();
+
+ Collection<CacheEntry<String, Integer>> c = cache.getEntries(keys);
+
+ // Nothing is returned, yet the interceptor must have been invoked once per requested key.
+ assertEquals(0, c.size());
+
+ assertEquals(1000, interceptor.invokeCnt.get());
+
+ interceptor.reset();
+
+ interceptor.retInterceptor = new GetAllInterceptor1();
+
+ c = cache.getEntries(keys);
+
+ // Half of the keys are expected in the result, each with value key * 2.
+ assertEquals(500, c.size());
+
+ for (CacheEntry<String, Integer> e : c) {
+ int k = Integer.valueOf(e.getKey());
+
+ assertEquals((Integer)(k * 2), e.getValue());
+ }
+
+ assertEquals(1000, interceptor.invokeCnt.get());
+
+ // Put some values in cache.
+
+ interceptor.disabled = true;
+
+ for (int i = 0; i < 500; i++)
+ cache.put(String.valueOf(i), i);
+
+ interceptor.disabled = false;
+
+ // Iteration 0 uses the synchronous API, iteration 1 the async view and its future.
+ for (int j = 0; j < 2; j++) {
+ interceptor.reset();
+
+ interceptor.retInterceptor = new GetAllInterceptor2();
+
+ if (j == 0)
+ c = cache.getEntries(keys);
+ else {
+ cacheAsync.getEntries(keys);
+
+ c = cacheAsync.<Collection<CacheEntry<String, Integer>>>future().get();
+ }
+
+ for (CacheEntry<String, Integer> e : c) {
+ int k = Integer.valueOf(e.getKey());
+
+ switch (k % 3) {
+ case 1:
+ // Expected: the stored value for keys below 500, null otherwise.
+ Integer exp = k < 500 ? k : null;
+
+ assertEquals(exp, e.getValue());
+
+ break;
+
+ case 2:
+ assertEquals((Integer)(k * 3), e.getValue());
+
+ break;
+
+ default:
+ // Keys divisible by 3 must not appear in the result at all.
+ fail();
+ }
+ }
+
+ assertEquals(1000, interceptor.invokeCnt.get());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCancelUpdate() throws Exception {
for (Operation op : Operation.values()) {
testCancelUpdate(primaryKey(0), op);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java
index 6a7416e..293ba1e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEvictionNearReadersSelfTest.java
@@ -263,7 +263,7 @@ public class GridCacheDhtEvictionNearReadersSelfTest extends GridCommonAbstractT
waitForLocalEvent(grid(primary).events(), nodeEvent(primary.id()), EVT_CACHE_ENTRY_EVICTED);
// Get value on other node, it should be loaded to near cache.
- assertEquals(val, nearOther.get(key, true));
+ assertEquals(val, nearOther.get(key, true, false));
entryPrimary = (GridDhtCacheEntry)dhtPrimary.peekEx(key);
entryBackup = (GridDhtCacheEntry)dhtBackup.peekEx(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 035f1b0..4282917 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -34,6 +34,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
@@ -129,7 +130,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
throws CacheException {
throw new UnsupportedOperationException("Method should be supported.");
}
@@ -223,11 +224,19 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
return compute.call(new GetTask<K, V>(cacheName, isAsync, key));
}
+ /** {@inheritDoc} */
+ @Override public CacheEntry<K, V> getEntry(K key) {
+ return compute.call(new GetEntryTask<K, V>(cacheName, isAsync, key));
+ }
+
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
return compute.call(new GetAllTask<K, V>(cacheName, isAsync, keys));
}
+ /** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) {
+ return compute.call(new GetEntriesTask<K, V>(cacheName, isAsync, keys));
+ }
+
/** {@inheritDoc} */
@Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
return compute.call(new GetAllOutTxTask<K, V>(cacheName, isAsync, keys));
@@ -710,6 +719,30 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/**
*
*/
+ private static class GetEntryTask<K, V> extends CacheTaskAdapter<K, V, CacheEntry<K, V>> {
+ /** Key whose entry is requested. */
+ private final K key;
+
+ /**
+ * @param cacheName Cache name, passed on to the task adapter.
+ * @param async Async flag, passed on to the task adapter.
+ * @param key Key whose entry is requested.
+ */
+ public GetEntryTask(String cacheName, boolean async, K key) {
+ super(cacheName, async);
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntry<K, V> call() throws Exception {
+ // Delegates to getEntry on the cache supplied by the adapter.
+ return cache().getEntry(key);
+ }
+ }
+
+
+ /**
+ *
+ */
private static class RemoveAllTask<K, V> extends CacheTaskAdapter<K, V, Void> {
/**
* @param cacheName Cache name.
@@ -973,6 +1006,29 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/**
*
*/
+ private static class GetEntriesTask<K, V> extends CacheTaskAdapter<K, V, Collection<CacheEntry<K, V>> > {
+ /** Keys whose entries are requested. */
+ private final Set<? extends K> keys;
+
+ /**
+ * @param cacheName Cache name, passed on to the task adapter.
+ * @param async Async flag, passed on to the task adapter.
+ * @param keys Keys whose entries are requested.
+ */
+ public GetEntriesTask(String cacheName, boolean async, Set<? extends K> keys) {
+ super(cacheName, async);
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> call() throws Exception {
+ // Delegates to getEntries on the cache supplied by the adapter.
+ return cache().getEntries(keys);
+ }
+ }
+
+ /**
+ *
+ */
private static class GetAllOutTxTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> {
/** Keys. */
private final Set<? extends K> keys;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 68e52df..8fb5e14 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheGetEntrySeltTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest;
import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest;
import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest;
@@ -241,6 +242,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteSystemCacheOnClientTest.class);
suite.addTestSuite(CacheRemoveAllSelfTest.class);
+ suite.addTestSuite(CacheGetEntrySeltTest.class);
suite.addTestSuite(CacheStopAndDestroySelfTest.class);
[2/2] ignite git commit: 2224
Posted by av...@apache.org.
2224
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad54ec25
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad54ec25
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad54ec25
Branch: refs/heads/ignite-2224
Commit: ad54ec254af3452fdf63884884cf43d04700b565
Parents: 1f5b202
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Jan 25 19:41:27 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Jan 25 19:41:27 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 9 +
.../processors/cache/GridCacheAdapter.java | 265 ++++++++++++++++---
.../processors/cache/GridCacheContext.java | 28 +-
.../processors/cache/GridCacheMapEntry.java | 2 +-
.../processors/cache/GridCacheProxyImpl.java | 47 ++++
.../processors/cache/IgniteCacheProxy.java | 50 ++++
.../processors/cache/IgniteInternalCache.java | 85 ++++++
.../dht/CacheDistributedGetFutureAdapter.java | 15 --
.../distributed/dht/GridDhtCacheAdapter.java | 6 +-
.../dht/GridPartitionedGetFuture.java | 21 +-
.../dht/GridPartitionedSingleGetFuture.java | 8 +-
.../dht/atomic/GridDhtAtomicCache.java | 80 ++++--
.../dht/colocated/GridDhtColocatedCache.java | 25 +-
.../distributed/near/GridNearAtomicCache.java | 6 +-
.../distributed/near/GridNearCacheAdapter.java | 5 +-
.../distributed/near/GridNearCacheEntry.java | 8 +-
.../distributed/near/GridNearGetFuture.java | 23 +-
.../near/GridNearTransactionalCache.java | 6 +-
.../processors/cache/local/GridLocalCache.java | 1 +
.../local/atomic/GridLocalAtomicCache.java | 107 ++++++--
.../transactions/IgniteTxLocalAdapter.java | 24 +-
.../processors/cache/CacheGetEntrySeltTest.java | 189 +++++++++++++
.../cache/GridCacheAbstractFullApiSelfTest.java | 141 ++++++++++
.../GridCacheInterceptorAbstractSelfTest.java | 192 ++++++++++++++
...GridCacheDhtEvictionNearReadersSelfTest.java | 2 +-
.../multijvm/IgniteCacheProcessProxy.java | 58 +++-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
27 files changed, 1272 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 886dca6..c1cad53 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -35,6 +35,7 @@ import javax.cache.integration.CacheWriter;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheMode;
@@ -397,10 +398,18 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
@IgniteAsyncSupported
@Override public V get(K key);
+ /**
+ * Gets the cache entry (key, value and entry version) stored for the given key.
+ *
+ * @param key Key whose entry is requested.
+ * @return Entry for the key, or {@code null} if there is no value for the key. NOTE(review): when a cache
+ * interceptor is configured, the adapter implementation currently always returns an entry wrapping
+ * the interceptor result (possibly with a {@code null} value) - confirm this is the intended contract.
+ * @see CacheEntry#version()
+ */
+ @IgniteAsyncSupported
+ public CacheEntry<K, V> getEntry(K key);
+
/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public Map<K, V> getAll(Set<? extends K> keys);
+ /**
+ * Gets the cache entries (key, value and entry version) stored for the given keys.
+ * <p>
+ * Keys that have no value are left out, so the result may hold fewer elements than {@code keys}.
+ *
+ * @param keys Keys whose entries are requested.
+ * @return Entries found for the given keys; empty if {@code keys} is empty.
+ * @see CacheEntry#version()
+ */
+ @IgniteAsyncSupported
+ public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys);
+
/**
* Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries
* and will not lock any keys if pessimistic transaction is started by thread.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 2582e6c..9061c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -52,6 +52,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
@@ -610,7 +611,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*task name*/null,
/*deserialize binary*/false,
/*skip values*/true,
- /*can remap*/true
+ /*can remap*/true,
+ false
);
}
@@ -636,7 +638,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*task name*/null,
/*deserialize binary*/false,
/*skip values*/true,
- /*can remap*/true
+ /*can remap*/true,
+ false
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
@Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
Map<K, V> kvMap = fut.get();
@@ -1299,7 +1302,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
/*deserialize cache objects*/true,
/*skip values*/false,
- /*can remap*/true
+ /*can remap*/true,
+ false
).get().get(key);
}
@@ -1315,7 +1319,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
true,
false,
- /*can remap*/true
+ /*can remap*/true,
+ false
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
@Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
@@ -1335,7 +1340,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
/*deserialize cache objects*/true,
/*skip values*/false,
- /*can remap*/false
+ /*can remap*/false,
+ false
).get().get(key);
}
@@ -1355,7 +1361,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
!ctx.keepBinary(),
/*skip values*/false,
- /*can remap*/true);
+ /*can remap*/true,
+ false);
}
/**
@@ -1375,7 +1382,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
long start = statsEnabled ? System.nanoTime() : 0L;
- V val = get(key, !ctx.keepBinary());
+ V val = get(key, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
val = (V)ctx.config().getInterceptor().onGet(key, val);
@@ -1387,6 +1394,30 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Nullable @Override public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException {
+     A.notNull(key, "key");
+
+     final boolean measure = ctx.config().isStatisticsEnabled();
+
+     final long startNanos = measure ? System.nanoTime() : 0L;
+
+     // With the last flag set, the internal get yields a (value, version) pair instead of a bare value.
+     T2<V, GridCacheVersion> valVer = (T2<V, GridCacheVersion>)get(key, !ctx.keepBinary(), true);
+
+     final boolean found = valVer != null;
+
+     CacheEntry<K, V> entry = null;
+
+     if (found)
+         entry = new CacheEntryImplEx<>(key, valVer.get1(), valVer.get2());
+
+     // With an interceptor configured, the result is always an entry carrying the interceptor's value,
+     // even when nothing was found for the key (the version is then null).
+     if (ctx.config().getInterceptor() != null) {
+         V intercepted = (V)ctx.config().getInterceptor().onGet(key, found ? entry.getValue() : null);
+
+         entry = new CacheEntryImplEx<>(key, intercepted, found ? valVer.get2() : null);
+     }
+
+     if (measure)
+         metrics0().addGetTimeNanos(System.nanoTime() - startNanos);
+
+     return entry;
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<V> getAsync(final K key) {
A.notNull(key, "key");
@@ -1394,7 +1425,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final long start = statsEnabled ? System.nanoTime() : 0L;
- IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary());
+ IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
@@ -1410,6 +1441,41 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(final K key) {
+ A.notNull(key, "key");
+
+ final boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ final long start = statsEnabled ? System.nanoTime() : 0L;
+
+ // The last argument (needVer) is true, so the future's result is cast to a (value, version) pair.
+ IgniteInternalFuture<T2<V, GridCacheVersion>> fut =
+ (IgniteInternalFuture<T2<V, GridCacheVersion>>)getAsync(key, !ctx.keepBinary(), true);
+
+ final boolean intercept = ctx.config().getInterceptor() != null;
+
+ IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain(new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() {
+ @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f)
+ throws IgniteCheckedException {
+ T2<V, GridCacheVersion> t = f.get();
+
+ CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null;
+
+ if (intercept) {
+ V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
+
+ // A null result from the interceptor completes the future with null instead of an entry
+ // wrapping a null value; interceptGetEntries() drops null results the same way.
+ return val0 != null ? new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null) : null;
+ }
+ else
+ return val;
+ }
+ });
+
+ // Get time is recorded on the underlying future, not on the chained one.
+ if (statsEnabled)
+ fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start));
+
+ return fr;
+ }
+
+ /** {@inheritDoc} */
@Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
A.notNull(keys, "keys");
@@ -1417,7 +1483,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
long start = statsEnabled ? System.nanoTime() : 0L;
- Map<K, V> map = getAll(keys, !ctx.keepBinary());
+ Map<K, V> map = getAll(keys, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
map = interceptGet(keys, map);
@@ -1429,6 +1495,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(@Nullable Collection<? extends K> keys)
+ throws IgniteCheckedException {
+ A.notNull(keys, "keys");
+
+ boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
+ // The last argument (needVer) is true, so map values are cast to (value, version) pairs.
+ Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll(keys, !ctx.keepBinary(), true);
+
+ Collection<CacheEntry<K, V>> res;
+
+ if (ctx.config().getInterceptor() != null)
+ res = interceptGetEntries(keys, map);
+ else {
+ // Allocate the result set only on the path that actually fills it.
+ res = new HashSet<>();
+
+ for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet())
+ res.add(new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2()));
+ }
+
+ if (statsEnabled)
+ metrics0().addGetTimeNanos(System.nanoTime() - start);
+
+ return res;
+ }
+
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) {
A.notNull(keys, "keys");
@@ -1436,7 +1528,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final long start = statsEnabled ? System.nanoTime() : 0L;
- IgniteInternalFuture<Map<K, V>> fut = getAllAsync(keys, !ctx.keepBinary());
+ IgniteInternalFuture<Map<K, V>> fut = getAllAsync(keys, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
return fut.chain(new CX1<IgniteInternalFuture<Map<K, V>>, Map<K, V>>() {
@@ -1451,6 +1543,46 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(
+ @Nullable final Collection<? extends K> keys) {
+ A.notNull(keys, "keys");
+
+ final boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ final long start = statsEnabled ? System.nanoTime() : 0L;
+
+ // The last argument (needVer) is true, so map values are (value, version) pairs; the cast goes
+ // through the raw future type because the declared result type is Map<K, V>.
+ IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut =
+ (IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>)
+ ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true));
+
+ final boolean intercept = ctx.config().getInterceptor() != null;
+
+ IgniteInternalFuture<Collection<CacheEntry<K, V>>> rf =
+ fut.chain(new CX1<IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>, Collection<CacheEntry<K, V>>>() {
+ @Override public Collection<CacheEntry<K, V>> applyx(
+ IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> f) throws IgniteCheckedException {
+ // Fetch the result once instead of calling get() for every use.
+ Map<K, T2<V, GridCacheVersion>> map = f.get();
+
+ if (intercept)
+ return interceptGetEntries(keys, map);
+
+ Map<K, CacheEntry<K, V>> res = U.newHashMap(map.size());
+
+ for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) {
+ T2<V, GridCacheVersion> t = e.getValue();
+
+ res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), t.get1(), t.get2()));
+ }
+
+ return res.values();
+ }
+ });
+
+ // Get time is recorded on the underlying future, not on the chained one.
+ if (statsEnabled)
+ fut.listen(new UpdateGetTimeStatClosure<Map<K, T2<V, GridCacheVersion>>>(metrics0(), start));
+
+ return rf;
+ }
+
/**
* Applies cache interceptor on result of 'get' operation.
*
@@ -1493,6 +1625,55 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
 /**
+ * Applies cache interceptor on result of 'getEntries' operation.
+ * <p>
+ * If {@code keys} is empty, the interceptor is not invoked and every pair from {@code map} is
+ * wrapped into an entry as is. Otherwise the interceptor is invoked for every pair in {@code map}
+ * and, when some requested keys are absent from {@code map}, also for each such non-null key with
+ * a {@code null} value. Keys for which the interceptor returns {@code null} are left out of the result.
+ *
+ * @param keys All requested keys.
+ * @param map Result map: key to (value, version) pair.
+ * @return Collection of entries holding values returned by cache interceptor.
+ */
+ @SuppressWarnings("IfMayBeConditional")
+ private Collection<CacheEntry<K, V>> interceptGetEntries(@Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) {
+ Map<K, CacheEntry<K, V>> res;
+
+ // No requested keys: nothing to intercept, just wrap whatever was read.
+ if (F.isEmpty(keys)) {
+ res = U.newHashMap(map.size());
+
+ for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet())
+ res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2()));
+
+ return res.values();
+ }
+
+ res = U.newHashMap(keys.size());
+
+ CacheInterceptor<K, V> interceptor = cacheCfg.getInterceptor();
+
+ assert interceptor != null;
+
+ // Intercept values that were found; the version read with the value is kept.
+ for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) {
+ V val = interceptor.onGet(e.getKey(), e.getValue().get1());
+
+ if (val != null)
+ res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().get2()));
+ }
+
+ if (map.size() != keys.size()) { // Not all requested keys were in cache.
+ for (K key : keys) {
+ if (key != null) {
+ if (!map.containsKey(key)) {
+ V val = interceptor.onGet(key, null);
+
+ // Value supplied by the interceptor for a missing key has no version.
+ if (val != null)
+ res.put(key, new CacheEntryImplEx<>(key, val, null));
+ }
+ }
+ }
+ }
+
+ return res.values();
+ }
+
+ /**
* @param key Key.
* @param forcePrimary Force primary.
* @param skipTx Skip tx.
@@ -1511,7 +1692,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
return getAllAsync(Collections.singletonList(key),
forcePrimary,
@@ -1520,7 +1702,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
skipVals,
- canRemap).chain(
+ canRemap,
+ needVer).chain(
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
@Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();
@@ -1558,7 +1741,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
boolean deserializeBinary,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1573,7 +1757,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
forcePrimary,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
skipVals,
- canRemap);
+ canRemap,
+ needVer);
}
/**
@@ -1599,7 +1784,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean forcePrimary,
@Nullable IgniteCacheExpiryPolicy expiry,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -1616,7 +1802,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
skipVals,
false,
canRemap,
- false);
+ needVer);
}
/**
@@ -1712,9 +1898,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
else {
if (needVer) {
- assert keepCacheObjects;
-
- map.put((K1)key, (V1)new T2<>(res.get1(), res.get2()));
+ ctx.addResult(map,
+ key,
+ res.get1(),
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ res.get2());
}
else {
ctx.addResult(map,
@@ -1787,9 +1978,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
// Don't put key-value pair into result map if value is null.
if (val != null) {
if (needVer) {
- assert keepCacheObjects;
-
- map.put((K1)key, (V1)new T2<>(cacheVal, set ? verSet : ver));
+ ctx.addResult(map,
+ key,
+ cacheVal,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ set ? verSet : ver);
}
else {
ctx.addResult(map,
@@ -4540,12 +4736,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Cached value.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException {
+ @Nullable public V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException {
checkJta();
String taskName = ctx.kernalContext().job().currentTaskName();
- return get(key, taskName, deserializeBinary);
+ return get(key, taskName, deserializeBinary, needVer);
}
/**
@@ -4558,7 +4754,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected V get(
final K key,
String taskName,
- boolean deserializeBinary) throws IgniteCheckedException {
+ boolean deserializeBinary,
+ boolean needVer) throws IgniteCheckedException {
return getAsync(key,
!ctx.config().isReadFromBackup(),
/*skip tx*/false,
@@ -4566,7 +4763,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
false,
- /*can remap*/true).get();
+ /*can remap*/true,
+ needVer).get();
}
/**
@@ -4574,7 +4772,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param deserializeBinary Deserialize binary flag.
* @return Read operation future.
*/
- public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializeBinary) {
+ public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializeBinary, final boolean needVer) {
try {
checkJta();
}
@@ -4591,7 +4789,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
false,
- /*can remap*/true);
+ /*can remap*/true,
+ needVer);
}
/**
@@ -4600,10 +4799,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Map of cached values.
* @throws IgniteCheckedException If read failed.
*/
- public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary) throws IgniteCheckedException {
+ public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary,
+ boolean needVer) throws IgniteCheckedException {
checkJta();
- return getAllAsync(keys, deserializeBinary).get();
+ return getAllAsync(keys, deserializeBinary, needVer).get();
}
/**
@@ -4612,7 +4812,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Read future.
*/
public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys,
- boolean deserializeBinary) {
+ boolean deserializeBinary, boolean needVer) {
String taskName = ctx.kernalContext().job().currentTaskName();
return getAllAsync(keys,
@@ -4622,7 +4822,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
/*skip vals*/false,
- /*can remap*/true);
+ /*can remap*/true,
+ needVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index fc48b9d..354c46a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -1882,6 +1883,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param keepCacheObjects Keep cache objects flag.
* @param deserializeBinary Deserialize binary flag.
* @param cpy Copy flag.
+ * @param ver GridCacheVersion.
*/
@SuppressWarnings("unchecked")
public <K1, V1> void addResult(Map<K1, V1> map,
@@ -1890,7 +1892,8 @@ public class GridCacheContext<K, V> implements Externalizable {
boolean skipVals,
boolean keepCacheObjects,
boolean deserializeBinary,
- boolean cpy) {
+ boolean cpy,
+ final GridCacheVersion ver) {
assert key != null;
assert val != null || skipVals;
@@ -1902,10 +1905,29 @@ public class GridCacheContext<K, V> implements Externalizable {
assert key0 != null : key;
assert val0 != null : val;
- map.put((K1)key0, (V1)val0);
+ map.put((K1)key0, ver != null ? (V1)new T2<>(val0, ver) : (V1)val0);
}
else
- map.put((K1)key, (V1)(skipVals ? true : val));
+ map.put((K1)key, (V1)(skipVals ? true : ver != null ? (V1)new T2<>(val, ver) : val));
+ }
+
+ /**
+ * Adds result to the map without a version: delegates to the versioned overload
+ * passing {@code null} as version.
+ *
+ * @param map Map.
+ * @param key Key.
+ * @param val Value.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
+ * @param deserializeBinary Deserialize binary flag.
+ * @param cpy Copy flag.
+ */
+ public <K1, V1> void addResult(Map<K1, V1> map,
+ KeyCacheObject key,
+ CacheObject val,
+ boolean skipVals,
+ boolean keepCacheObjects,
+ boolean deserializeBinary,
+ boolean cpy) {
+ addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, cpy, null);
 }
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c896a82..e4d3290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -882,7 +882,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateTtl(expiryPlc);
if (retVer) {
- resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : this.ver;
+ resVer = (isNear() && !cctx.atomic()) ? ((GridNearCacheEntry)this).dhtVersion() : this.ver;
if (resVer == null)
ret = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 8ffd273..004c445 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -31,6 +31,7 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -304,6 +305,17 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
}
+ /** {@inheritDoc} */
+ @Nullable @Override public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException {
+ // Enter the gate with this proxy's operation context; the context returned by enter() is handed back on leave.
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntry(key);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
/** {@inheritDoc} */
@Override public V getTopologySafe(K key) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
@@ -328,6 +340,17 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(K key) {
+ // Enter the gate with this proxy's operation context; the context returned by enter() is handed back on leave.
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntryAsync(key);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
/** {@inheritDoc} */
@Override public V getForcePrimary(K key) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
@@ -448,6 +471,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
}
+ /** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(
+ @Nullable Collection<? extends K> keys) throws IgniteCheckedException {
+ // Enter the gate with this proxy's operation context; the context returned by enter() is handed back on leave.
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntries(keys);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys) {
CacheOperationContext prev = gate.enter(opCtx);
@@ -460,6 +495,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
}
+ /** {@inheritDoc} */
+ @Override
+ public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(@Nullable Collection<? extends K> keys) {
+ // Enter the gate with this proxy's operation context; the context returned by enter() is handed back on leave.
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntriesAsync(keys);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
/** {@inheritDoc} */
@Nullable @Override public V getAndPut(K key, V val)
throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b64c69c..904b79b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -44,6 +44,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheManager;
import org.apache.ignite.cache.CacheMetrics;
@@ -872,6 +873,30 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
+ /** {@inheritDoc} */
+ @Override public CacheEntry<K, V> getEntry(K key) {
+ try {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ // In async mode the operation future is stored via setFuture() and null is returned here.
+ if (isAsync()) {
+ setFuture(delegate.getEntryAsync(key));
+
+ return null;
+ }
+ else
+ return delegate.getEntry(key);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ // Checked failure is converted by cacheException() before being rethrown.
+ throw cacheException(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
try {
@@ -898,6 +923,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) {
+ try {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ // In async mode the operation future is stored via setFuture() and null is returned here.
+ if (isAsync()) {
+ setFuture(delegate.getEntriesAsync(keys));
+
+ return null;
+ }
+ else
+ return delegate.getEntries(keys);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ // Checked failure is converted by cacheException() before being rethrown.
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
try {
GridCacheGateway<K, V> gate = this.gate;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 433290c..68d0f06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -31,6 +31,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
@@ -335,6 +336,28 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
@Nullable public V get(K key) throws IgniteCheckedException;
 /**
+ * Retrieves entry mapped to the specified key from cache. Entry will only be returned if
+ * it passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned entry is guaranteed to be consistent with the filter. The return value of {@code null}
+ * means entry did not pass the provided filter or cache has no mapping for the
+ * key.
+ * <p>
+ * If the value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disabled, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#load(Transaction, Object)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param key Key to retrieve the entry for.
+ * @return Entry for the given key, or {@code null} if there is none.
+ * @throws IgniteCheckedException If get operation failed.
+ * @throws NullPointerException if the key is {@code null}.
+ */
+ @Nullable public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException;
+
+ /**
* Asynchronously retrieves value mapped to the specified key from cache. Value will only be returned if
* its entry passed the optional filter provided. Filter check is atomic, and therefore the
* returned value is guaranteed to be consistent with the filter. The return value of {@code null}
@@ -356,6 +379,27 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public IgniteInternalFuture<V> getAsync(K key);
 /**
+ * Asynchronously retrieves entry mapped to the specified key from cache. Entry will only be returned if
+ * it passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned entry is guaranteed to be consistent with the filter. The return value of {@code null}
+ * means entry did not pass the provided filter or cache has no mapping for the
+ * key.
+ * <p>
+ * If the value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disabled, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#load(Transaction, Object)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param key Key for the entry to get.
+ * @return Future for the get operation.
+ * @throws NullPointerException if the key is {@code null}.
+ */
+ public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(K key);
+
+ /**
* Retrieves values mapped to the specified keys from cache. Value will only be returned if
* its entry passed the optional filter provided. Filter check is atomic, and therefore the
* returned value is guaranteed to be consistent with the filter. If requested key-value pair
@@ -377,6 +421,27 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException;
 /**
+ * Retrieves entries mapped to the specified keys from cache. Entry will only be returned if
+ * it passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned entry is guaranteed to be consistent with the filter. If entry for a requested key
+ * is not present in the returned collection, then it means that the entry did not pass the provided
+ * filter or cache has no mapping for the key.
+ * <p>
+ * If some value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disabled, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param keys Keys to get.
+ * @return Collection of entries.
+ * @throws IgniteCheckedException If get operation failed.
+ */
+ public Collection<CacheEntry<K, V>> getEntries(@Nullable Collection<? extends K> keys) throws IgniteCheckedException;
+
+ /**
* Asynchronously retrieves values mapped to the specified keys from cache. Value will only be returned if
* its entry passed the optional filter provided. Filter check is atomic, and therefore the
* returned value is guaranteed to be consistent with the filter. If requested key-value pair
@@ -397,6 +462,26 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys);
 /**
+ * Asynchronously retrieves entries mapped to the specified keys from cache. Entry will only be returned if
+ * it passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned entry is guaranteed to be consistent with the filter. If entry for a requested key
+ * is not present in the returned collection, then it means that the entry did not pass the provided
+ * filter or cache has no mapping for the key.
+ * <p>
+ * If some value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disabled, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param keys Keys to get.
+ * @return Future for the get operation.
+ */
+ public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(@Nullable Collection<? extends K> keys);
+
+ /**
* Stores given key-value pair in cache. If filters are provided, then entries will
* be stored in cache only if they pass the filter. Note that filter check is atomic,
* so value stored in cache is guaranteed to be consistent with the filters. If cache
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 7efaf49..28c94dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -153,21 +153,6 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
}
/**
- * @param map Result map.
- * @param key Key.
- * @param val Value.
- * @param ver Version.
- */
- @SuppressWarnings("unchecked")
- protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) {
- assert needVer;
- assert skipVals || val != null;
- assert ver != null;
-
- map.put(key, new T2<>(skipVals ? true : val, ver));
- }
-
- /**
* Affinity node to send get request to.
*
* @param affNodes All affinity nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 9cf8084..33c2f9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -627,7 +627,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
String taskName,
boolean deserializeBinary,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -640,7 +641,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
forcePrimary,
null,
skipVals,
- canRemap);
+ canRemap,
+ needVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 1f2d7c5..63d236e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -491,8 +491,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
}
else {
if (needVer)
- versionedResult(locVals, key, v, ver);
- else {
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ ver);
+ else
cctx.addResult(locVals,
key,
v,
@@ -500,7 +507,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
keepCacheObjects,
deserializeBinary,
true);
- }
return true;
}
@@ -553,7 +559,14 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
assert skipVals == (info.value() == null);
if (needVer)
- versionedResult(map, info.key(), info.value(), info.version());
+ cctx.addResult(map,
+ info.key(),
+ info.value(),
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ info.version());
else {
cctx.addResult(map,
info.key(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 0c811ae..5f1688b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -628,7 +628,13 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
if (needVer) {
assert ver != null;
- onDone(new T2<>(val, ver));
+ if (!keepCacheObjects) {
+ Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary && !skipVals);
+
+ onDone(new T2<>(res, ver));
+ }
+ else
+ onDone(new T2<>(val, ver));
}
else {
if (!keepCacheObjects) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aa79cfa..4d1b1a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -94,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -317,7 +317,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException {
+ @Override protected V get(K key, String taskName, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (keyCheck)
@@ -339,7 +339,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiryPlc,
false,
skipStore,
- true).get();
+ true,
+ needVer).get();
}
/** {@inheritDoc} */
@@ -350,7 +351,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- final boolean canRemap) {
+ final boolean canRemap,
+ final boolean needVer) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (keyCheck)
@@ -376,7 +378,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiryPlc,
skipVals,
skipStore,
- canRemap);
+ canRemap,
+ needVer);
}
});
}
@@ -390,7 +393,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- final boolean canRemap
+ final boolean canRemap,
+ final boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -420,7 +424,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiryPlc,
skipVals,
skipStore,
- canRemap);
+ canRemap,
+ needVer);
}
});
}
@@ -1046,7 +1051,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals,
boolean skipStore,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
ctx.shared().exchange().readyAffinityVersion();
@@ -1064,7 +1070,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
skipVals,
canRemap,
- false,
+ needVer,
false);
fut.init();
@@ -1093,7 +1099,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals,
boolean skipStore,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
ctx.shared().exchange().readyAffinityVersion();
@@ -1118,19 +1125,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (entry != null) {
boolean isNew = entry.isNewLocked();
- CacheObject v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ true);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+ }
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null) {
@@ -1141,6 +1171,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
success = false;
}
+ else if (needVer)
+ ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, ver);
else
ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true);
}
@@ -1194,7 +1226,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
skipVals,
canRemap,
- false,
+ needVer,
false);
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 073043d..7827e87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -200,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap) {
+ boolean canRemap,
+ boolean needVer) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (keyCheck)
@@ -258,7 +259,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
skipVals,
canRemap,
- /*needVer*/false,
+ /*needVer*/needVer,
/*keepCacheObjects*/false);
fut.init();
@@ -275,7 +276,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -318,7 +320,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
deserializeBinary,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
skipVals,
- canRemap);
+ canRemap,
+ needVer);
}
/** {@inheritDoc} */
@@ -357,7 +360,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean deserializeBinary,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
- boolean canRemap) {
+ boolean canRemap,
+ boolean needVer) {
return loadAsync(keys,
readThrough,
forcePrimary,
@@ -367,7 +371,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc,
skipVals,
canRemap,
- false,
+ needVer,
false);
}
@@ -523,7 +527,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
locVals = U.newHashMap(keys.size());
if (needVer)
- locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver));
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObj,
+ deserializeBinary,
+ true,
+ ver);
else {
ctx.addResult(locVals,
key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a2d5adb..63c073d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -400,7 +400,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName,
boolean deserializeBinary,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -423,7 +424,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
opCtx != null && opCtx.skipStore(),
- canRemap);
+ canRemap,
+ needVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 5bf18d9..8153fcd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -241,7 +241,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
@Nullable ExpiryPolicy expiryPlc,
boolean skipVal,
boolean skipStore,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -261,7 +262,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
expiry,
skipVal,
canRemap,
- false,
+ needVer,
false);
// init() will register future for responses if future has remote mappings.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index c0a1617..cfad910 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -21,17 +21,22 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvcc;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -350,7 +355,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
null,
false,
/*skip store*/false,
- /*can remap*/true
+ /*can remap*/true,
+ false
).get().get(keyValue(false));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9291001..5128b3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -651,9 +651,19 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
@SuppressWarnings("unchecked")
private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) {
if (needVer) {
- V val0 = (V)new T2<>(skipVals ? true : v, ver);
+ if (keepCacheObjects) {
+ V val0 = (V)new T2<>(skipVals ? true : v, ver);
- add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+ add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+ }
+ else {
+ K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
+ V val0 = (V)new T2<>(skipVals ? true : !skipVals ?
+ (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
+ (V)Boolean.TRUE, ver);
+
+ add(new GridFinishedFuture<>(Collections.singletonMap((K)key0, val0)));
+ }
}
else {
if (keepCacheObjects) {
@@ -742,7 +752,14 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
assert skipVals == (info.value() == null);
if (needVer)
- versionedResult(map, key, val, info.version());
+ cctx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ ver);
else
cctx.addResult(map,
key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a09dec0..50279d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -122,7 +122,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -162,7 +163,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
skipStore,
- canRemap);
+ canRemap,
+ needVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 4ce1f36..967728b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.util.Collection;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 6130ead..e120862 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -477,7 +477,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException {
+ @Override @Nullable public V get(K key, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
Map<K, V> m = getAllInternal(Collections.singleton(key),
@@ -485,7 +485,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.readThrough(),
taskName,
deserializeBinary,
- false);
+ false,
+ needVer);
assert m.isEmpty() || m.size() == 1 : m.size();
@@ -494,7 +495,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public final Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary)
+ @Override public final Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
throws IgniteCheckedException {
A.notNull(keys, "keys");
@@ -505,7 +506,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.readThrough(),
taskName,
deserializeBinary,
- false);
+ false,
+ needVer);
}
@@ -519,7 +521,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
final String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
A.notNull(keys, "keys");
@@ -528,7 +531,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return asyncOp(new Callable<Map<K, V>>() {
@Override public Map<K, V> call() throws Exception {
- return getAllInternal(keys, swapOrOffheap, storeEnabled, taskName, deserializeBinary, skipVals);
+ return getAllInternal(keys, swapOrOffheap, storeEnabled, taskName, deserializeBinary, skipVals, needVer);
}
});
}
@@ -551,7 +554,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
boolean storeEnabled,
String taskName,
boolean deserializeBinary,
- boolean skipVals
+ boolean skipVals,
+ boolean needVer
) throws IgniteCheckedException {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -584,24 +588,76 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
if (entry != null) {
- CacheObject v = entry.innerGet(null,
- /*swap*/swapOrOffheap,
- /*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/true,
- /**update-metrics*/true,
- /**event*/!skipVals,
- /**temporary*/false,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
- if (v != null)
- ctx.addResult(vals, cacheKey, v, skipVals, false, deserializeBinary, true);
- else
- success = false;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/swapOrOffheap,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+
+ ctx.addResult(
+ vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ ver);
+ }else
+ success = false;
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/swapOrOffheap,
+ /*read-through*/false,
+ /*fail-fast*/false,
+ /*unmarshal*/true,
+ /**update-metrics*/true,
+ /**event*/!skipVals,
+ /**temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (v != null) {
+ if (needVer)
+ ctx.addResult(vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ ver);
+ else
+ ctx.addResult(vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true);
+ }
+ else
+ success = false;
+ }
}
else {
if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
@@ -638,7 +694,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/*force primary*/false,
expiry,
skipVals,
- /*can remap*/true).get();
+ /*can remap*/true,
+ needVer).get();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad54ec25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 926eaf2..023ca52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1511,13 +1511,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
if (val != null) {
- cacheCtx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false);
+ if (needReadVer)
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ ver);
+ else
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false);
}
else
missed.put(key, ver);