You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/10 12:55:50 UTC
[03/50] [abbrv] ignite git commit: 2224
2224
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01135066
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01135066
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01135066
Branch: refs/heads/ignite-1786
Commit: 01135066d54df254a0b23afbbffca2ed103e3a8c
Parents: 62502b2
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Feb 2 15:25:05 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Feb 2 15:25:05 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 45 +-
.../processors/cache/CacheEntryImplEx.java | 14 +-
.../processors/cache/GridCacheAdapter.java | 297 +++++--
.../processors/cache/GridCacheContext.java | 33 +-
.../processors/cache/GridCacheMapEntry.java | 2 +-
.../processors/cache/GridCacheProxyImpl.java | 51 ++
.../processors/cache/IgniteCacheProxy.java | 51 ++
.../processors/cache/IgniteInternalCache.java | 85 ++
.../dht/CacheDistributedGetFutureAdapter.java | 15 -
.../distributed/dht/GridDhtCacheAdapter.java | 7 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 6 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 4 +-
.../dht/GridPartitionedGetFuture.java | 38 +-
.../dht/GridPartitionedSingleGetFuture.java | 17 +-
.../dht/atomic/GridDhtAtomicCache.java | 82 +-
.../dht/colocated/GridDhtColocatedCache.java | 42 +-
.../distributed/near/GridNearAtomicCache.java | 6 +-
.../distributed/near/GridNearCacheAdapter.java | 6 +-
.../distributed/near/GridNearCacheEntry.java | 3 +-
.../distributed/near/GridNearGetFuture.java | 45 +-
...arOptimisticSerializableTxPrepareFuture.java | 2 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 4 +
.../GridNearPessimisticTxPrepareFuture.java | 2 +
.../near/GridNearTransactionalCache.java | 9 +-
.../local/atomic/GridLocalAtomicCache.java | 97 ++-
.../cache/transactions/IgniteTxEntry.java | 32 +-
.../transactions/IgniteTxLocalAdapter.java | 196 +++--
.../cache/transactions/IgniteTxLocalEx.java | 3 +-
.../cache/transactions/IgniteTxManager.java | 2 +-
.../cache/CacheGetEntryAbstractTest.java | 803 +++++++++++++++++++
...GetEntryOptimisticReadCommittedSeltTest.java | 36 +
...etEntryOptimisticRepeatableReadSeltTest.java | 36 +
...eGetEntryOptimisticSerializableSeltTest.java | 36 +
...etEntryPessimisticReadCommittedSeltTest.java | 36 +
...tEntryPessimisticRepeatableReadSeltTest.java | 36 +
...GetEntryPessimisticSerializableSeltTest.java | 36 +
.../cache/CacheReadThroughRestartSelfTest.java | 43 +-
.../CacheSerializableTransactionsTest.java | 142 +++-
.../cache/GridCacheAbstractFullApiSelfTest.java | 141 ++++
.../GridCacheInterceptorAbstractSelfTest.java | 172 +++-
...GridCacheDhtEvictionNearReadersSelfTest.java | 2 +-
.../multijvm/IgniteCacheProcessProxy.java | 59 +-
.../testsuites/IgniteCacheTestSuite4.java | 12 +
.../config/benchmark-multicast.properties | 7 +
.../IgniteGetEntriesPutAllTxBenchmark.java | 73 ++
.../cache/IgnitePutGetEntryBenchmark.java | 47 ++
.../cache/IgnitePutGetEntryTxBenchmark.java | 73 ++
47 files changed, 2644 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 886dca6..a791e38 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -31,10 +31,12 @@ import javax.cache.CacheException;
import javax.cache.configuration.Configuration;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheWriter;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheMode;
@@ -390,18 +392,59 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* <code>null</code> value for a key.
*/
@IgniteAsyncSupported
- <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args);
/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public V get(K key);
+ /**
+ * Gets an entry from the cache.
+ * <p>
+ * If the cache is configured to use read-through, and get would return null
+ * because the entry is missing from the cache, the Cache's {@link CacheLoader}
+ * is called in an attempt to load the entry.
+ *
+ * @param key the key whose associated value is to be returned
+ * @return the element, or null, if it does not exist.
+ * @throws IllegalStateException if the cache is {@link #isClosed()}
+ * @throws NullPointerException if the key is null
+ * @throws CacheException if there is a problem fetching the value
+ * @throws ClassCastException if the implementation is configured to perform
+ * runtime-type-checking, and the key or value types are incompatible with those that have been
+ * configured for the {@link Cache}
+ */
+ @IgniteAsyncSupported
+ public CacheEntry<K, V> getEntry(K key);
+
/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public Map<K, V> getAll(Set<? extends K> keys);
/**
+ * Gets a collection of entries from the {@link Cache}.
+ * <p>
+ * If the cache is configured read-through, and a get for a key would
+ * return null because an entry is missing from the cache, the Cache's
+ * {@link CacheLoader} is called in an attempt to load the entry. If an
+ * entry cannot be loaded for a given key, the key will not be present in
+ * the returned Collection.
+ *
+ * @param keys The keys whose associated values are to be returned.
+ * @return A collection of entries that were found for the given keys. Entries not found
+ * in the cache are not in the returned collection.
+ * @throws NullPointerException if keys is null or if keys contains a null
+ * @throws IllegalStateException if the cache is {@link #isClosed()}
+ * @throws CacheException if there is a problem fetching the values
+ * @throws ClassCastException if the implementation is configured to perform
+ * runtime-type-checking, and the key or value types are incompatible with those that have been
+ * configured for the {@link Cache}
+ */
+ @IgniteAsyncSupported
+ public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys);
+
+ /**
* Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries
* and will not lock any keys if pessimistic transaction is started by thread.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
index 1c7111a..af926c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
@@ -21,9 +21,13 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+
/**
*
*/
@@ -54,6 +58,14 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach
/** {@inheritDoc} */
public GridCacheVersion version() {
+ if (ver == GET_ENTRY_INVALID_VER_AFTER_GET) {
+ throw new IgniteException("Impossible to get entry version after " +
+ "get() inside OPTIMISTIC REPEATABLE_READ transaction. Use only getEntry() or getEntries() inside " +
+ "OPTIMISTIC REPEATABLE_READ transaction to solve this problem.");
+ }
+ else if (ver == GET_ENTRY_INVALID_VER_UPDATED)
+ throw new IgniteException("Impossible to get version for entry updated in transaction.");
+
return ver;
}
@@ -81,7 +93,7 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach
String res = "CacheEntry [key=" + getKey() +
", val=" + getValue();
- if (ver != null) {
+ if (ver != null && ver != GET_ENTRY_INVALID_VER_AFTER_GET && ver != GET_ENTRY_INVALID_VER_UPDATED) {
res += ", topVer=" + ver.topologyVersion() +
", nodeOrder=" + ver.nodeOrder() +
", order=" + ver.order() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9fd65e5..69abc54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -52,6 +52,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
@@ -607,7 +608,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*task name*/null,
/*deserialize binary*/false,
/*skip values*/true,
- /*can remap*/true
+ /*can remap*/true,
+ false
);
}
@@ -633,7 +635,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*task name*/null,
/*deserialize binary*/false,
/*skip values*/true,
- /*can remap*/true
+ /*can remap*/true,
+ false
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
@Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
Map<K, V> kvMap = fut.get();
@@ -1296,7 +1299,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
/*deserialize cache objects*/true,
/*skip values*/false,
- /*can remap*/true
+ /*can remap*/true,
+ false
).get().get(key);
}
@@ -1312,7 +1316,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
true,
false,
- /*can remap*/true
+ /*can remap*/true,
+ false
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
@Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
@@ -1332,7 +1337,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
/*deserialize cache objects*/true,
/*skip values*/false,
- /*can remap*/false
+ /*can remap*/false,
+ false
).get().get(key);
}
@@ -1352,7 +1358,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
!ctx.keepBinary(),
/*skip values*/false,
- /*can remap*/true);
+ /*can remap*/true,
+ false);
}
/**
@@ -1372,7 +1379,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
long start = statsEnabled ? System.nanoTime() : 0L;
- V val = get(key, !ctx.keepBinary());
+ V val = get(key, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
val = (V)ctx.config().getInterceptor().onGet(key, val);
@@ -1384,6 +1391,30 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Nullable @Override public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException {
+ A.notNull(key, "key");
+
+ boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
+ T2<V, GridCacheVersion> t = (T2<V, GridCacheVersion>)get(key, !ctx.keepBinary(), true);
+
+ CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()): null;
+
+ if (ctx.config().getInterceptor() != null) {
+ V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
+
+ val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null) : null;
+ }
+
+ if (statsEnabled)
+ metrics0().addGetTimeNanos(System.nanoTime() - start);
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<V> getAsync(final K key) {
A.notNull(key, "key");
@@ -1391,7 +1422,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final long start = statsEnabled ? System.nanoTime() : 0L;
- IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary());
+ IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
@@ -1407,6 +1438,42 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(final K key) {
+ A.notNull(key, "key");
+
+ final boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ final long start = statsEnabled ? System.nanoTime() : 0L;
+
+ IgniteInternalFuture<T2<V, GridCacheVersion>> fut =
+ (IgniteInternalFuture<T2<V, GridCacheVersion>>)getAsync(key, !ctx.keepBinary(), true);
+
+ final boolean intercept = ctx.config().getInterceptor() != null;
+
+ IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain(
+ new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() {
+ @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f)
+ throws IgniteCheckedException {
+ T2<V, GridCacheVersion> t = f.get();
+
+ CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null;
+ if (intercept) {
+ V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
+
+ return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null);
+ }
+ else
+ return val;
+ }
+ });
+
+ if (statsEnabled)
+ fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start));
+
+ return fr;
+ }
+
+ /** {@inheritDoc} */
@Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
A.notNull(keys, "keys");
@@ -1414,7 +1481,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
long start = statsEnabled ? System.nanoTime() : 0L;
- Map<K, V> map = getAll(keys, !ctx.keepBinary());
+ Map<K, V> map = getAll(keys, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
map = interceptGet(keys, map);
@@ -1426,6 +1493,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(@Nullable Collection<? extends K> keys)
+ throws IgniteCheckedException {
+ A.notNull(keys, "keys");
+
+ boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
+ Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll(keys, !ctx.keepBinary(), true);
+
+ Collection<CacheEntry<K, V>> res = new HashSet<>();
+
+ if (ctx.config().getInterceptor() != null)
+ res = interceptGetEntries(keys, map);
+ else
+ for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet())
+ res.add(new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2()));
+
+ if (statsEnabled)
+ metrics0().addGetTimeNanos(System.nanoTime() - start);
+
+ return res;
+ }
+
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) {
A.notNull(keys, "keys");
@@ -1433,7 +1526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final long start = statsEnabled ? System.nanoTime() : 0L;
- IgniteInternalFuture<Map<K, V>> fut = getAllAsync(keys, !ctx.keepBinary());
+ IgniteInternalFuture<Map<K, V>> fut = getAllAsync(keys, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
return fut.chain(new CX1<IgniteInternalFuture<Map<K, V>>, Map<K, V>>() {
@@ -1448,6 +1541,45 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(
+ @Nullable final Collection<? extends K> keys) {
+ A.notNull(keys, "keys");
+
+ final boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ final long start = statsEnabled ? System.nanoTime() : 0L;
+
+ IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut =
+ (IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>)
+ ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true));
+
+ final boolean intercept = ctx.config().getInterceptor() != null;
+
+ IgniteInternalFuture<Collection<CacheEntry<K, V>>> rf =
+ fut.chain(new CX1<IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>, Collection<CacheEntry<K, V>>>() {
+ @Override public Collection<CacheEntry<K, V>> applyx(
+ IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> f) throws IgniteCheckedException {
+ if (intercept)
+ return interceptGetEntries(keys, f.get());
+ else {
+ Map<K, CacheEntry<K, V>> res = U.newHashMap(f.get().size());
+
+ for (Map.Entry<K, T2<V, GridCacheVersion>> e : f.get().entrySet())
+ res.put(e.getKey(),
+ new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2()));
+
+ return res.values();
+ }
+ }
+ });
+
+ if (statsEnabled)
+ fut.listen(new UpdateGetTimeStatClosure<Map<K, T2<V, GridCacheVersion>>>(metrics0(), start));
+
+ return rf;
+ }
+
/**
* Applies cache interceptor on result of 'get' operation.
*
@@ -1490,6 +1622,53 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Applies cache interceptor on result of 'getEntries' operation.
+ *
+ * @param keys All requested keys.
+ * @param map Result map.
+ * @return Map with values returned by cache interceptor..
+ */
+ @SuppressWarnings("IfMayBeConditional")
+ private Collection<CacheEntry<K, V>> interceptGetEntries(
+ @Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) {
+ Map<K, CacheEntry<K, V>> res;
+
+ if (F.isEmpty(keys)) {
+ assert map.isEmpty();
+
+ return Collections.emptySet();
+ }
+
+ res = U.newHashMap(keys.size());
+
+ CacheInterceptor<K, V> interceptor = cacheCfg.getInterceptor();
+
+ assert interceptor != null;
+
+ for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) {
+ V val = interceptor.onGet(e.getKey(), e.getValue().get1());
+
+ if (val != null)
+ res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().get2()));
+ }
+
+ if (map.size() != keys.size()) { // Not all requested keys were in cache.
+ for (K key : keys) {
+ if (key != null) {
+ if (!map.containsKey(key)) {
+ V val = interceptor.onGet(key, null);
+
+ if (val != null)
+ res.put(key, new CacheEntryImplEx<>(key, val, null));
+ }
+ }
+ }
+ }
+
+ return res.values();
+ }
+
+ /**
* @param key Key.
* @param forcePrimary Force primary.
* @param skipTx Skip tx.
@@ -1498,6 +1677,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param deserializeBinary Deserialize binary.
* @param skipVals Skip values.
* @param canRemap Can remap flag.
+ * @param needVer Need version.
* @return Future for the get operation.
*/
protected IgniteInternalFuture<V> getAsync(
@@ -1508,7 +1688,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
return getAllAsync(Collections.singletonList(key),
forcePrimary,
@@ -1517,7 +1698,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
skipVals,
- canRemap).chain(
+ canRemap,
+ needVer).chain(
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
@Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();
@@ -1544,6 +1726,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param deserializeBinary Deserialize binary.
* @param skipVals Skip values.
* @param canRemap Can remap flag.
+ * @param needVer Need version.
* @return Future for the get operation.
* @see GridCacheAdapter#getAllAsync(Collection)
*/
@@ -1555,7 +1738,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
boolean deserializeBinary,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1570,7 +1754,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
forcePrimary,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
skipVals,
- canRemap);
+ canRemap,
+ needVer);
}
/**
@@ -1584,6 +1769,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param expiry Expiry policy.
* @param skipVals Skip values.
* @param canRemap Can remap flag.
+ * @param needVer Need version.
* @return Future for the get operation.
* @see GridCacheAdapter#getAllAsync(Collection)
*/
@@ -1596,7 +1782,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean forcePrimary,
@Nullable IgniteCacheExpiryPolicy expiry,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -1613,7 +1800,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
skipVals,
false,
canRemap,
- false);
+ needVer);
}
/**
@@ -1708,20 +1895,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.evicts().touch(entry, topVer);
}
else {
- if (needVer) {
- assert keepCacheObjects;
-
- map.put((K1)key, (V1)new T2<>(res.get1(), res.get2()));
- }
- else {
- ctx.addResult(map,
- key,
- res.get1(),
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true);
- }
+ ctx.addResult(map,
+ key,
+ res.get1(),
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ needVer ? res.get2() : null);
if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
ctx.evicts().touch(entry, topVer);
@@ -1783,20 +1964,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
// Don't put key-value pair into result map if value is null.
if (val != null) {
- if (needVer) {
- assert keepCacheObjects;
-
- map.put((K1)key, (V1)new T2<>(cacheVal, set ? verSet : ver));
- }
- else {
- ctx.addResult(map,
- key,
- cacheVal,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false);
- }
+ ctx.addResult(map,
+ key,
+ cacheVal,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ needVer ? set ? verSet : ver : null);
}
if (tx0 == null || (!tx0.implicit() &&
@@ -1889,11 +2064,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
}
else {
- assert !needVer;
-
return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
@Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
- return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough);
+ return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough, needVer);
}
}, ctx.operationContextPerCall());
}
@@ -4494,28 +4667,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param key Key.
* @param deserializeBinary Deserialize binary flag.
+ * @param needVer Need version.
* @return Cached value.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException {
+ @Nullable public V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException {
checkJta();
String taskName = ctx.kernalContext().job().currentTaskName();
- return get(key, taskName, deserializeBinary);
+ return get(key, taskName, deserializeBinary, needVer);
}
/**
* @param key Key.
* @param taskName Task name.
* @param deserializeBinary Deserialize binary flag.
+ * @param needVer Need version.
* @return Cached value.
* @throws IgniteCheckedException If failed.
*/
protected V get(
final K key,
String taskName,
- boolean deserializeBinary) throws IgniteCheckedException {
+ boolean deserializeBinary,
+ boolean needVer) throws IgniteCheckedException {
return getAsync(key,
!ctx.config().isReadFromBackup(),
/*skip tx*/false,
@@ -4523,15 +4699,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
false,
- /*can remap*/true).get();
+ /*can remap*/true,
+ needVer).get();
}
/**
* @param key Key.
* @param deserializeBinary Deserialize binary flag.
+ * @param needVer Need version.
* @return Read operation future.
*/
- public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializeBinary) {
+ public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializeBinary, final boolean needVer) {
try {
checkJta();
}
@@ -4548,28 +4726,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
false,
- /*can remap*/true);
+ /*can remap*/true,
+ needVer);
}
/**
* @param keys Keys.
* @param deserializeBinary Deserialize binary flag.
+ * @param needVer Need version.
* @return Map of cached values.
* @throws IgniteCheckedException If read failed.
*/
- public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary) throws IgniteCheckedException {
+ public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary,
+ boolean needVer) throws IgniteCheckedException {
checkJta();
- return getAllAsync(keys, deserializeBinary).get();
+ return getAllAsync(keys, deserializeBinary, needVer).get();
}
/**
* @param keys Keys.
* @param deserializeBinary Deserialize binary flag.
+ * @param needVer Need version.
* @return Read future.
*/
public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys,
- boolean deserializeBinary) {
+ boolean deserializeBinary, boolean needVer) {
String taskName = ctx.kernalContext().job().currentTaskName();
return getAllAsync(keys,
@@ -4579,7 +4761,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
deserializeBinary,
/*skip vals*/false,
- /*can remap*/true);
+ /*can remap*/true,
+ needVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index fc48b9d..e875df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -49,12 +49,12 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -1882,6 +1883,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param keepCacheObjects Keep cache objects flag.
* @param deserializeBinary Deserialize binary flag.
* @param cpy Copy flag.
+ * @param ver GridCacheVersion.
*/
@SuppressWarnings("unchecked")
public <K1, V1> void addResult(Map<K1, V1> map,
@@ -1890,7 +1892,8 @@ public class GridCacheContext<K, V> implements Externalizable {
boolean skipVals,
boolean keepCacheObjects,
boolean deserializeBinary,
- boolean cpy) {
+ boolean cpy,
+ final GridCacheVersion ver) {
assert key != null;
assert val != null || skipVals;
@@ -1902,10 +1905,32 @@ public class GridCacheContext<K, V> implements Externalizable {
assert key0 != null : key;
assert val0 != null : val;
- map.put((K1)key0, (V1)val0);
+ map.put((K1)key0, ver != null ? (V1)new T2<>(val0, ver) : (V1)val0);
}
else
- map.put((K1)key, (V1)(skipVals ? true : val));
+ map.put((K1)key,
+ (V1)(ver != null ?
+ (V1)new T2<>(skipVals ? true : val, ver) :
+ skipVals ? true : val));
+ }
+
+ /**
+ * @param map Map.
+ * @param key Key.
+ * @param val Value.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
+ * @param deserializeBinary Deserialize binary flag.
+ * @param cpy Copy flag.
+ */
+ public <K1, V1> void addResult(Map<K1, V1> map,
+ KeyCacheObject key,
+ CacheObject val,
+ boolean skipVals,
+ boolean keepCacheObjects,
+ boolean deserializeBinary,
+ boolean cpy) {
+ addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, cpy, null);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2d25d16..64cfd01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -882,7 +882,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateTtl(expiryPlc);
if (retVer) {
- resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : this.ver;
+ resVer = (isNear() && cctx.transactional()) ? ((GridNearCacheEntry)this).dhtVersion() : this.ver;
if (resVer == null)
ret = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 3a53942..9b4aff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -31,6 +31,7 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -307,6 +308,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Nullable @Override public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntry(key);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public V getTopologySafe(K key) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
@@ -331,6 +344,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(K key) {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntryAsync(key);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public V getForcePrimary(K key) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
@@ -451,6 +476,19 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(
+ @Nullable Collection<? extends K> keys) throws IgniteCheckedException {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntries(keys);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys) {
CacheOperationContext prev = gate.enter(opCtx);
@@ -463,6 +501,19 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(
+ @Nullable Collection<? extends K> keys) {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.getEntriesAsync(keys);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public V getAndPut(K key, V val)
throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9e66d4d..5ed8753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -44,6 +44,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheManager;
import org.apache.ignite.cache.CacheMetrics;
@@ -873,6 +874,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public CacheEntry<K, V> getEntry(K key) {
+ try {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getEntryAsync(key));
+
+ return null;
+ }
+ else
+ return delegate.getEntry(key);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
try {
GridCacheGateway<K, V> gate = this.gate;
@@ -898,6 +924,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) {
+ try {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getEntriesAsync(keys));
+
+ return null;
+ }
+ else
+ return delegate.getEntries(keys);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
try {
GridCacheGateway<K, V> gate = this.gate;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 433290c..68d0f06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -31,6 +31,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
@@ -335,6 +336,28 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
@Nullable public V get(K key) throws IgniteCheckedException;
/**
+ * Retrieves value mapped to the specified key from cache. Value will only be returned if
+ * its entry passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned value is guaranteed to be consistent with the filter. The return value of {@code null}
+ * means entry did not pass the provided filter or cache has no mapping for the
+ * key.
+ * <p>
+ * If the value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disable, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#load(Transaction, Object)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param key Key to retrieve the value for.
+ * @return Value for the given key.
+ * @throws IgniteCheckedException If get operation failed.
+ * @throws NullPointerException if the key is {@code null}.
+ */
+ @Nullable public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException;
+
+ /**
* Asynchronously retrieves value mapped to the specified key from cache. Value will only be returned if
* its entry passed the optional filter provided. Filter check is atomic, and therefore the
* returned value is guaranteed to be consistent with the filter. The return value of {@code null}
@@ -356,6 +379,27 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public IgniteInternalFuture<V> getAsync(K key);
/**
+ * Asynchronously retrieves value mapped to the specified key from cache. Value will only be returned if
+ * its entry passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned value is guaranteed to be consistent with the filter. The return value of {@code null}
+ * means entry did not pass the provided filter or cache has no mapping for the
+ * key.
+ * <p>
+ * If the value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disabled, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#load(Transaction, Object)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param key Key for the value to get.
+ * @return Future for the get operation.
+ * @throws NullPointerException if the key is {@code null}.
+ */
+ public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(K key);
+
+ /**
* Retrieves values mapped to the specified keys from cache. Value will only be returned if
* its entry passed the optional filter provided. Filter check is atomic, and therefore the
* returned value is guaranteed to be consistent with the filter. If requested key-value pair
@@ -377,6 +421,27 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException;
/**
+ * Retrieves values mapped to the specified keys from cache. Value will only be returned if
+ * its entry passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned value is guaranteed to be consistent with the filter. If requested key-value pair
+ * is not present in the returned map, then it means that its entry did not pass the provided
+ * filter or cache has no mapping for the key.
+ * <p>
+ * If some value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disabled, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param keys Keys to get.
+ * @return Map of key-value pairs.
+ * @throws IgniteCheckedException If get operation failed.
+ */
+ public Collection<CacheEntry<K, V>> getEntries(@Nullable Collection<? extends K> keys) throws IgniteCheckedException;
+
+ /**
* Asynchronously retrieves values mapped to the specified keys from cache. Value will only be returned if
* its entry passed the optional filter provided. Filter check is atomic, and therefore the
* returned value is guaranteed to be consistent with the filter. If requested key-value pair
@@ -397,6 +462,26 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys);
/**
+ * Asynchronously retrieves values mapped to the specified keys from cache. Value will only be returned if
+ * its entry passed the optional filter provided. Filter check is atomic, and therefore the
+ * returned value is guaranteed to be consistent with the filter. If requested key-value pair
+ * is not present in the returned map, then it means that its entry did not pass the provided
+ * filter or cache has no mapping for the key.
+ * <p>
+ * If some value is not present in cache, then it will be looked up from swap storage. If
+ * it's not present in swap, or if swap is disabled, and if read-through is allowed, value
+ * will be loaded from {@link CacheStore} persistent storage via
+ * <code>CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure)</code> method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ *
+ * @param keys Key for the value to get.
+ * @return Future for the get operation.
+ */
+ public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(@Nullable Collection<? extends K> keys);
+
+ /**
* Stores given key-value pair in cache. If filters are provided, then entries will
* be stored in cache only if they pass the filter. Note that filter check is atomic,
* so value stored in cache is guaranteed to be consistent with the filters. If cache
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 7efaf49..28c94dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -153,21 +153,6 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
}
/**
- * @param map Result map.
- * @param key Key.
- * @param val Value.
- * @param ver Version.
- */
- @SuppressWarnings("unchecked")
- protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) {
- assert needVer;
- assert skipVals || val != null;
- assert ver != null;
-
- map.put(key, new T2<>(skipVals ? true : val, ver));
- }
-
- /**
* Affinity node to send get request to.
*
* @param affNodes All affinity nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 9cf8084..5be4e72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -617,6 +617,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param keys {@inheritDoc}
* @param forcePrimary {@inheritDoc}
* @param skipTx {@inheritDoc}
+ * @param needVer Need version.
* @return {@inheritDoc}
*/
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
@@ -627,7 +628,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
String taskName,
boolean deserializeBinary,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -640,7 +642,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
forcePrimary,
null,
skipVals,
- canRemap);
+ canRemap,
+ needVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index cb8c842..c926c13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -383,7 +383,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
/*deserialize binary*/false,
skipVals,
/*keep cache objects*/true,
- /*skip store*/!readThrough);
+ /*skip store*/!readThrough,
+ false);
}
}
else {
@@ -413,7 +414,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
/*deserialize binary*/false,
skipVals,
/*keep cache objects*/true,
- /*skip store*/!readThrough);
+ /*skip store*/!readThrough,
+ false);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d8b2f37..41b28d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -946,7 +946,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (retVal ||
!F.isEmpty(e.entryProcessors()) ||
!F.isEmpty(e.filters()) ||
- e.serializableReadVersion() != null) {
+ e.entryReadVersion() != null) {
if (map == null)
map = new HashMap<>();
@@ -1013,7 +1013,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
throws IgniteCheckedException {
try {
for (IgniteTxEntry entry : entries) {
- GridCacheVersion serReadVer = entry.serializableReadVersion();
+ GridCacheVersion serReadVer = entry.entryReadVersion();
if (serReadVer != null) {
entry.cached().unswap();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 1f2d7c5..2c9a760 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -490,17 +490,14 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
cache.removeIfObsolete(key);
}
else {
- if (needVer)
- versionedResult(locVals, key, v, ver);
- else {
- cctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true);
- }
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ ver);
return true;
}
@@ -552,17 +549,14 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
for (GridCacheEntryInfo info : infos) {
assert skipVals == (info.value() == null);
- if (needVer)
- versionedResult(map, info.key(), info.value(), info.version());
- else {
- cctx.addResult(map,
- info.key(),
- info.value(),
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false);
- }
+ cctx.addResult(map,
+ info.key(),
+ info.value(),
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ needVer ? info.version() : null);
}
return map;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 0c811ae..01e61bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -625,20 +625,13 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
assert !skipVals;
if (val != null) {
- if (needVer) {
- assert ver != null;
+ if (!keepCacheObjects) {
+ Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary);
- onDone(new T2<>(val, ver));
- }
- else {
- if (!keepCacheObjects) {
- Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary);
-
- onDone(res);
- }
- else
- onDone(val);
+ onDone(needVer ? new T2<>(res, ver) : res);
}
+ else
+ onDone(needVer ? new T2<>(val, ver) : val);
}
else
onDone(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cba4e61..b806906 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -317,7 +318,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException {
+ @Override protected V get(K key, String taskName, boolean deserializeBinary, boolean needVer)
+ throws IgniteCheckedException {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (keyCheck)
@@ -339,7 +341,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiryPlc,
false,
skipStore,
- true).get();
+ true,
+ needVer).get();
}
/** {@inheritDoc} */
@@ -350,7 +353,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- final boolean canRemap) {
+ final boolean canRemap,
+ final boolean needVer) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (keyCheck)
@@ -376,7 +380,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiryPlc,
skipVals,
skipStore,
- canRemap);
+ canRemap,
+ needVer);
}
});
}
@@ -390,7 +395,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- final boolean canRemap
+ final boolean canRemap,
+ final boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -420,7 +426,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiryPlc,
skipVals,
skipStore,
- canRemap);
+ canRemap,
+ needVer);
}
});
}
@@ -1098,6 +1105,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param skipVals Skip values flag.
* @param skipStore Skip store flag.
* @param canRemap Can remap flag.
+ * @param needVer Need version.
* @return Get future.
*/
private IgniteInternalFuture<V> getAsync0(KeyCacheObject key,
@@ -1108,7 +1116,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals,
boolean skipStore,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
ctx.shared().exchange().readyAffinityVersion();
@@ -1126,7 +1135,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
skipVals,
canRemap,
- false,
+ needVer,
false);
fut.init();
@@ -1145,6 +1154,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
* @param skipStore Skip store flag.
+ * @param needVer Need version.
* @return Get future.
*/
private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys,
@@ -1155,7 +1165,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals,
boolean skipStore,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
ctx.shared().exchange().readyAffinityVersion();
@@ -1180,19 +1191,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (entry != null) {
boolean isNew = entry.isNewLocked();
- CacheObject v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ true);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+ }
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null) {
@@ -1204,7 +1238,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
success = false;
}
else
- ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true);
+ ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, ver);
}
else
success = false;
@@ -1256,7 +1290,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
skipVals,
canRemap,
- false,
+ needVer,
false);
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 073043d..dc4b6bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -200,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap) {
+ boolean canRemap,
+ final boolean needVer) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (keyCheck)
@@ -218,7 +219,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
deserializeBinary,
skipVals,
false,
- opCtx != null && opCtx.skipStore());
+ opCtx != null && opCtx.skipStore(),
+ needVer);
return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() {
@SuppressWarnings("unchecked")
@@ -258,7 +260,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
skipVals,
canRemap,
- /*needVer*/false,
+ needVer,
/*keepCacheObjects*/false);
fut.init();
@@ -275,7 +277,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -297,7 +300,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
deserializeBinary,
skipVals,
false,
- opCtx != null && opCtx.skipStore());
+ opCtx != null && opCtx.skipStore(),
+ needVer);
}
}, opCtx);
}
@@ -318,7 +322,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
deserializeBinary,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
skipVals,
- canRemap);
+ canRemap,
+ needVer);
}
/** {@inheritDoc} */
@@ -345,6 +350,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
* @param canRemap Can remap flag.
+ * @param needVer Need version.
* @return Loaded values.
*/
public IgniteInternalFuture<Map<K, V>> loadAsync(
@@ -357,7 +363,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean deserializeBinary,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
- boolean canRemap) {
+ boolean canRemap,
+ boolean needVer) {
return loadAsync(keys,
readThrough,
forcePrimary,
@@ -367,7 +374,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc,
skipVals,
canRemap,
- false,
+ needVer,
false);
}
@@ -522,17 +529,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (locVals == null)
locVals = U.newHashMap(keys.size());
- if (needVer)
- locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver));
- else {
- ctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObj,
- deserializeBinary,
- true);
- }
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObj,
+ deserializeBinary,
+ true,
+ ver);
}
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a2d5adb..63c073d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -400,7 +400,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName,
boolean deserializeBinary,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -423,7 +424,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
opCtx != null && opCtx.skipStore(),
- canRemap);
+ canRemap,
+ needVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 5bf18d9..c750be6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -230,6 +230,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @param skipVal Skip value flag.
* @param skipStore Skip store flag.
* @param canRemap Can remap flag.
+ * @param needVer Need version.
* @return Loaded values.
*/
public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable IgniteInternalTx tx,
@@ -241,7 +242,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
@Nullable ExpiryPolicy expiryPlc,
boolean skipVal,
boolean skipStore,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -261,7 +263,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
expiry,
skipVal,
canRemap,
- false,
+ needVer,
false);
// init() will register future for responses if future has remote mappings.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index c0a1617..026fb4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -350,7 +350,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
null,
false,
/*skip store*/false,
- /*can remap*/true
+ /*can remap*/true,
+ false
).get().get(keyValue(false));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9291001..06fc0a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -650,26 +650,25 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
*/
@SuppressWarnings("unchecked")
private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) {
- if (needVer) {
- V val0 = (V)new T2<>(skipVals ? true : v, ver);
+ if (keepCacheObjects) {
+ K key0 = (K)key;
+ V val0 = needVer ?
+ (V)new T2<>(skipVals ? true : v, ver) :
+ (V)(skipVals ? true : v);
- add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
else {
- if (keepCacheObjects) {
- K key0 = (K)key;
- V val0 = (V)(skipVals ? true : v);
-
- add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
- }
- else {
- K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
- V val0 = !skipVals ?
+ K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
+ V val0 = needVer ?
+ (V)new T2<>(!skipVals ?
+ (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
+ (V)Boolean.TRUE, ver) :
+ !skipVals ?
(V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
(V)Boolean.TRUE;
- add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
- }
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
}
@@ -741,16 +740,14 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
assert skipVals == (info.value() == null);
- if (needVer)
- versionedResult(map, key, val, info.version());
- else
- cctx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false);
+ cctx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ needVer ? info.version() : null);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 4f9f227..52ebfc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -107,7 +107,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (txEntry != null) {
if (entry.context().isLocal()) {
- GridCacheVersion serReadVer = txEntry.serializableReadVersion();
+ GridCacheVersion serReadVer = txEntry.entryReadVersion();
if (serReadVer != null) {
GridCacheContext ctx = entry.context();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index bae0327..b968e57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -279,6 +279,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
private void prepareSingle(IgniteTxEntry write, boolean topLocked) {
+ write.clearEntryReadVersion();
+
AffinityTopologyVersion topVer = tx.topologyVersion();
assert topVer.topologyVersion() > 0;
@@ -339,6 +341,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
for (IgniteTxEntry write : writes) {
+ write.clearEntryReadVersion();
+
GridDistributedTxMapping updated = map(write, topVer, cur, topLocked);
if (cur != updated) {