You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 15:26:47 UTC
[40/49] ignite git commit: ignite-1607 Implemented deadlock-free
optimistic serializable tx mode
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index fe519a7..3c3527a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -221,34 +221,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
return true;
}
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked", "RedundantCast"})
- @Override public IgniteInternalFuture<Object> readThroughAllAsync(
- Collection<KeyCacheObject> keys,
- boolean reload,
- boolean skipVals,
- IgniteInternalTx tx,
- @Nullable UUID subjId,
- String taskName,
- IgniteBiInClosure<KeyCacheObject, Object> vis
- ) {
- return (IgniteInternalFuture)loadAsync(tx,
- keys,
- reload,
- /*force primary*/false,
- subjId,
- taskName,
- /*deserialize portable*/true,
- /*expiry policy*/null,
- skipVals,
- /*skip store*/false,
- /*can remap*/true);
- }
-
/**
* @param tx Transaction.
* @param keys Keys to load.
- * @param reload Reload flag.
* @param forcePrimary Force primary flag.
* @param subjId Subject ID.
* @param taskName Task name.
@@ -256,11 +231,11 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @param expiryPlc Expiry policy.
* @param skipVal Skip value flag.
* @param skipStore Skip store flag.
+ * @param canRemap Can remap flag.
* @return Loaded values.
*/
public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable IgniteInternalTx tx,
@Nullable Collection<KeyCacheObject> keys,
- boolean reload,
boolean forcePrimary,
@Nullable UUID subjId,
String taskName,
@@ -280,7 +255,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
!skipStore,
- reload,
forcePrimary,
txx,
subjId,
@@ -288,7 +262,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
deserializePortable,
expiry,
skipVal,
- canRemap);
+ canRemap,
+ false,
+ false);
// init() will register future for responses if future has remote mappings.
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 2ae03d3..d558cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
@SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"})
public class GridNearCacheEntry extends GridDistributedCacheEntry {
/** */
- private static final int NEAR_SIZE_OVERHEAD = 36;
+ private static final int NEAR_SIZE_OVERHEAD = 36 + 16;
/** Topology version at the moment when value was initialized from primary node. */
private volatile long topVer = -1L;
@@ -58,6 +58,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
/** Partition. */
private int part;
+ /** Number of active eviction reservations; eviction is disabled while this is greater than zero. */
+ private short evictReservations;
+
/**
* @param ctx Cache context.
* @param key Cache key.
@@ -316,15 +319,21 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
primaryNode(primaryNodeId, topVer);
}
- /**
- * This method should be called only when committing optimistic transactions.
- *
+ /**
* @param dhtVer DHT version to record.
+ * @return {@code False} if given version is lower than existing version.
*/
- public synchronized void recordDhtVersion(GridCacheVersion dhtVer) {
- // Version manager must be updated separately, when adding DHT version
- // to transaction entries.
- this.dhtVer = dhtVer;
+ public final boolean recordDhtVersion(GridCacheVersion dhtVer) {
+ assert dhtVer != null;
+ assert Thread.holdsLock(this);
+
+ if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) <= 0) {
+ this.dhtVer = dhtVer;
+
+ return true;
+ }
+
+ return false;
}
/** {@inheritDoc} */
@@ -332,7 +341,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
UUID subjId, String taskName) throws IgniteCheckedException {
return cctx.near().loadAsync(tx,
F.asList(key),
- reload,
/*force primary*/false,
subjId,
taskName,
@@ -350,7 +358,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* @param val New value.
* @param ver Version to use.
* @param dhtVer DHT version received from remote node.
- * @param expVer Optional version to match.
* @param ttl Time to live.
* @param expireTime Expiration time.
* @param evt Event flag.
@@ -366,14 +373,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
CacheObject val,
GridCacheVersion ver,
GridCacheVersion dhtVer,
- @Nullable GridCacheVersion expVer,
long ttl,
long expireTime,
boolean evt,
AffinityTopologyVersion topVer,
UUID subjId)
throws IgniteCheckedException, GridCacheEntryRemovedException {
- boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
+ assert dhtVer != null;
GridCacheVersion enqueueVer = null;
@@ -389,28 +395,25 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
CacheObject old = this.val;
boolean hasVal = hasValueUnlocked();
- if (isNew() || !valid || expVer == null || expVer.equals(this.dhtVer)) {
+ if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) {
primaryNode(primaryNodeId, topVer);
- // Change entry only if dht version has changed.
- if (!dhtVer.equals(dhtVersion())) {
- update(val, expireTime, ttl, ver);
+ update(val, expireTime, ttl, ver);
- if (cctx.deferredDelete() && !isInternal()) {
- boolean deleted = val == null;
+ if (cctx.deferredDelete() && !isInternal()) {
+ boolean deleted = val == null;
- if (deleted != deletedUnlocked()) {
- deletedUnlocked(deleted);
+ if (deleted != deletedUnlocked()) {
+ deletedUnlocked(deleted);
- if (deleted)
- enqueueVer = ver;
- }
+ if (deleted)
+ enqueueVer = ver;
}
+ }
- recordDhtVersion(dhtVer);
+ this.dhtVer = dhtVer;
- ret = true;
- }
+ ret = true;
}
if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
@@ -647,6 +650,32 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/**
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ */
+ public synchronized void reserveEviction() throws GridCacheEntryRemovedException {
+ checkObsolete();
+
+ evictReservations++;
+ }
+
+ /**
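+ * Releases one eviction reservation (decrements the counter incremented by {@link #reserveEviction()}).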
+ */
+ public synchronized void releaseEviction() {
+ assert evictReservations > 0 : this;
+ assert !obsolete() : this;
+
+ evictReservations--;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean evictionDisabled() {
+ assert Thread.holdsLock(this);
+
+ return evictReservations > 0;
+ }
+
+ /**
* @param nodeId Primary node ID.
* @param topVer Topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index eca2f71..ae1d43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -24,7 +24,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -38,19 +37,17 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -59,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -66,83 +64,31 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
/**
*
*/
-public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
- implements GridCacheFuture<Map<K, V>> {
+public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
- /** Default max remap count value. */
- public static final int DFLT_MAX_REMAP_CNT = 3;
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static IgniteLogger log;
- /** Maximum number of attempts to remap key to the same primary node. */
- private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
-
- /** Context. */
- private final GridCacheContext<K, V> cctx;
-
- /** Keys. */
- private Collection<KeyCacheObject> keys;
-
- /** Reload flag. */
- private boolean reload;
-
- /** Read through flag. */
- private boolean readThrough;
-
- /** Force primary flag. */
- private boolean forcePrimary;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Version. */
- private GridCacheVersion ver;
-
/** Transaction. */
private IgniteTxLocalEx tx;
- /** Trackable flag. */
- private boolean trackable;
-
- /** Remap count. */
- private AtomicInteger remapCnt = new AtomicInteger();
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name. */
- private String taskName;
-
- /** Whether to deserialize portable objects. */
- private boolean deserializePortable;
-
- /** Skip values flag. */
- private boolean skipVals;
-
- /** Expiry policy. */
- private IgniteCacheExpiryPolicy expiryPlc;
-
- /** Flag indicating that get should be done on a locked topology version. */
- private final boolean canRemap;
+ /** */
+ private GridCacheVersion ver;
/**
* @param cctx Context.
* @param keys Keys.
* @param readThrough Read through flag.
- * @param reload Reload flag.
* @param forcePrimary If {@code true} get will be performed on primary node even if
* called on backup node.
* @param tx Transaction.
@@ -151,12 +97,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
+ * @param keepCacheObjects Keep cache objects flag.
*/
public GridNearGetFuture(
GridCacheContext<K, V> cctx,
Collection<KeyCacheObject> keys,
boolean readThrough,
- boolean reload,
boolean forcePrimary,
@Nullable IgniteTxLocalEx tx,
@Nullable UUID subjId,
@@ -164,24 +112,26 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer,
+ boolean keepCacheObjects
) {
- super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+ super(cctx,
+ keys,
+ readThrough,
+ forcePrimary,
+ subjId,
+ taskName,
+ deserializePortable,
+ expiryPlc,
+ skipVals,
+ canRemap,
+ needVer,
+ keepCacheObjects);
assert !F.isEmpty(keys);
- this.cctx = cctx;
- this.keys = keys;
- this.readThrough = readThrough;
- this.reload = reload;
- this.forcePrimary = forcePrimary;
this.tx = tx;
- this.subjId = subjId;
- this.taskName = taskName;
- this.deserializePortable = deserializePortable;
- this.expiryPlc = expiryPlc;
- this.skipVals = skipVals;
- this.canRemap = canRemap;
futId = IgniteUuid.randomUuid();
@@ -318,16 +268,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(affNodes.size());
- Map<KeyCacheObject, GridCacheVersion> savedVers = null;
+ Map<KeyCacheObject, GridNearCacheEntry> savedEntries = null;
// Assign keys to primary nodes.
for (KeyCacheObject key : keys)
- savedVers = map(key, mappings, topVer, mapped, savedVers);
+ savedEntries = map(key, mappings, topVer, mapped, savedEntries);
if (isDone())
return;
- final Map<KeyCacheObject, GridCacheVersion> saved = savedVers;
+ final Map<KeyCacheObject, GridNearCacheEntry> saved = savedEntries != null ? savedEntries :
+ Collections.<KeyCacheObject, GridNearCacheEntry>emptyMap();
final int keysSize = keys.size();
@@ -346,7 +297,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
-1,
mappedKeys,
readThrough,
- reload,
topVer,
subjId,
taskName == null ? 0 : taskName.hashCode(),
@@ -405,7 +355,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
ver,
mappedKeys,
readThrough,
- reload,
topVer,
subjId,
taskName == null ? 0 : taskName.hashCode(),
@@ -434,43 +383,64 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
* @param key Key to map.
* @param topVer Topology version
* @param mapped Previously mapped.
- * @param savedVers Saved versions.
+ * @param saved Reserved near cache entries.
* @return Map.
*/
- private Map<KeyCacheObject, GridCacheVersion> map(
+ private Map<KeyCacheObject, GridNearCacheEntry> map(
KeyCacheObject key,
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
AffinityTopologyVersion topVer,
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
- Map<KeyCacheObject, GridCacheVersion> savedVers
+ Map<KeyCacheObject, GridNearCacheEntry> saved
) {
final GridNearCacheAdapter near = cache();
// Allow to get cached value from the local node.
boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
- GridCacheEntryEx entry = allowLocRead ? near.peekEx(key) : null;
-
while (true) {
+ GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null;
+
try {
CacheObject v = null;
+ GridCacheVersion ver = null;
boolean isNear = entry != null;
// First we peek into near cache.
- if (isNear)
- v = entry.innerGet(tx,
- /*swap*/false,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /*metrics*/true,
- /*events*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc);
+ if (isNear) {
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/true,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = entry.innerGet(tx,
+ /*swap*/false,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*events*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+ }
+ }
ClusterNode affNode = null;
@@ -486,18 +456,37 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (dhtEntry != null) {
boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
- v = dhtEntry.innerGet(tx,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /*update-metrics*/false,
- /*events*/!isNear && !skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc);
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!isNear && !skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = dhtEntry.innerGet(tx,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*events*/!isNear && !skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+ }
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
@@ -515,7 +504,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
- return savedVers;
+ return saved;
}
if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
@@ -534,14 +523,29 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
}
}
- if (v != null && !reload) {
- K key0 = key.value(cctx.cacheObjectContext(), true);
- V val0 = v.value(cctx.cacheObjectContext(), true);
+ if (v != null) {
+ if (needVer) {
+ V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+ add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+ }
+ else {
+ if (keepCacheObjects) {
+ K key0 = (K)key;
+ V val0 = (V)(skipVals ? true : v);
+
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+ }
+ else {
+ K key0 = key.value(cctx.cacheObjectContext(), true);
+ V val0 = v.value(cctx.cacheObjectContext(), true);
- val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
- key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
+ val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
+ key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
- add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+ }
+ }
}
else {
if (affNode == null) {
@@ -551,19 +555,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
- return savedVers;
+ return saved;
}
}
- GridNearCacheEntry nearEntry = allowLocRead ? near.peekExx(key) : null;
-
- entry = nearEntry;
-
- if (savedVers == null)
- savedVers = U.newHashMap(3);
-
- savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion());
-
LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
if (keys != null && keys.containsKey(key)) {
@@ -572,10 +567,23 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
MAX_REMAP_CNT + " attempts (key got remapped to the same node) " +
"[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']'));
- return savedVers;
+ return saved;
}
}
+ if (!cctx.affinity().localNode(key, topVer)) {
+ GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer);
+
+ nearEntry.reserveEviction();
+
+ entry = null;
+
+ if (saved == null)
+ saved = U.newHashMap(3);
+
+ saved.put(key, nearEntry);
+ }
+
// Don't add reader if transaction acquires lock anyway to avoid deadlock.
boolean addRdr = tx == null || tx.optimistic();
@@ -598,21 +606,15 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
break;
}
catch (GridCacheEntryRemovedException ignored) {
- entry = allowLocRead ? near.peekEx(key) : null;
- }
- catch (GridCacheFilterFailedException e) {
- if (log.isDebugEnabled())
- log.debug("Filter validation failed for entry: " + e);
-
- break;
+ // Retry.
}
finally {
- if (entry != null && !reload && tx == null)
+ if (entry != null && tx == null)
cctx.evicts().touch(entry, topVer);
}
}
- return savedVers;
+ return saved;
}
/**
@@ -655,7 +657,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
* @param nodeId Node id.
* @param keys Keys.
* @param infos Entry infos.
- * @param savedVers Saved versions.
+ * @param savedEntries Saved entries.
* @param topVer Topology version
* @return Result map.
*/
@@ -663,7 +665,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
UUID nodeId,
Collection<KeyCacheObject> keys,
Collection<GridCacheEntryInfo> infos,
- Map<KeyCacheObject, GridCacheVersion> savedVers,
+ Map<KeyCacheObject, GridNearCacheEntry> savedEntries,
AffinityTopologyVersion topVer
) {
boolean empty = F.isEmpty(keys);
@@ -681,9 +683,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
// Entries available locally in DHT should not be loaded into near cache for reading.
if (!cctx.affinity().localNode(info.key(), cctx.affinity().affinityTopologyVersion())) {
- GridNearCacheEntry entry = cache().entryExx(info.key(), topVer);
+ GridNearCacheEntry entry = savedEntries.get(info.key());
- GridCacheVersion saved = savedVers.get(info.key());
+ if (entry == null)
+ entry = cache().entryExx(info.key(), topVer);
// Load entry into cache.
entry.loadedValue(tx,
@@ -691,14 +694,11 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
info.value(),
atomic ? info.version() : ver,
info.version(),
- saved,
info.ttl(),
info.expireTime(),
true,
topVer,
subjId);
-
- cctx.evicts().touch(entry, topVer);
}
CacheObject val = info.value();
@@ -706,7 +706,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
assert skipVals == (info.value() == null);
- cctx.addResult(map, key, val, skipVals, false, deserializePortable, false);
+ if (needVer)
+ versionedResult(map, key, val, info.version());
+ else
+ cctx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
@@ -724,6 +733,26 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
return map;
}
+ /**
+ * @param keys Keys.
+ * @param saved Saved entries.
+ * @param topVer Topology version.
+ */
+ private void releaseEvictions(Collection<KeyCacheObject> keys,
+ Map<KeyCacheObject, GridNearCacheEntry> saved,
+ AffinityTopologyVersion topVer) {
+ for (KeyCacheObject key : keys) {
+ GridNearCacheEntry entry = saved.get(key);
+
+ if (entry != null) {
+ entry.releaseEviction();
+
+ if (tx == null)
+ cctx.evicts().touch(entry, topVer);
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@@ -763,7 +792,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
private LinkedHashMap<KeyCacheObject, Boolean> keys;
/** Saved entry versions. */
- private Map<KeyCacheObject, GridCacheVersion> savedVers;
+ private Map<KeyCacheObject, GridNearCacheEntry> savedEntries;
/** Topology version on which this future was mapped. */
private AffinityTopologyVersion topVer;
@@ -774,18 +803,18 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
/**
* @param node Node.
* @param keys Keys.
- * @param savedVers Saved entry versions.
+ * @param savedEntries Saved entries.
* @param topVer Topology version.
*/
MiniFuture(
ClusterNode node,
LinkedHashMap<KeyCacheObject, Boolean> keys,
- Map<KeyCacheObject, GridCacheVersion> savedVers,
+ Map<KeyCacheObject, GridNearCacheEntry> savedEntries,
AffinityTopologyVersion topVer
) {
this.node = node;
this.keys = keys;
- this.savedVers = savedVers;
+ this.savedEntries = savedEntries;
this.topVer = topVer;
}
@@ -821,6 +850,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
onDone(e);
}
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ releaseEvictions(keys.keySet(), savedEntries, topVer);
+
+ return true;
+ }
+ else
+ return false;
+ }
+
/**
* @param e Topology exception.
*/
@@ -915,7 +955,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
}), F.t(node, keys), topVer);
// It is critical to call onDone after adding futures to compound list.
- onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+ onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));
return;
}
@@ -935,12 +975,12 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
}), F.t(node, keys), new AffinityTopologyVersion(readyTopVer));
// It is critical to call onDone after adding futures to compound list.
- onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+ onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));
}
});
}
else
- onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+ onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index ff6375a..8482217 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -109,7 +109,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
* @param ver Version.
* @param keys Keys.
* @param readThrough Read through flag.
- * @param reload Reload flag.
* @param skipVals Skip values flag. When false, only boolean values will be returned indicating whether
* cache entry has a value.
* @param topVer Topology version.
@@ -125,7 +124,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
GridCacheVersion ver,
LinkedHashMap<KeyCacheObject, Boolean> keys,
boolean readThrough,
- boolean reload,
@NotNull AffinityTopologyVersion topVer,
UUID subjId,
int taskNameHash,
@@ -145,7 +143,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
this.keys = keys.keySet();
this.flags = keys.values();
this.readThrough = readThrough;
- this.reload = reload;
this.topVer = topVer;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
new file mode 100644
index 0000000..47c1d21
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -0,0 +1,930 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+
+/**
+ * Prepare future for near transactions running in optimistic serializable mode.
+ */
+public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
+ implements GridCacheMvccFuture<IgniteInternalTx> {
+ /** */
+ public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+ /** */
+ @GridToStringExclude
+ private KeyLockFuture keyLockFut = new KeyLockFuture();
+
+ /** */
+ @GridToStringExclude
+ private ClientRemapFuture remapFut;
+
+ /**
+ * Creates a prepare future for the given transaction.
+ *
+ * @param cctx Context.
+ * @param tx Transaction; asserted to be both optimistic and serializable.
+ */
+ public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+ super(cctx, tx);
+
+ assert tx.optimistic() && tx.serializable() : tx;
+
+ // Should wait for all mini futures completion before finishing tx.
+ ignoreChildFailures(IgniteCheckedException.class);
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reacts only to near or local cache entries that have an owner and belong to this transaction.
+     * For local cache entries the serializable read version is verified first; a mismatch is recorded
+     * as an optimistic read/write conflict. The key is then reported as locked to the key lock future.
+     */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+        if (log.isDebugEnabled())
+            log.debug("Transaction future received owner changed callback: " + entry);
+
+        boolean nearOrLoc = entry.context().isNear() || entry.context().isLocal();
+
+        if (!nearOrLoc || owner == null)
+            return false;
+
+        IgniteTxEntry txEntry = tx.entry(entry.txKey());
+
+        // Entry does not participate in this transaction.
+        if (txEntry == null)
+            return false;
+
+        // Read version is validated here for local caches only.
+        GridCacheVersion readVer = entry.context().isLocal() ? txEntry.serializableReadVersion() : null;
+
+        if (readVer != null) {
+            GridCacheContext ctx = entry.context();
+
+            for (;;) {
+                try {
+                    if (!entry.checkSerializableReadVersion(readVer)) {
+                        Object key = entry.key().value(ctx.cacheObjectContext(), false);
+
+                        IgniteTxOptimisticCheckedException conflict =
+                            new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " +
+                                "read/write conflict [key=" + key + ", cache=" + ctx.name() + ']');
+
+                        // Keep the first recorded error.
+                        err.compareAndSet(null, conflict);
+                    }
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException e) {
+                    // Entry was removed: obtain a fresh one, attach it to the tx entry and retry.
+                    entry = ctx.cache().entryEx(entry.key());
+
+                    txEntry.cached(entry);
+                }
+            }
+        }
+
+        keyLockFut.onKeyLocked(entry.txKey());
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        // Mini futures report the node they are mapped to; any other child future reports the local node.
+        IgniteClosure<IgniteInternalFuture<?>, ClusterNode> toNode =
+            new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+                @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> child) {
+                    return isMini(child) ? ((MiniFuture)child).node() : cctx.discovery().localNode();
+                }
+            };
+
+        return F.viewReadOnly(futures(), toNode);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Notifies every mini future mapped to the departed node with a topology exception that carries
+     * a retry-ready future.
+     */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        boolean notified = false;
+
+        for (IgniteInternalFuture<?> child : futures()) {
+            if (!isMini(child))
+                continue;
+
+            MiniFuture mini = (MiniFuture)child;
+
+            if (!mini.node().id().equals(nodeId))
+                continue;
+
+            ClusterTopologyCheckedException topErr =
+                new ClusterTopologyCheckedException("Remote node left grid: " + nodeId);
+
+            topErr.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+            mini.onNodeLeft(topErr);
+
+            notified = true;
+        }
+
+        return notified;
+    }
+
+    /**
+     * Records a prepare error.
+     * <p>
+     * A topology-related error on a one-phase-commit transaction does not fail the prepare: the
+     * transaction is marked for backup check and the future is completed instead.
+     *
+     * @param m Failed mapping.
+     * @param e Error.
+     */
+    private void onError(@Nullable GridDistributedTxMapping m, Throwable e) {
+        boolean topErr = X.hasCause(e, ClusterTopologyCheckedException.class) ||
+            X.hasCause(e, ClusterTopologyException.class);
+
+        if (topErr && tx.onePhaseCommit()) {
+            tx.markForBackupCheck();
+
+            onComplete();
+
+            return;
+        }
+
+        // On optimistic conflict the mapping of the failed node is dropped from the transaction.
+        if (m != null && e instanceof IgniteTxOptimisticCheckedException)
+            tx.removeMapping(m.node().id());
+
+        // Only the first error is kept.
+        err.compareAndSet(null, e);
+
+        keyLockFut.onDone(e);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Dispatches the response to the pending mini future whose ID matches the response mini ID.
+     */
+    @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+        if (isDone())
+            return;
+
+        for (IgniteInternalFuture<GridNearTxPrepareResponse> pendingFut : pending()) {
+            if (!isMini(pendingFut))
+                continue;
+
+            MiniFuture mini = (MiniFuture)pendingFut;
+
+            if (mini.futureId().equals(res.miniId())) {
+                assert mini.node().id().equals(nodeId);
+
+                mini.onResult(res);
+            }
+        }
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A non-null error is recorded (the first recorded error is kept) and also completes the key lock
+ * future. The future itself is then finished through {@code onComplete()}, which passes the
+ * transaction of this future and the recorded error to the parent implementation.
+ */
+ @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
+ if (isDone())
+ return false;
+
+ if (err != null) {
+ this.err.compareAndSet(null, err);
+
+ keyLockFut.onDone(err);
+ }
+
+ return onComplete();
+ }
+
+    /**
+     * Checks whether the given future is exactly a {@link MiniFuture} (subclasses do not count).
+     *
+     * @param f Future.
+     * @return {@code True} if mini-future.
+     */
+    private boolean isMini(IgniteInternalFuture<?> f) {
+        return MiniFuture.class.equals(f.getClass());
+    }
+
+    /**
+     * Completeness callback. Moves the transaction to {@code PREPARED} when there is no recorded
+     * error (or a backup check is pending) and finishes this future with the recorded error, if any.
+     *
+     * @return {@code True} if future was finished by this call.
+     */
+    private boolean onComplete() {
+        Throwable failure = err.get();
+
+        if (failure == null || tx.needCheckBackup())
+            tx.state(PREPARED);
+
+        // Somebody else has already finished this future.
+        if (!super.onDone(tx, failure))
+            return false;
+
+        if (failure != null)
+            tx.setRollbackOnly();
+
+        // Don't forget to clean up.
+        cctx.mvcc().removeFuture(this);
+
+        return true;
+    }
+
+    /**
+     * Initializes future: verifies that the transaction is in (or can be moved to) the
+     * {@code PREPARING} state and starts mapping of its read and write entries.
+     *
+     * @param remap Remap flag.
+     * @param topLocked Topology locked flag.
+     */
+    @Override protected void prepare0(boolean remap, boolean topLocked) {
+        try {
+            // On remap the tx must already be PREPARING, otherwise the state transition is attempted now.
+            boolean preparing = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+
+            if (preparing) {
+                prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
+
+                markInitialized();
+
+                return;
+            }
+
+            Throwable stateErr;
+
+            if (!tx.setRollbackOnly())
+                stateErr = new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+                    "prepare [state=" + tx.state() + ", tx=" + this + ']');
+            else if (tx.timedOut())
+                stateErr = new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+                    "was rolled back: " + this);
+            else
+                stateErr = new IgniteCheckedException("Invalid transaction state for prepare " +
+                    "[state=" + tx.state() + ", tx=" + this + ']');
+
+            onError(null, stateErr);
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
+        }
+    }
+
+ /**
+ * Maps all write and read entries of the transaction to their primary nodes, creates a mini future
+ * per resulting mapping and issues a prepare request for each of them.
+ *
+ * @param reads Read entries.
+ * @param writes Write entries.
+ * @param remap Remap flag.
+ * @param topLocked Topology locked flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void prepare(
+ Iterable<IgniteTxEntry> reads,
+ Iterable<IgniteTxEntry> writes,
+ boolean remap,
+ boolean topLocked
+ ) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ assert topVer.topologyVersion() > 0;
+
+ txMapping = new GridDhtTxMapping();
+
+ // Fail right away if any active cache of the transaction has no affinity nodes on this topology version.
+ if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " +
+ "partition nodes left the grid): " + cacheCtx.name()));
+
+ return;
+ }
+ }
+ }
+
+ // Mappings are keyed by (primary node, near flag).
+ Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+
+ for (IgniteTxEntry write : writes)
+ map(write, topVer, mappings, remap, topLocked);
+
+ for (IgniteTxEntry read : reads)
+ map(read, topVer, mappings, remap, topLocked);
+
+ keyLockFut.onAllKeysAdded();
+
+ // The key lock future is registered as a child only on the initial mapping, not on remap.
+ if (!remap)
+ add(keyLockFut);
+
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Abandoning (re)map because future is done: " + this);
+
+ return;
+ }
+
+ tx.addEntryMapping(mappings.values());
+
+ cctx.mvcc().recheckPendingLocks();
+
+ tx.transactionNodes(txMapping.transactionNodes());
+
+ checkOnePhase();
+
+ for (GridDistributedTxMapping m : mappings.values()) {
+ assert !m.empty();
+
+ MiniFuture fut = new MiniFuture(m);
+
+ add(fut);
+ }
+
+ Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
+
+ Iterator<IgniteInternalFuture<?>> it = futs.iterator();
+
+ // Prepare mini futures one by one; after the first failure every remaining (non-skipped) mini future
+ // gets its mapping removed from the tx and is failed with a wrapping exception, then iteration stops.
+ while (it.hasNext()) {
+ IgniteInternalFuture<?> fut0 = it.next();
+
+ if (skipFuture(remap, fut0))
+ continue;
+
+ MiniFuture fut = (MiniFuture)fut0;
+
+ IgniteCheckedException err = prepare(fut);
+
+ if (err != null) {
+ while (it.hasNext()) {
+ fut0 = it.next();
+
+ if (skipFuture(remap, fut0))
+ continue;
+
+ fut = (MiniFuture)fut0;
+
+ tx.removeMapping(fut.mapping().node().id());
+
+ fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err));
+ }
+
+ break;
+ }
+ }
+
+ markInitialized();
+ }
+
+    /**
+     * Tells whether a child future must be skipped when sending prepare requests: anything that is
+     * not a mini future is always skipped, and on remap so are mini futures that already got a result.
+     *
+     * @param remap Remap flag.
+     * @param fut Future.
+     * @return {@code True} if skip future during remap.
+     */
+    private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) {
+        if (!isMini(fut))
+            return true;
+
+        return remap && ((MiniFuture)fut).rcvRes.get();
+    }
+
+ /**
+ * Builds a prepare request for the mapping of the given mini future and either processes it through the
+ * local transaction handler (when the mapped node is local) or sends it to the mapped node.
+ * Every failure is also reported to the mini future before it is returned.
+ *
+ * @param fut Mini future.
+ * @return Prepare error if any.
+ */
+ @Nullable private IgniteCheckedException prepare(final MiniFuture fut) {
+ GridDistributedTxMapping m = fut.mapping();
+
+ final ClusterNode n = m.node();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ m.reads(),
+ m.writes(),
+ m.near(),
+ txMapping.transactionNodes(),
+ m.last(),
+ m.lastBackups(),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ m.clientFirst(),
+ tx.activeCachesDeploymentEnabled());
+
+ // TRANSFORM writes are registered in the request with a null DHT version.
+ for (IgniteTxEntry txEntry : m.writes()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ // Must lock near entries separately.
+ if (m.near()) {
+ try {
+ tx.optimisticLockEntries(F.concat(false, m.writes(), m.reads()));
+
+ tx.userPrepare();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+
+ return e;
+ }
+ }
+
+ // Responses are matched back to the mini future by this ID.
+ req.miniId(fut.futureId());
+
+ // If this is the primary node for the keys.
+ if (n.isLocal()) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+ }
+ }
+ });
+ }
+ else {
+ try {
+ cctx.io().send(n, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ // Target node is gone: attach a retry-ready future and report it as a node-left event.
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+ fut.onNodeLeft(e);
+
+ return e;
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+
+ return e;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Maps a transaction entry to its primary node on the given topology version and adds it to the
+ * mapping kept for that node. Completes this future with an error (and maps nothing) if the primary
+ * node version is older than {@code SER_TX_SINCE}.
+ *
+ * @param entry Transaction entry.
+ * @param topVer Topology version.
+ * @param curMapping Current mapping.
+ * @param remap Remap flag.
+ * @param topLocked Topology locked flag.
+ */
+ private void map(
+ IgniteTxEntry entry,
+ AffinityTopologyVersion topVer,
+ Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+ boolean remap,
+ boolean topLocked
+ ) {
+ GridCacheContext cacheCtx = entry.context();
+
+ List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+
+ txMapping.addMapping(nodes);
+
+ // The first affinity node is treated as the primary.
+ ClusterNode primary = F.first(nodes);
+
+ assert primary != null;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Mapped key to primary node [key=" + entry.key() +
+ ", part=" + cacheCtx.affinity().partition(entry.key()) +
+ ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
+ }
+
+ if (primary.version().compareTo(SER_TX_SINCE) < 0) {
+ onDone(new IgniteCheckedException("Optimistic serializable transactions can be used only with node " +
+ "version starting from " + SER_TX_SINCE));
+
+ return;
+ }
+
+ // Must re-initialize cached entry while holding topology lock.
+ if (cacheCtx.isNear())
+ entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
+ else if (!cacheCtx.isLocal())
+ entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
+ else
+ entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
+
+ // On the initial mapping, near and local cache keys without an explicit version are tracked by the key lock future.
+ if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) {
+ if (entry.explicitVersion() == null)
+ keyLockFut.addLockKey(entry.txKey());
+ }
+
+ // Near and non-near entries for the same primary node go into separate mappings.
+ IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
+
+ GridDistributedTxMapping cur = curMapping.get(key);
+
+ if (cur == null) {
+ cur = new GridDistributedTxMapping(primary);
+
+ curMapping.put(key, cur);
+
+ if (primary.isLocal()) {
+ if (entry.context().isNear())
+ tx.nearLocallyMapped(true);
+ else if (entry.context().isColocated())
+ tx.colocatedLocallyMapped(true);
+ }
+
+ // Initialize near flag right away.
+ cur.near(cacheCtx.isNear());
+
+ cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
+
+ cur.last(true);
+ }
+
+ cur.add(entry);
+
+ if (entry.explicitVersion() != null) {
+ tx.markExplicit(primary.id());
+
+ cur.markExplicitLock();
+ }
+
+ entry.nodeId(primary.id());
+
+ // For near caches record the primary node ID on the near entry, retrying with a fresh entry
+ // whenever the cached one turns out to be removed.
+ if (cacheCtx.isNear()) {
+ while (true) {
+ try {
+ GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
+
+ cached.dhtNodeId(tx.xidVersion(), primary.id());
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ entry.cached(cacheCtx.near().entryEx(entry.key()));
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ Collection<String> futs = F.viewReadOnly(futures(),
+ new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ return "[node=" + ((MiniFuture)f).node().id() +
+ ", loc=" + ((MiniFuture)f).node().isLocal() +
+ ", done=" + f.isDone() + "]";
+ }
+ },
+ new P1<IgniteInternalFuture<?>>() {
+ @Override public boolean apply(IgniteInternalFuture<?> f) {
+ return isMini(f);
+ }
+ });
+
+ return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this,
+ "innerFuts", futs,
+ "keyLockFut", keyLockFut,
+ "tx", tx,
+ "super", super.toString());
+ }
+
+ /**
+ * Compound future over prepare responses that reduces to {@code true} only if every collected
+ * response carries a client remap version.
+ */
+ private class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
+ /** Reduced result; cleared as soon as a response without a client remap version is collected. */
+ private boolean remap = true;
+
+ /**
+ * Installs the reducer that computes the remap decision.
+ */
+ public ClientRemapFuture() {
+ super();
+
+ reducer(new IgniteReducer<GridNearTxPrepareResponse, Boolean>() {
+ @Override public boolean collect(GridNearTxPrepareResponse res) {
+ assert res != null;
+
+ // A single response without remap version cancels the remap.
+ if (res.clientRemapVersion() == null)
+ remap = false;
+
+ return true;
+ }
+
+ @Override public Boolean reduce() {
+ return remap;
+ }
+ });
+ }
+ }
+
+ /**
+ *
+ */
+ private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Keys. */
+ @GridToStringInclude
+ private GridDistributedTxMapping m;
+
+ /** Flag to signal some result being processed. */
+ private AtomicBoolean rcvRes = new AtomicBoolean(false);
+
+ /**
+ * Creates a mini future for the given mapping.
+ *
+ * @param m Mapping.
+ */
+ MiniFuture(GridDistributedTxMapping m) {
+ this.m = m;
+ }
+
+ /**
+ * @return Future ID (randomly generated per mini future).
+ */
+ IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Node of the mapping this mini future was created for.
+ */
+ public ClusterNode node() {
+ return m.node();
+ }
+
+ /**
+ * @return Mapping this mini future was created for.
+ */
+ public GridDistributedTxMapping mapping() {
+ return m;
+ }
+
+        /**
+         * Fails this mini future with the given error. Only the first result delivered to the mini
+         * future is processed; a later error is just logged as a warning.
+         *
+         * @param e Error.
+         */
+        void onResult(Throwable e) {
+            if (!rcvRes.compareAndSet(false, true)) {
+                U.warn(log, "Received error after another result has been processed [fut=" +
+                    GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e);
+
+                return;
+            }
+
+            onError(m, e);
+
+            if (log.isDebugEnabled())
+                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+            // Fail.
+            onDone(e);
+        }
+
+        /**
+         * Fails this mini future because its node left the grid. Does nothing if the mini future is
+         * already done or another result has been processed.
+         *
+         * @param e Node failure.
+         */
+        void onNodeLeft(ClusterTopologyCheckedException e) {
+            if (isDone() || !rcvRes.compareAndSet(false, true))
+                return;
+
+            if (log.isDebugEnabled())
+                log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
+
+            // No mapping is passed, so no mapping removal is attempted for this error.
+            onError(null, e);
+
+            onDone(e);
+        }
+
+ /**
+ * Handles a prepare response for this mini future. Only the first result delivered to the mini
+ * future is processed. A response with an error fails the mini future, a response carrying a client
+ * remap version starts (or joins) the client remap procedure, and any other response is passed to
+ * {@code onPrepareResponse} before the mini future is completed.
+ *
+ * @param res Result callback.
+ */
+ @SuppressWarnings("unchecked")
+ void onResult(final GridNearTxPrepareResponse res) {
+ if (isDone())
+ return;
+
+ if (rcvRes.compareAndSet(false, true)) {
+ if (res.error() != null) {
+ // Fail the whole compound future.
+ onError(m, res.error());
+
+ onDone(res.error());
+ }
+ else {
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
+ assert m.clientFirst();
+
+ tx.removeMapping(m.node().id());
+
+ ClientRemapFuture remapFut0 = null;
+
+ // Only the mini future that actually creates the shared remap future gets a non-null remapFut0.
+ synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
+ if (remapFut == null) {
+ remapFut = new ClientRemapFuture();
+
+ remapFut0 = remapFut;
+ }
+ }
+
+ if (remapFut0 != null) {
+ Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
+
+ // The remap decision waits for all other mini futures.
+ for (IgniteInternalFuture<?> fut : futs) {
+ if (isMini(fut) && fut != this)
+ remapFut0.add((MiniFuture)fut);
+ }
+
+ remapFut0.markInitialized();
+
+ remapFut0.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> remapFut0) {
+ try {
+ IgniteInternalFuture<?> affFut =
+ cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+
+ if (affFut == null)
+ affFut = new GridFinishedFuture<Object>();
+
+ // NOTE(review): this reads the outer remapFut field without synchronization rather than the
+ // remapFut0 listener argument; the assert below expects both to be the same instance - confirm.
+ if (remapFut.get()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Will remap client tx [" +
+ "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this +
+ ", topVer=" + res.topologyVersion() + ']');
+ }
+
+ synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
+ assert remapFut0 == remapFut;
+
+ remapFut = null;
+ }
+
+ // Remap once affinity for the remap version is ready.
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> affFut) {
+ try {
+ affFut.get();
+
+ remap(res);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ // Not every response requested a remap: fail with a topology error that carries the affinity future for retry.
+ ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+ "Cluster topology changed while client transaction is preparing.");
+
+ err.retryReadyFuture(affFut);
+
+ onDone(err);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Prepare failed, will not remap tx: " +
+ GridNearOptimisticSerializableTxPrepareFuture.this);
+ }
+
+ onDone(e);
+ }
+ }
+ });
+ }
+ else
+ // A remap future already exists: just complete this mini future with the response.
+ onDone(res);
+ }
+ else {
+ onPrepareResponse(m, res);
+
+ // Finish this mini future (need result only on client node).
+ onDone(cctx.kernalContext().clientNode() ? res : null);
+ }
+ }
+ }
+ }
+
+        /**
+         * Restarts prepare in remap mode and completes this mini future with the given response from
+         * the callback handed to {@code prepareOnTopology}.
+         *
+         * @param res Response.
+         */
+        private void remap(final GridNearTxPrepareResponse res) {
+            Runnable completeMini = new Runnable() {
+                @Override public void run() {
+                    onDone(res);
+                }
+            };
+
+            prepareOnTopology(true, completeMini);
+        }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ // Completion, cancellation and error state are passed as additional named values.
+ return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ }
+ }
+
+    /**
+     * Keys lock future. Completes once all keys have been registered and none of them is still
+     * waiting for its lock.
+     */
+    private class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+        /** Keys which are not locked yet. */
+        @GridToStringInclude
+        private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+        /** Set once every key to wait for has been registered. */
+        private volatile boolean allKeysAdded;
+
+        /**
+         * Registers a key whose lock has to be awaited.
+         *
+         * @param key Key to track for locking.
+         */
+        private void addLockKey(IgniteTxKey key) {
+            assert !allKeysAdded;
+
+            lockKeys.add(key);
+        }
+
+        /**
+         * Stops tracking the key and re-checks whether the future can be completed.
+         *
+         * @param key Locked keys.
+         */
+        private void onKeyLocked(IgniteTxKey key) {
+            lockKeys.remove(key);
+
+            checkLocks();
+        }
+
+        /**
+         * Moves future to the ready state.
+         */
+        private void onAllKeysAdded() {
+            allKeysAdded = true;
+
+            checkLocks();
+        }
+
+        /**
+         * Completes the future when no key is pending and all keys have been registered.
+         *
+         * @return {@code True} if all locks are owned.
+         */
+        private boolean checkLocks() {
+            boolean noPendingKeys = lockKeys.isEmpty();
+
+            if (!noPendingKeys || !allKeysAdded) {
+                if (log.isDebugEnabled())
+                    log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
+
+                return noPendingKeys;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("All locks are acquired for near prepare future: " + this);
+
+            onDone((GridNearTxPrepareResponse)null);
+
+            return noPendingKeys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(KeyLockFuture.class, this, super.toString());
+        }
+    }
+}