You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/12 07:13:53 UTC
[5/8] ignite git commit: ignite-4932 When possible for cache 'get'
read directly from offheap without entry creation.
ignite-4932 When possible for cache 'get' read directly from offheap without entry creation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01671827
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01671827
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01671827
Branch: refs/heads/ignite-5075-cacheStart
Commit: 01671827411ed6043e6bfb80514e3ff57fb40b18
Parents: 7ea5830
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 12 09:06:48 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 12 09:06:48 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 129 ++++--
.../cache/GridCacheConcurrentMap.java | 2 -
.../cache/GridCacheConcurrentMapImpl.java | 5 +-
.../processors/cache/GridCacheContext.java | 12 +
.../processors/cache/GridCacheEventManager.java | 32 ++
.../processors/cache/GridCacheMapEntry.java | 14 +-
.../cache/GridCacheMapEntryFactory.java | 6 +-
.../processors/cache/GridNoStorageCacheMap.java | 8 +-
.../cache/IgniteCacheOffheapManager.java | 7 +
.../cache/IgniteCacheOffheapManagerImpl.java | 17 +
.../distributed/GridDistributedCacheEntry.java | 8 +-
.../dht/GridCachePartitionedConcurrentMap.java | 10 +-
.../distributed/dht/GridDhtCacheAdapter.java | 8 +-
.../distributed/dht/GridDhtCacheEntry.java | 8 +-
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../dht/GridPartitionedGetFuture.java | 158 +++++---
.../dht/GridPartitionedSingleGetFuture.java | 141 ++++---
.../dht/atomic/GridDhtAtomicCache.java | 226 ++++++-----
.../dht/atomic/GridDhtAtomicCacheEntry.java | 11 +-
.../dht/colocated/GridDhtColocatedCache.java | 222 ++++++-----
.../colocated/GridDhtColocatedCacheEntry.java | 11 +-
.../colocated/GridDhtDetachedCacheEntry.java | 10 +-
.../distributed/near/GridNearCacheAdapter.java | 6 +-
.../distributed/near/GridNearCacheEntry.java | 8 +-
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../processors/cache/local/GridLocalCache.java | 6 +-
.../cache/local/GridLocalCacheEntry.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 188 +++++----
.../ignite/spi/discovery/tcp/ServerImpl.java | 2 +
.../cache/IgniteCacheNoSyncForGetTest.java | 395 +++++++++++++++++++
.../IgniteCacheExpiryPolicyAbstractTest.java | 2 +-
.../loadtests/hashmap/GridHashMapLoadTest.java | 4 +-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
.../cache/IgniteGetFromComputeBenchmark.java | 167 ++++++++
34 files changed, 1339 insertions(+), 506 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 0b1ab74..694f4b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -944,7 +944,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Entry (never {@code null}).
*/
public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) {
- GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, null, true, false);
+ GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, true, false);
assert e != null;
@@ -966,7 +966,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
cur = map.putEntryIfObsoleteOrAbsent(
topVer,
key,
- null,
create, touch);
}
@@ -1965,58 +1964,104 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
+ boolean readNoEntry = ctx.readNoEntry(expiry, readerArgs != null);
+
for (KeyCacheObject key : keys) {
while (true) {
- GridCacheEntryEx entry = entryEx(key);
-
- if (entry == null) {
- if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(false);
-
- break;
- }
-
try {
- EntryGetResult res;
+ EntryGetResult res = null;
boolean evt = !skipVals;
boolean updateMetrics = !skipVals;
- if (storeEnabled) {
- res = entry.innerGetAndReserveForLoad(updateMetrics,
- evt,
- subjId,
- taskName,
- expiry,
- !deserializeBinary,
- readerArgs);
+ GridCacheEntryEx entry = null;
- assert res != null;
+ boolean skipEntry = readNoEntry;
- if (res.value() == null) {
- if (misses == null)
- misses = new HashMap<>();
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(key);
- misses.put(key, res);
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime != 0) {
+ if (expireTime > U.currentTimeMillis()) {
+ res = new EntryGetWithTtlResult(row.value(),
+ row.version(),
+ false,
+ expireTime,
+ 0);
+ }
+ else
+ skipEntry = false;
+ }
+ else
+ res = new EntryGetResult(row.value(), row.version(), false);
+ }
+
+ if (res != null) {
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
- res = null;
+ if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(true);
}
+ else if (storeEnabled)
+ skipEntry = false;
}
- else {
- res = entry.innerGetVersioned(
- null,
- null,
- updateMetrics,
- evt,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary,
- readerArgs);
-
- if (res == null)
- ctx.evicts().touch(entry, topVer);
+
+ if (!skipEntry) {
+ entry = entryEx(key);
+
+ if (entry == null) {
+ if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(false);
+
+ break;
+ }
+
+ if (storeEnabled) {
+ res = entry.innerGetAndReserveForLoad(updateMetrics,
+ evt,
+ subjId,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+
+ assert res != null;
+
+ if (res.value() == null) {
+ if (misses == null)
+ misses = new HashMap<>();
+
+ misses.put(key, res);
+
+ res = null;
+ }
+ }
+ else {
+ res = entry.innerGetVersioned(
+ null,
+ null,
+ updateMetrics,
+ evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+
+ if (res == null)
+ ctx.evicts().touch(entry, topVer);
+ }
}
if (res != null) {
@@ -2029,7 +2074,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
needVer);
- if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
+ if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
ctx.evicts().touch(entry, topVer);
if (keysSize == 1)
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index 9378f74..0fe5c9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -38,7 +38,6 @@ public interface GridCacheConcurrentMap {
/**
* @param topVer Topology version.
* @param key Key.
- * @param val Value.
* @param create Create flag.
* @return Existing or new GridCacheMapEntry. Will return {@code null} if entry is obsolete or absent and create
* flag is set to {@code false}. Will also return {@code null} if create flag is set to {@code true}, but entry
@@ -47,7 +46,6 @@ public interface GridCacheConcurrentMap {
@Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent(
AffinityTopologyVersion topVer,
KeyCacheObject key,
- @Nullable CacheObject val,
boolean create,
boolean touch);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
index 76d961a..2c262df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -107,7 +107,6 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
/** {@inheritDoc} */
@Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(final AffinityTopologyVersion topVer,
KeyCacheObject key,
- @Nullable final CacheObject val,
final boolean create,
final boolean touch) {
GridCacheMapEntry cur = null;
@@ -135,7 +134,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
reserved = true;
}
- created0 = factory.create(ctx, topVer, key, key.hashCode(), val);
+ created0 = factory.create(ctx, topVer, key);
}
cur = created = created0;
@@ -158,7 +157,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
reserved = true;
}
- created0 = factory.create(ctx, topVer, key, key.hashCode(), val);
+ created0 = factory.create(ctx, topVer, key);
}
cur = created = created0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 8d562c5..a0489fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2042,6 +2042,18 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * Checks if it is possible to read data directly from memory without entry creation (this
+ * is an optimization to avoid unnecessary blocking synchronization on cache entry).
+ *
+ * @param expiryPlc Optional expiry policy for read operation.
+ * @param readers {@code True} if near cache readers need to be updated.
+ * @return {@code True} if it is possible to directly read offheap instead of using {@link GridCacheEntryEx#innerGet}.
+ */
+ public boolean readNoEntry(@Nullable IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
+ return !config().isOnheapCacheEnabled() && !readers && expiryPlc == null;
+ }
+
+ /**
* @return {@code True} if fast eviction is allowed.
*/
public boolean allowFastEviction() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index be5b539..687b132 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
@@ -62,6 +63,37 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
}
/**
+ * @param key Key for event.
+ * @param tx Possible surrounding transaction.
+ * @param val Read value.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param keepBinary Keep binary flag.
+ */
+ public void readEvent(KeyCacheObject key,
+ @Nullable IgniteInternalTx tx,
+ @Nullable CacheObject val,
+ @Nullable UUID subjId,
+ @Nullable String taskName,
+ boolean keepBinary) {
+ if (isRecordable(EVT_CACHE_OBJECT_READ)) {
+ addEvent(cctx.affinity().partition(key),
+ key,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ val,
+ val != null,
+ val,
+ val != null,
+ subjId,
+ null,
+ taskName,
+ keepBinary);
+ }
+ }
+
+ /**
* @param part Partition.
* @param key Key for the event.
* @param tx Possible surrounding transaction.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 21c58fa..edf90d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -169,14 +169,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param cctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
protected GridCacheMapEntry(
GridCacheContext<?, ?> cctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class);
@@ -186,15 +182,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert key != null;
this.key = key;
- this.hash = hash;
+ this.hash = key.hashCode();
this.cctx = cctx;
- val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
-
- synchronized (this) {
- value(val);
- }
-
ver = cctx.versions().next();
startVer = ver.order();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
index 4ee9385..d3e3921 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
@@ -27,15 +27,11 @@ public interface GridCacheMapEntryFactory {
* @param ctx Cache registry.
* @param topVer Topology version.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
* @return New cache entry.
*/
public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
index 00827ee..14a8482 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
@@ -45,10 +45,12 @@ public class GridNoStorageCacheMap implements GridCacheConcurrentMap {
}
/** {@inheritDoc} */
- @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
- @Nullable CacheObject val, boolean create, boolean touch) {
+ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer,
+ KeyCacheObject key,
+ boolean create,
+ boolean touch) {
if (create)
- return new GridDhtCacheEntry(ctx, topVer, key, key.hashCode(), val);
+ return new GridDhtCacheEntry(ctx, topVer, key);
else
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9eb5368..b476aeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -71,6 +71,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
@Nullable public CacheDataRow read(GridCacheMapEntry entry) throws IgniteCheckedException;
/**
+ * @param key Key.
+ * @return Cached row, if available, null otherwise.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException;
+
+ /**
* @param p Partition.
* @return Data store.
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 650f65e..099840a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -362,6 +362,23 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
+ @Nullable @Override public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException {
+ CacheDataRow row;
+
+ if (cctx.isLocal())
+ row = locCacheDataStore.find(key);
+ else {
+ GridDhtLocalPartition part = cctx.topology().localPartition(cctx.affinity().partition(key), null, false);
+
+ row = part != null ? dataStore(part).find(key) : null;
+ }
+
+ assert row == null || row.value() != null : row;
+
+ return row;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean containsKey(GridCacheMapEntry entry) {
try {
return read(entry) != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index f518934..e7675b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -49,16 +49,12 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
/**
* @param ctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
public GridDistributedCacheEntry(
GridCacheContext ctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index 357bf89..f021b65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -79,20 +79,22 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
}
/** {@inheritDoc} */
- @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
- @Nullable CacheObject val, boolean create, boolean touch) {
+ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer,
+ KeyCacheObject key,
+ boolean create,
+ boolean touch) {
while (true) {
GridDhtLocalPartition part = localPartition(key, topVer, create);
if (part == null)
return null;
- GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch);
+ GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, create, touch);
if (res != null || !create)
return res;
- // Otherwise parttion was concurrently evicted and should be re-created on next iteration.
+ // Otherwise partition was concurrently evicted and should be re-created on next iteration.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 7e6ae81..121c734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -246,11 +246,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridDhtCacheEntry(ctx, topVer, key, hash, val);
+ return new GridDhtCacheEntry(ctx, topVer, key);
}
};
}
@@ -428,7 +426,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @return Cache entry.
*/
protected GridDistributedCacheEntry createEntry(KeyCacheObject key) {
- return new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0);
+ return new GridDhtDetachedCacheEntry(ctx, key);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 4c22090..be7805f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -76,17 +76,13 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
* @param ctx Cache context.
* @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
public GridDhtCacheEntry(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
// Record this entry with partition.
int p = cctx.affinity().partition(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 5425954..4208a98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -900,7 +900,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
CacheDataRow row = it0.next();
GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(),
- row.key(), null, true, false);
+ row.key(),
+ true,
+ false);
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
if (rec) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 4dc4eb4..362432c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -445,75 +446,109 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
GridDhtCacheAdapter<K, V> cache = cache();
+ boolean readNoEntry = cctx.readNoEntry(expiryPlc, false);
+ boolean evt = !skipVals;
+
while (true) {
try {
- GridCacheEntryEx entry = cache.entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
+ boolean skipEntry = readNoEntry;
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (readNoEntry) {
+ CacheDataRow row = cctx.offheap().read(key);
+
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ v = row.value();
+
+ if (needVer)
+ ver = row.version();
+
+ if (evt) {
+ cctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
- null,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
+ }
- cache.context().evicts().touch(entry, topVer);
+ if (!skipEntry) {
+ GridCacheEntryEx entry = cache.entryEx(key);
+
+ // If our DHT cache does have the value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- cache.removeEntry(entry);
- }
- else {
- cctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true,
- getRes,
- ver,
- 0,
- 0,
- needVer);
-
- return true;
+ cache.context().evicts().touch(entry, topVer);
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ cache.removeEntry(entry);
+ }
}
}
+ if (v != null) {
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+
+ return true;
+ }
+
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
// Entry not found, do not continue search if topology did not change and there is no store.
@@ -604,9 +639,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
*/
private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final IgniteUuid futId = IgniteUuid.randomUuid();
/** Node ID. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index dbf1fe1..63ed9a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
@@ -288,7 +289,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
expiryPlc != null ? expiryPlc.forCreate() : -1L,
expiryPlc != null ? expiryPlc.forAccess() : -1L,
skipVals,
- /**add reader*/false,
+ /*add reader*/false,
needVer,
cctx.deploymentEnabled(),
recovery);
@@ -347,67 +348,101 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
GridDhtCacheAdapter colocated = cctx.dht();
+ boolean readNoEntry = cctx.readNoEntry(expiryPlc, false);
+ boolean evt = !skipVals;
+
while (true) {
try {
- GridCacheEntryEx entry = colocated.entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true,
- null);
-
- if (res != null) {
- v = res.value();
- ver = res.version();
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ boolean skipEntry = readNoEntry;
+
+ if (readNoEntry) {
+ CacheDataRow row = cctx.offheap().read(key);
+
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ v = row.value();
+
+ if (needVer)
+ ver = row.version();
+
+ if (evt) {
+ cctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
- null,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true);
- }
+ }
- colocated.context().evicts().touch(entry, topVer);
+ if (!skipEntry) {
+ GridCacheEntryEx entry = colocated.entryEx(key);
+
+ // If our DHT cache does have a value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true,
+ null);
+
+ if (res != null) {
+ v = res.value();
+ ver = res.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
+ }
+
+ colocated.context().evicts().touch(entry, topVer);
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeEntry(entry);
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ colocated.removeEntry(entry);
+ }
}
- else {
- if (!skipVals && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(true);
+ }
- if (!skipVals)
- setResult(v, ver);
- else
- setSkipValueResult(true, ver);
+ if (v != null) {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
- return true;
- }
+ if (!skipVals)
+ setResult(v, ver);
+ else
+ setSkipValueResult(true, ver);
+
+ return true;
}
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e477592..47572fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -196,11 +197,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val);
+ return new GridDhtAtomicCacheEntry(ctx, topVer, key);
}
};
}
@@ -1473,105 +1472,156 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
+ final boolean evt = !skipVals;
+
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!forcePrimary && ctx.affinityNode()) {
- Map<K, V> locVals = U.newHashMap(keys.size());
-
- boolean success = true;
-
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
+ try {
+ Map<K, V> locVals = U.newHashMap(keys.size());
+
+ boolean success = true;
+ boolean readNoEntry = ctx.readNoEntry(expiry, false);
+
+ // Optimistically expect that all keys are available locally (avoid creation of get future).
+ for (KeyCacheObject key : keys) {
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(key);
+
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ ctx.addResult(locVals,
+ key,
+ row.value(),
+ skipVals,
+ false,
+ deserializeBinary,
true,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(
null,
- null,
- /*read-through*/false,
- /*update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
-
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
-
- success = false;
+ row.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
else
- ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true,
- getRes, ver, 0, 0, needVer);
+ success = false;
}
else
success = false;
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
}
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = entryEx(key);
+
+ // If our DHT cache does have a value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ true,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(context().versions().next()))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, topVer);
+ }
+ }
}
- }
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- metrics0().onRead(true);
- }
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ metrics0().onRead(true);
+ }
- if (success) {
- sendTtlUpdateRequest(expiry);
+ if (success) {
+ sendTtlUpdateRequest(expiry);
- return new GridFinishedFuture<>(locVals);
+ return new GridFinishedFuture<>(locVals);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
index 3f014d5..b0c9a64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -33,17 +32,13 @@ public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry {
* @param ctx Cache context.
* @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
- public GridDhtAtomicCacheEntry(
+ GridDhtAtomicCacheEntry(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, topVer, key, hash, val);
+ super(ctx, topVer, key);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 2292cb2..12a3912 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
@@ -120,11 +121,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val);
+ return new GridDhtColocatedCacheEntry(ctx, topVer, key);
}
};
}
@@ -458,118 +457,161 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc = expiryPolicy(null);
// Optimisation: try to resolve value locally and escape 'get future' creation.
- if (!forcePrimary) {
- Map<K, V> locVals = null;
+ if (!forcePrimary && ctx.affinityNode()) {
+ try {
+ Map<K, V> locVals = null;
- boolean success = true;
+ boolean success = true;
+ boolean readNoEntry = ctx.readNoEntry(expiryPlc, false);
+ boolean evt = !skipVals;
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(
- null,
- null,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
+ for (KeyCacheObject key : keys) {
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(key);
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
+ if (row != null) {
+ long expireTime = row.expireTime();
- success = false;
- }
- else {
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
if (locVals == null)
locVals = U.newHashMap(keys.size());
ctx.addResult(locVals,
key,
- v,
+ row.value(),
skipVals,
keepCacheObj,
deserializeBinary,
true,
- getRes,
- ver,
+ null,
+ row.version(),
0,
0,
needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ success = false;
}
else
success = false;
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
}
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = entryEx(key);
+
+ // If our DHT cache does have a value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ GridCacheVersion obsoleteVer = context().versions().next();
+
+ if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ if (locVals == null)
+ locVals = U.newHashMap(keys.size());
+
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObj,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- context().evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ context().evicts().touch(entry, topVer);
+ }
+ }
}
- }
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(true);
- }
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(true);
+ }
- if (success) {
- sendTtlUpdateRequest(expiryPlc);
+ if (success) {
+ sendTtlUpdateRequest(expiryPlc);
- return new GridFinishedFuture<>(locVals);
+ return new GridFinishedFuture<>(locVals);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
index cc71e11..f7cc5a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -32,17 +31,13 @@ public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry {
* @param ctx Cache context.
* @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
- public GridDhtColocatedCacheEntry(
+ GridDhtColocatedCacheEntry(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, topVer, key, hash, val);
+ super(ctx, topVer, key);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index ac81b63..404265d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
@@ -36,14 +35,9 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
/**
* @param ctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
- * @param next Next entry in the linked list.
- * @param hdrId Header ID.
*/
- public GridDhtDetachedCacheEntry(GridCacheContext ctx, KeyCacheObject key, int hash, CacheObject val,
- GridCacheMapEntry next, int hdrId) {
- super(ctx, key, hash, val);
+ public GridDhtDetachedCacheEntry(GridCacheContext ctx, KeyCacheObject key) {
+ super(ctx, key);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 59d986a..0b25f58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -95,11 +95,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridNearCacheEntry(ctx, key, hash, val);
+ return new GridNearCacheEntry(ctx, key);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index fa098df..b17d0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -66,16 +66,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
/**
* @param ctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
public GridNearCacheEntry(
GridCacheContext ctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
part = ctx.affinity().partition(key);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f795ddc..9ad084e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3878,17 +3878,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
/**
* @param cctx Cache context.
* @param key Key.
- * @param val Value.
* @param filter Filter.
* @return {@code True} if filter passed.
*/
private boolean isAll(GridCacheContext cctx,
KeyCacheObject key,
- CacheObject val,
+ final CacheObject val0,
CacheEntryPredicate[] filter) {
- GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
+ GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key) {
@Nullable @Override public CacheObject peekVisibleValue() {
- return rawGet();
+ return val0;
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 94f618a..5e3dc3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -86,11 +86,9 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridLocalCacheEntry(ctx, key, hash, val);
+ return new GridLocalCacheEntry(ctx, key);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index 3e93917..421b32a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -38,16 +38,12 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
/**
* @param ctx Cache registry.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
GridLocalCacheEntry(
GridCacheContext ctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
}
/** {@inheritDoc} */