You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/13 10:32:11 UTC
[14/31] ignite git commit: ignite-4525 - Near reader is created when
value is loaded from store.
ignite-4525 - Near reader is created when value is loaded from store.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b54a4813
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b54a4813
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b54a4813
Branch: refs/heads/ignite-4436-2
Commit: b54a481315a45c7a6c8f70534f655e14b25cc439
Parents: d0c0bce
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Jan 18 12:05:22 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Wed Jan 18 12:05:22 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 33 +-
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheEntryEx.java | 20 +-
.../processors/cache/GridCacheMapEntry.java | 56 ++-
.../processors/cache/ReaderArguments.java | 74 +++
.../distributed/dht/GridDhtCacheAdapter.java | 9 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 85 ++--
.../distributed/dht/GridDhtGetSingleFuture.java | 75 ++-
.../dht/GridPartitionedGetFuture.java | 3 +-
.../dht/GridPartitionedSingleGetFuture.java | 3 +-
.../dht/atomic/GridDhtAtomicCache.java | 5 +-
.../dht/colocated/GridDhtColocatedCache.java | 3 +-
.../distributed/near/GridNearGetFuture.java | 6 +-
.../local/atomic/GridLocalAtomicCache.java | 5 +-
.../transactions/IgniteTxLocalAdapter.java | 35 +-
.../processors/cache/GridCacheTestEntryEx.java | 11 +-
.../near/GridNearCacheStoreUpdateTest.java | 466 +++++++++++++++++++
.../GridNearOffheapCacheStoreUpdateTest.java | 35 ++
.../testsuites/IgniteCacheTestSuite2.java | 5 +
19 files changed, 770 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fd9f396..59665bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1789,6 +1789,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
subjId = ctx.subjectIdPerCall(subjId, opCtx);
return getAllAsync(keys,
+ null,
opCtx == null || !opCtx.skipStore(),
!skipTx,
subjId,
@@ -1803,6 +1804,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param keys Keys.
+ * @param readerArgs Near cache reader will be added if not null.
* @param readThrough Read through.
* @param checkTx Check tx.
* @param subjId Subj Id.
@@ -1817,6 +1819,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @see GridCacheAdapter#getAllAsync(Collection)
*/
public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
+ @Nullable final ReaderArguments readerArgs,
boolean readThrough,
boolean checkTx,
@Nullable final UUID subjId,
@@ -1834,6 +1837,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return getAllAsync0(ctx.cacheKeysView(keys),
+ readerArgs,
readThrough,
checkTx,
subjId,
@@ -1848,6 +1852,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param keys Keys.
+ * @param readerArgs Near cache reader will be added if not null.
* @param readThrough Read-through flag.
* @param checkTx Check local transaction flag.
* @param subjId Subject ID.
@@ -1862,6 +1867,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
@Nullable final Collection<KeyCacheObject> keys,
+ @Nullable final ReaderArguments readerArgs,
final boolean readThrough,
boolean checkTx,
@Nullable final UUID subjId,
@@ -1932,7 +1938,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
subjId,
taskName,
expiry,
- !deserializeBinary);
+ !deserializeBinary,
+ readerArgs);
assert res != null;
@@ -1957,7 +1964,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
null,
taskName,
expiry,
- !deserializeBinary);
+ !deserializeBinary,
+ readerArgs);
if (res == null)
ctx.evicts().touch(entry, topVer);
@@ -2015,29 +2023,28 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheEntryEx entry = entryEx(key);
try {
- GridCacheVersion verSet = entry.versionedValue(cacheVal,
+ T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue(
+ cacheVal,
res.version(),
- null);
-
- boolean set = verSet != null;
+ null,
+ readerArgs);
if (log.isDebugEnabled())
log.debug("Set value loaded from store into entry [" +
- "set=" + set +
- ", curVer=" + res.version() +
- ", newVer=" + verSet + ", " +
+ "oldVer=" + res.version() +
+ ", newVer=" + verVal.get2() + ", " +
"entry=" + entry + ']');
// Don't put key-value pair into result map if value is null.
- if (val != null) {
+ if (verVal.get1() != null) {
ctx.addResult(map,
key,
- cacheVal,
+ verVal.get1(),
skipVals,
keepCacheObjects,
deserializeBinary,
- false,
- needVer ? set ? verSet : res.version() : null);
+ true,
+ needVer ? verVal.get2() : null);
}
if (tx0 == null || (!tx0.implicit() &&
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 66b71b4..424e325 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1900,9 +1900,9 @@ public class GridCacheContext<K, V> implements Externalizable {
assert val != null || skipVals;
if (!keepCacheObjects) {
- Object key0 = unwrapBinaryIfNeeded(key, !deserializeBinary);
+ Object key0 = unwrapBinaryIfNeeded(key, !deserializeBinary, cpy);
- Object val0 = skipVals ? true : unwrapBinaryIfNeeded(val, !deserializeBinary);
+ Object val0 = skipVals ? true : unwrapBinaryIfNeeded(val, !deserializeBinary, cpy);
assert key0 != null : key;
assert val0 != null : val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index b1d632f..51f423a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;
/**
@@ -318,11 +319,12 @@ public interface GridCacheEntryEx {
* @param taskName Task name.
* @param expiryPlc Expiry policy.
* @param keepBinary Keep binary flag.
+ * @param readerArgs Reader will be added if not null.
* @return Cached value and entry version.
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable public EntryGetResult innerGetVersioned(
+ public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
boolean readSwap,
@@ -333,7 +335,8 @@ public interface GridCacheEntryEx {
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary)
+ boolean keepBinary,
+ @Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -344,7 +347,7 @@ public interface GridCacheEntryEx {
* @param taskName Task name.
* @param expiryPlc Expiry policy.
* @param keepBinary Keep binary flag.
- * @return Cached value and entry version.
+ * @param readerArgs Reader will be added if not null.
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
* @return Cached value, entry version and flag indicating if entry was reserved.
@@ -355,7 +358,8 @@ public interface GridCacheEntryEx {
UUID subjId,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException;
+ boolean keepBinary,
+ @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* @param ver Expected entry version.
@@ -751,13 +755,15 @@ public interface GridCacheEntryEx {
* @param val New value.
* @param curVer Version to match or {@code null} if match is not required.
* @param newVer Version to set.
- * @return Non null version if value was set.
+ * @param readerArgs Reader will be added if not null.
+ * @return Current version and value.
* @throws IgniteCheckedException If index could not be updated.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- public GridCacheVersion versionedValue(CacheObject val,
+ public T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
@Nullable GridCacheVersion curVer,
- @Nullable GridCacheVersion newVer)
+ @Nullable GridCacheVersion newVer,
+ @Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9f0c2b0..59e4181 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -778,7 +778,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
expirePlc,
false,
keepBinary,
- false);
+ false,
+ null);
}
/** {@inheritDoc} */
@@ -788,7 +789,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
UUID subjId,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ boolean keepBinary,
+ @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException {
return (EntryGetResult)innerGet0(
/*ver*/null,
/*tx*/null,
@@ -803,11 +805,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
expiryPlc,
true,
keepBinary,
- /*reserve*/true);
+ /*reserve*/true,
+ readerArgs);
}
/** {@inheritDoc} */
- @Nullable @Override public EntryGetResult innerGetVersioned(
+ @Override public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
boolean readSwap,
@@ -818,7 +821,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary)
+ boolean keepBinary,
+ @Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException {
return (EntryGetResult)innerGet0(ver,
tx,
@@ -833,7 +837,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
expiryPlc,
true,
keepBinary,
- false);
+ false,
+ readerArgs);
}
/** {@inheritDoc} */
@@ -852,7 +857,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean retVer,
boolean keepBinary,
- boolean reserveForLoad
+ boolean reserveForLoad,
+ @Nullable ReaderArguments readerArgs
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert !(retVer && readThrough);
assert !(reserveForLoad && readThrough);
@@ -961,6 +967,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Cache version for optimistic check.
startVer = ver;
+ addReaderIfNeed(readerArgs);
+
if (ret != null) {
assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
assert !obsolete;
@@ -1051,6 +1059,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
deletedUnlocked(false);
+
+ assert readerArgs == null;
}
if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
@@ -3611,19 +3621,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public synchronized GridCacheVersion versionedValue(CacheObject val,
+ @Override public synchronized T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
GridCacheVersion curVer,
- GridCacheVersion newVer)
+ GridCacheVersion newVer,
+ @Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException
{
checkObsolete();
+ addReaderIfNeed(readerArgs);
+
if (curVer == null || curVer.equals(ver)) {
if (val != this.val) {
GridCacheMvcc mvcc = mvccExtras();
if (mvcc != null && !mvcc.isEmpty())
- return null;
+ return new T2<>(this.val, ver);
if (newVer == null)
newVer = cctx.versions().next();
@@ -3647,13 +3660,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Version does not change for load ops.
update(val, expTime, ttl, newVer, true);
- return newVer;
+ return new T2<>(val, newVer);
}
assert !evictionDisabled() : this;
}
- return null;
+ return new T2<>(this.val, ver);
+ }
+
+ /**
+ * @param readerArgs Arguments of the near cache reader to add to this entry; nothing is done if {@code null}.
+ */
+ private void addReaderIfNeed(@Nullable ReaderArguments readerArgs) {
+ if (readerArgs != null) {
+ assert this instanceof GridDhtCacheEntry : this;
+ assert Thread.holdsLock(this);
+
+ try {
+ ((GridDhtCacheEntry)this).addReader(readerArgs.reader(),
+ readerArgs.messageId(),
+ readerArgs.topologyVersion());
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert false : this;
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java
new file mode 100644
index 0000000..b8b5e64
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Holder of the arguments required for adding a near cache reader to a cache entry.
+ */
+public class ReaderArguments {
+ /** Near cache node ID. */
+ private final UUID reader;
+
+ /** Message ID. */
+ private final long msgId;
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /**
+ * @param reader Near cache node ID.
+ * @param msgId Message ID.
+ * @param topVer Topology version.
+ */
+ public ReaderArguments(final UUID reader, final long msgId,
+ final AffinityTopologyVersion topVer) {
+ this.reader = reader;
+ this.msgId = msgId;
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return Near cache (reader) node ID passed to the constructor.
+ */
+ public UUID reader() {
+ return reader;
+ }
+
+ /**
+ * @return Message ID passed to the constructor.
+ */
+ public long messageId() {
+ return msgId;
+ }
+
+ /**
+ * @return Topology version passed to the constructor.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReaderArguments.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index b2fb7b4..543cee1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -35,8 +35,8 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.ReaderArguments;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
@@ -623,6 +624,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
CacheOperationContext opCtx = ctx.operationContextPerCall();
return getAllAsync(keys,
+ null,
opCtx == null || !opCtx.skipStore(),
/*don't check local tx. */false,
subjId,
@@ -637,6 +639,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/**
* @param keys Keys to get
+ * @param readerArgs Reader will be added if not null.
* @param readThrough Read through flag.
* @param subjId Subject ID.
* @param taskName Task name.
@@ -647,6 +650,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
*/
IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync(
Collection<KeyCacheObject> keys,
+ @Nullable final ReaderArguments readerArgs,
boolean readThrough,
@Nullable UUID subjId,
String taskName,
@@ -655,6 +659,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
boolean canRemap
) {
return getAllAsync0(keys,
+ readerArgs,
readThrough,
/*don't check local tx. */false,
subjId,
@@ -694,7 +699,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
reader,
keys,
readThrough,
- /*tx*/null,
topVer,
subjId,
taskNameHash,
@@ -738,7 +742,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
key,
addRdr,
readThrough,
- /*tx*/null,
topVer,
subjId,
taskNameHash,
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 913580f..3bf4489 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -29,7 +29,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -37,7 +36,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.ReaderArguments;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
@@ -50,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -96,9 +94,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
/** Topology version .*/
private AffinityTopologyVersion topVer;
- /** Transaction. */
- private IgniteTxLocalEx tx;
-
/** Retries because ownership changed. */
private Collection<Integer> retries;
@@ -120,7 +115,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
* @param reader Reader.
* @param keys Keys.
* @param readThrough Read through flag.
- * @param tx Transaction.
* @param topVer Topology version.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
@@ -133,7 +127,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
UUID reader,
Map<KeyCacheObject, Boolean> keys,
boolean readThrough,
- @Nullable IgniteTxLocalEx tx,
@NotNull AffinityTopologyVersion topVer,
@Nullable UUID subjId,
int taskNameHash,
@@ -150,7 +143,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
this.msgId = msgId;
this.keys = keys;
this.readThrough = readThrough;
- this.tx = tx;
this.topVer = topVer;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -159,7 +151,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
futId = IgniteUuid.randomUuid();
- ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+ ver = cctx.versions().next();
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridDhtGetFuture.class);
@@ -340,6 +332,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
ClusterNode readerNode = cctx.discovery().node(reader);
+ ReaderArguments readerArgs = null;
+
if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
while (true) {
@@ -351,12 +345,19 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
- if (addReader)
+ if (addReader) {
e.unswap(false);
+ // Entry will be removed on touch() if there is no data in cache,
+ // but the data could be loaded from store,
+ // so we have to add the reader again later.
+ if (readerArgs == null)
+ readerArgs = new ReaderArguments(reader, msgId, topVer);
+ }
+
// Register reader. If there are active transactions for this entry,
// then will wait for their completion before proceeding.
- // TODO: GG-4003:
+ // TODO: IGNITE-3498:
// TODO: What if any transaction we wait for actually removes this entry?
// TODO: In this case seems like we will be stuck with untracked near entry.
// TODO: To fix, check that reader is contained in the list of readers once
@@ -392,28 +393,19 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
if (txFut == null || txFut.isDone()) {
- if (tx == null) {
- fut = cache().getDhtAllAsync(
- keys.keySet(),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true);
- }
- else {
- fut = tx.getAllAsync(cctx,
- null,
- keys.keySet(),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false);
- }
+ fut = cache().getDhtAllAsync(
+ keys.keySet(),
+ readerArgs,
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
}
else {
+ final ReaderArguments args = readerArgs;
+
// If we are here, then there were active transactions for some entries
// when we were adding the reader. In that case we must wait for those
// transactions to complete.
@@ -424,26 +416,15 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
if (e != null)
throw new GridClosureException(e);
- if (tx == null) {
- return cache().getDhtAllAsync(
- keys.keySet(),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true);
- }
- else {
- return tx.getAllAsync(cctx,
- null,
- keys.keySet(),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false);
- }
+ return cache().getDhtAllAsync(
+ keys.keySet(),
+ args,
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
}
}
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 9394937..49bebd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.ReaderArguments;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
@@ -90,9 +90,6 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
/** Topology version .*/
private AffinityTopologyVersion topVer;
- /** Transaction. */
- private IgniteTxLocalEx tx;
-
/** Retries because ownership changed. */
private Collection<Integer> retries;
@@ -115,7 +112,6 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
* @param key Key.
* @param addRdr Add reader flag.
* @param readThrough Read through flag.
- * @param tx Transaction.
* @param topVer Topology version.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
@@ -129,7 +125,6 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
KeyCacheObject key,
Boolean addRdr,
boolean readThrough,
- @Nullable IgniteTxLocalEx tx,
@NotNull AffinityTopologyVersion topVer,
@Nullable UUID subjId,
int taskNameHash,
@@ -145,7 +140,6 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
this.key = key;
this.addRdr = addRdr;
this.readThrough = readThrough;
- this.tx = tx;
this.topVer = topVer;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -154,7 +148,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
futId = IgniteUuid.randomUuid();
- ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+ ver = cctx.versions().next();
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class);
@@ -306,6 +300,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
ClusterNode readerNode = cctx.discovery().node(reader);
+ ReaderArguments readerArgs = null;
+
if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
while (true) {
GridDhtCacheEntry e = cache().entryExx(key, topVer);
@@ -314,14 +310,21 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
if (e.obsolete())
continue;
- boolean addReader = (!e.deleted() && addRdr && !skipVals);
+ boolean addReader = (!e.deleted() && this.addRdr && !skipVals);
- if (addReader)
+ if (addReader) {
e.unswap(false);
+ // Entry will be removed on touch() if there is no data in cache,
+ // but the data could be loaded from store,
+ // so we have to add the reader again later.
+ if (readerArgs == null)
+ readerArgs = new ReaderArguments(reader, msgId, topVer);
+ }
+
// Register reader. If there are active transactions for this entry,
// then will wait for their completion before proceeding.
- // TODO: GG-4003:
+ // TODO: IGNITE-3498:
// TODO: What if any transaction we wait for actually removes this entry?
// TODO: In this case seems like we will be stuck with untracked near entry.
// TODO: To fix, check that reader is contained in the list of readers once
@@ -348,28 +351,19 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
if (rdrFut == null || rdrFut.isDone()) {
- if (tx == null) {
- fut = cache().getDhtAllAsync(
- Collections.singleton(key),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true);
- }
- else {
- fut = tx.getAllAsync(cctx,
- null,
- Collections.singleton(key),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false);
- }
+ fut = cache().getDhtAllAsync(
+ Collections.singleton(key),
+ readerArgs,
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
}
else {
+ final ReaderArguments args = readerArgs;
+
rdrFut.listen(
new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
@@ -381,29 +375,16 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
return;
}
- IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut0;
-
- if (tx == null) {
- fut0 = cache().getDhtAllAsync(
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut0 =
+ cache().getDhtAllAsync(
Collections.singleton(key),
+ args,
readThrough,
subjId,
taskName,
expiryPlc,
skipVals,
/*can remap*/true);
- }
- else {
- fut0 = tx.getAllAsync(cctx,
- null,
- Collections.singleton(key),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false
- );
- }
fut0.listen(createGetFutureListener());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index d0f209d..c8e2cf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -461,7 +461,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
null,
taskName,
expiryPlc,
- !deserializeBinary);
+ !deserializeBinary,
+ null);
if (res != null) {
v = res.value();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index e188a35..e369bfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -385,7 +385,8 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
null,
taskName,
expiryPlc,
- true);
+ true,
+ null);
if (res != null) {
v = res.value();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 94a049e..f601e0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1505,7 +1505,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
taskName,
expiry,
- true);
+ true,
+ null);
if (res != null) {
v = res.value();
@@ -2292,7 +2293,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridCacheVersion ver = entry.version();
- entry.versionedValue(ctx.toCacheObject(v), null, ver);
+ entry.versionedValue(ctx.toCacheObject(v), null, ver, null);
}
catch (GridCacheEntryRemovedException e) {
assert false : "Entry should not get obsolete while holding lock [entry=" + entry +
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 56af95e..29f0607 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -496,7 +496,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
null,
taskName,
expiryPlc,
- !deserializeBinary);
+ !deserializeBinary,
+ null);
if (res != null) {
v = res.value();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ab0e88c..8bc513e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -449,7 +449,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
null,
taskName,
expiryPlc,
- !deserializeBinary);
+ !deserializeBinary,
+ null);
if (res != null) {
v = res.value();
@@ -589,7 +590,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
null,
taskName,
expiryPlc,
- !deserializeBinary);
+ !deserializeBinary,
+ null);
if (res != null) {
v = res.value();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index d1acada..ad818a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -59,7 +59,6 @@ import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.resource.GridResourceIoc;
-import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -528,7 +527,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
taskName,
expiry,
- !deserializeBinary);
+ !deserializeBinary,
+ null);
if (res != null) {
v = res.value();
@@ -602,6 +602,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
return getAllAsync(
keys,
+ null,
opCtx == null || !opCtx.skipStore(),
false,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 91c9c92..f05d90d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -438,7 +438,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
null,
resolveTaskName(),
expiryPlc,
- txEntry == null ? keepBinary : txEntry.keepBinary());
+ txEntry == null ? keepBinary : txEntry.keepBinary(),
+ null);
if (res == null) {
if (misses == null)
@@ -477,17 +478,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
try {
- GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null);
-
- boolean set = setVer != null;
+ T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue(cacheVal,
+ ver,
+ null,
+ null);
- if (set)
- ver = setVer;
+ if (log.isDebugEnabled()) {
+ log.debug("Set value loaded from store into entry [" +
+ "oldVer=" + ver +
+ ", newVer=" + verVal.get2() +
+ ", entry=" + entry + ']');
+ }
- if (log.isDebugEnabled())
- log.debug("Set value loaded from store into entry [set=" + set +
- ", curVer=" + ver + ", newVer=" + setVer + ", " +
- "entry=" + entry + ']');
+ ver = verVal.get2();
break;
}
@@ -1232,7 +1235,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
transformClo,
resolveTaskName(),
null,
- txEntry.keepBinary());
+ txEntry.keepBinary(),
+ null);
if (res != null) {
val = res.value();
@@ -1316,7 +1320,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
null,
resolveTaskName(),
accessPlc,
- !deserializeBinary) : null;
+ !deserializeBinary,
+ null) : null;
if (res != null) {
val = res.value();
@@ -1666,7 +1671,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
transformClo,
resolveTaskName(),
null,
- txEntry.keepBinary());
+ txEntry.keepBinary(),
+ null);
if (res != null) {
val = res.value();
@@ -2390,7 +2396,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
entryProcessor,
resolveTaskName(),
null,
- keepBinary) : null;
+ keepBinary,
+ null) : null;
if (res != null) {
old = res.value();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index b03e9c8..8db68b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -430,7 +430,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
UUID subjId,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ boolean keepBinary,
+ @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert false;
return null;
@@ -448,7 +449,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary) {
+ boolean keepBinary,
+ @Nullable ReaderArguments readerArgs) {
assert false;
return null;
@@ -684,9 +686,10 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** @inheritDoc */
- @Override public GridCacheVersion versionedValue(CacheObject val,
+ @Override public T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
GridCacheVersion curVer,
- GridCacheVersion newVer) {
+ GridCacheVersion newVer,
+ @Nullable ReaderArguments readerArgs) {
assert false;
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java
new file mode 100644
index 0000000..183b9ca
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Check that near cache is updated when entry loaded from store.
+ */
+public class GridNearCacheStoreUpdateTest extends GridCommonAbstractTest {
+ /** Name of the cache created and looked up by the tests. */
+ private static final String CACHE_NAME = "cache";
+
+ /** Grid started under the name "server" before each test. */
+ private Ignite srv;
+
+ /** Grid started under the name "client" (client mode) before each test. */
+ private Ignite client;
+
+ /** Cache created through the client grid together with a near cache configuration. */
+ private IgniteCache<String, String> cache;
+
+ /** {@inheritDoc} Grids whose name contains "client" are switched to client mode. */
+ @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.contains("client"))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} Starts one grid named "server" and one grid named "client". */
+ @Override protected void beforeTest() throws Exception {
+ srv = startGrid("server");
+ client = startGrid("client");
+ }
+
+ /** {@inheritDoc} Calls {@code stopAllGrids()} after every test. */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** Runs the near cache checks without transactions on the unmodified {@link #cacheConfiguration()}.
+ * @throws Exception If failed.
+ */
+ public void testAtomicUpdateNear() throws Exception {
+ cache = client.createCache(cacheConfiguration(), new NearCacheConfiguration<String, String>());
+
+ checkNear(null, null);
+ }
+
+ /** Runs the checks with PESSIMISTIC / REPEATABLE_READ on the unmodified {@link #cacheConfiguration()}.
+ * @throws Exception If failed.
+ */
+ public void testTransactionAtomicUpdateNear() throws Exception {
+ cache = client.createCache(cacheConfiguration(), new NearCacheConfiguration<String, String>());
+
+ checkNear(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ }
+
+ /** Runs the checks with PESSIMISTIC / REPEATABLE_READ on a TRANSACTIONAL cache.
+ * @throws Exception If failed.
+ */
+ public void testPessimisticRepeatableReadUpdateNear() throws Exception {
+ cache = client.createCache(cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL),
+ new NearCacheConfiguration<String, String>());
+
+ checkNear(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ }
+
+ /** Runs the checks with PESSIMISTIC / READ_COMMITTED on a TRANSACTIONAL cache.
+ * @throws Exception If failed.
+ */
+ public void testPessimisticReadCommittedUpdateNear() throws Exception {
+ cache = client.createCache(cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL),
+ new NearCacheConfiguration<String, String>());
+
+ checkNear(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ }
+
+ /** Runs the checks with OPTIMISTIC / SERIALIZABLE on a TRANSACTIONAL cache.
+ * @throws Exception If failed.
+ */
+ public void testOptimisticSerializableUpdateNear() throws Exception {
+ cache = client.createCache(cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL),
+ new NearCacheConfiguration<String, String>());
+
+ checkNear(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+ }
+
+ /** Runs the single-key, concurrent single-key, batch and concurrent batch checks, in that order.
+ * @param txConc Transaction concurrency, passed through to every check (may be {@code null}).
+ * @param txIsolation Transaction isolation, passed through to every check (may be {@code null}).
+ * @throws Exception If failed.
+ */
+ private void checkNear(TransactionConcurrency txConc, TransactionIsolation txIsolation) throws Exception {
+ checkNearSingle(txConc, txIsolation);
+ checkNearSingleConcurrent(txConc, txIsolation);
+ checkNearBatch(txConc, txIsolation);
+ checkNearBatchConcurrent(txConc, txIsolation);
+ }
+
+ /** Reads "key" via the client cache, updates it via the server cache, then re-reads it via the client.
+ * @param txConc Transaction concurrency; transactions are used only if this and {@code txIsolation} are non-null.
+ * @param txIsolation Transaction isolation; transactions are used only if this and {@code txConc} are non-null.
+ * @throws Exception If failed.
+ */
+ private void checkNearSingle(TransactionConcurrency txConc, TransactionIsolation txIsolation) throws Exception {
+ final String key = "key";
+
+ boolean tx = txConc != null && txIsolation != null;
+
+ final IgniteCache<String, String> clientCache = this.cache;
+ final IgniteCache<String, String> srvCache = srv.<String, String>cache(CACHE_NAME);
+
+ if (tx) {
+ doInTransaction(client, txConc, txIsolation, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ // Read from store: the test store maps "key" to "key".
+ assertEquals(key, clientCache.get(key));
+
+ return null;
+ }
+ });
+ }
+ else
+ assertEquals(key, clientCache.get(key));
+
+ final String updatedVal = "key_updated";
+
+ if (tx) {
+ doInTransaction(srv, txConc, txIsolation, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ // Update value through the server cache.
+ srvCache.put(key, updatedVal);
+
+ return null;
+ }
+ });
+ }
+ else
+ srvCache.put(key, updatedVal);
+
+ if (tx) {
+ doInTransaction(client, txConc, txIsolation, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ assertEquals(updatedVal, clientCache.get(key));
+
+ return null;
+ }
+ });
+ }
+ else
+ assertEquals(updatedVal, clientCache.get(key));
+ }
+
+ /** Ten rounds: concurrent client get and server put of one key, then both caches must agree on its value.
+ * @param txConc Transaction concurrency; only feeds the {@code tx} flag, which no live code reads.
+ * @param txIsolation Transaction isolation; only feeds the {@code tx} flag, which no live code reads.
+ * @throws Exception If failed.
+ */
+ private void checkNearSingleConcurrent(final TransactionConcurrency txConc, final TransactionIsolation txIsolation) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ final String key = String.valueOf(-((new Random().nextInt(99) + 1))); // Random key in "-99".."-1".
+
+ boolean tx = txConc != null && txIsolation != null; // Referenced only by the commented-out remove scenario.
+
+ final IgniteCache<String, String> clientCache = this.cache;
+ final IgniteCache<String, String> srvCache = srv.cache(CACHE_NAME);
+
+ final CountDownLatch storeLatch = new CountDownLatch(1);
+
+ final IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ storeLatch.await();
+
+ clientCache.get(key);
+
+ return null;
+ }
+ });
+
+
+// IgniteInternalFuture<Object> fut2 = null;
+
+ // TODO Sometimes Near cache becomes inconsistent
+// if (!tx) {
+// // TODO: IGNITE-3498
+// // TODO: Doesn't work on transactional cache.
+// fut2 = GridTestUtils.runAsync(new Callable<Object>() {
+// @Override public Object call() throws Exception {
+// storeLatch.await();
+//
+// srvCache.remove(key);
+//
+// return null;
+// }
+// });
+// }
+
+ final IgniteInternalFuture<Object> fut3 = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ storeLatch.await();
+
+ srvCache.put(key, "other");
+
+ return null;
+ }
+ });
+
+ storeLatch.countDown(); // Releases the waiting tasks at the same moment.
+
+ fut1.get();
+
+// if (!tx)
+// fut2.get();
+
+ fut3.get();
+
+ final String srvVal = srvCache.get(key);
+ final String clientVal = clientCache.get(key);
+
+ assertEquals(srvVal, clientVal);
+ }
+ }
+
+ /** Reads keys "0".."9" via client getAll, overwrites them via server putAll, then re-reads via the client.
+ * @param txConc Transaction concurrency; transactions are used only if this and {@code txIsolation} are non-null.
+ * @param txIsolation Transaction isolation; transactions are used only if this and {@code txConc} are non-null.
+ * @throws Exception If failed.
+ */
+ private void checkNearBatch(TransactionConcurrency txConc, TransactionIsolation txIsolation) throws Exception {
+ final Map<String, String> data1 = new HashMap<>();
+ final Map<String, String> data2 = new HashMap<>();
+
+ for (int i = 0; i < 10; i++) {
+ data1.put(String.valueOf(i), String.valueOf(i));
+ data2.put(String.valueOf(i), "other");
+ }
+
+ final IgniteCache<String, String> clientCache = this.cache;
+ final IgniteCache<String, String> srvCache = srv.cache(CACHE_NAME);
+
+ boolean tx = txConc != null && txIsolation != null;
+
+ // Read from store: data1 maps every key to itself, as the test store does.
+ if (tx) {
+ doInTransaction(client, txConc, txIsolation, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ assertEquals(data1, clientCache.getAll(data1.keySet()));
+
+ return null;
+ }
+ });
+ }
+ else
+ assertEquals(data1, clientCache.getAll(data1.keySet()));
+
+ // Update values through the server cache.
+ if (tx) {
+ doInTransaction(srv, txConc, txIsolation, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ srvCache.putAll(data2);
+
+ return null;
+ }
+ });
+ }
+ else
+ srvCache.putAll(data2);
+
+ if (tx) {
+ doInTransaction(client, txConc, txIsolation, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ assertEquals(data2, clientCache.getAll(data2.keySet()));
+
+ return null;
+ }
+ });
+ }
+ else
+ assertEquals(data2, clientCache.getAll(data2.keySet()));
+ }
+
+ /** Ten rounds: concurrent client getAll and server putAll of ten keys, then both caches must agree.
+ * @param txConc Transaction concurrency; only feeds the {@code tx} flag, which no live code reads.
+ * @param txIsolation Transaction isolation; only feeds the {@code tx} flag, which no live code reads.
+ * @throws Exception If failed.
+ */
+ private void checkNearBatchConcurrent(TransactionConcurrency txConc, TransactionIsolation txIsolation)
+ throws Exception {
+ final Map<String, String> data1 = new HashMap<>();
+ final Map<String, String> data2 = new HashMap<>();
+
+ for (int j = 0; j < 10; j++) {
+ data1.clear();
+ data2.clear();
+
+ for (int i = j * 10; i < j * 10 + 10; i++) { // Round j uses keys j*10 .. j*10+9.
+ data1.put(String.valueOf(i), String.valueOf(i));
+ data2.put(String.valueOf(i), "other");
+ }
+
+ final IgniteCache<String, String> clientCache = this.cache;
+ final IgniteCache<String, String> srvCache = srv.cache(CACHE_NAME);
+
+ boolean tx = txConc != null && txIsolation != null; // Referenced only by the commented-out removeAll scenario.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ clientCache.getAll(data1.keySet());
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Object> fut2 = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ srvCache.putAll(data2);
+
+ return null;
+ }
+ });
+
+// IgniteInternalFuture<Object> fut3 = null;
+//
+// // TODO Sometimes Near cache becomes inconsistent
+// if (!tx) {
+// // TODO: IGNITE-3498
+// // TODO: Doesn't work on transactional cache.
+// fut3 = GridTestUtils.runAsync(new Callable<Object>() {
+// @Override public Object call() throws Exception {
+// latch.await();
+//
+// srvCache.removeAll(data1.keySet());
+//
+// return null;
+// }
+// });
+// }
+
+ latch.countDown(); // Releases the waiting tasks at the same moment.
+
+// if (!tx)
+// fut3.get();
+
+ fut1.get();
+ fut2.get();
+
+ final Map<String, String> srvVals = srvCache.getAll(data1.keySet());
+ final Map<String, String> clientVals = clientCache.getAll(data1.keySet());
+
+ assertEquals(srvVals, clientVals);
+ }
+ }
+
+ /** Builds the configuration shared by all tests; protected so subclasses can adjust it.
+ * @return Configuration named {@link #CACHE_NAME} with the test store, read-through and FULL_SYNC writes.
+ */
+ protected CacheConfiguration<String, String> cacheConfiguration() {
+ CacheConfiguration<String, String> cfg = new CacheConfiguration<>(CACHE_NAME);
+
+ cfg.setCacheStoreFactory(new StoreFactory());
+
+ cfg.setReadThrough(true);
+ cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+ return cfg;
+ }
+
+ /**
+ * Store factory that returns a new {@link TestStore} on every {@link #create()} call.
+ */
+ private static class StoreFactory implements Factory<CacheStore<? super String, ? super String>> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public CacheStore<? super String, ? super String> create() {
+ return new TestStore();
+ }
+ }
+
+ /**
+ * Cache store backed by an in-memory map that is pre-populated in the constructor.
+ */
+ private static class TestStore extends CacheStoreAdapter<String, String> implements Serializable {
+ /** Store contents: key to value. */
+ private final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
+
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Fills the map with "i" -> "i" for every i from -100 (inclusive) to 1000 (exclusive), plus "key" -> "key".
+ */
+ public TestStore() {
+ for (int i = -100; i < 1000; i++)
+ map.put(String.valueOf(i), String.valueOf(i));
+
+ map.put("key", "key");
+ }
+
+ /** {@inheritDoc} Returns the mapped value, or {@code null} when the key is not in the map. */
+ @Override public String load(String key) throws CacheLoaderException {
+ return map.get(key);
+ }
+
+ /** {@inheritDoc} Stores the entry's key and value in the map. */
+ @Override public void write(Cache.Entry<? extends String, ? extends String> entry) throws CacheWriterException {
+ map.put(entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} Removes the key from the map. */
+ @SuppressWarnings("SuspiciousMethodCalls")
+ @Override public void delete(Object key) throws CacheWriterException {
+ map.remove(key);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java
new file mode 100644
index 0000000..ae3f695
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Runs the {@link GridNearCacheStoreUpdateTest} checks with the cache memory mode set to OFFHEAP_TIERED.
+ */
+public class GridNearOffheapCacheStoreUpdateTest extends GridNearCacheStoreUpdateTest {
+ /** {@inheritDoc} Takes the parent configuration and sets {@link CacheMemoryMode#OFFHEAP_TIERED} on it. */
+ @Override protected CacheConfiguration<String, String> cacheConfiguration() {
+ final CacheConfiguration<String, String> ccfg = super.cacheConfiguration();
+
+ ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b54a4813/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 8792ea1..af46c57 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -114,8 +114,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxTimeoutSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityClientSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheStoreUpdateTest;
import org.apache.ignite.internal.processors.cache.distributed.near.NearCacheSyncUpdateTest;
import org.apache.ignite.internal.processors.cache.distributed.near.NoneRebalanceModeSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOffheapCacheStoreUpdateTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedJobExecutionTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicBasicStoreSelfTest;
@@ -271,6 +273,9 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(OffheapCacheOnClientsTest.class));
suite.addTest(new TestSuite(CacheConcurrentReadThroughTest.class));
+ suite.addTest(new TestSuite(GridNearCacheStoreUpdateTest.class));
+ suite.addTest(new TestSuite(GridNearOffheapCacheStoreUpdateTest.class));
+
return suite;
}
}