You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/10/28 13:07:09 UTC
[28/31] ignite git commit: ignite-1607 Implemented deadlock-free
optimistic serializable tx mode
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 0834e88..fcbf58d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -574,19 +574,42 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// Nullify explicit version so that innerSet/innerRemove will work as usual.
explicitVer = null;
+ GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
+
if (op == CREATE || op == UPDATE) {
// Invalidate only for near nodes (backups cannot be invalidated).
if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
- cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
- topVer, null, replicate ? DR_BACKUP : DR_NONE,
+ cached.innerRemove(this,
+ eventNodeId(),
+ nodeId,
+ false,
+ false,
+ true,
+ true,
+ topVer,
+ null,
+ replicate ? DR_BACKUP : DR_NONE,
near() ? null : explicitVer, CU.subjectId(this, cctx),
- resolveTaskName());
+ resolveTaskName(),
+ dhtVer);
else {
- cached.innerSet(this, eventNodeId(), nodeId, val, false, false,
- txEntry.ttl(), true, true, topVer, null,
- replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
- near() ? null : explicitVer, CU.subjectId(this, cctx),
- resolveTaskName());
+ cached.innerSet(this,
+ eventNodeId(),
+ nodeId,
+ val,
+ false,
+ false,
+ txEntry.ttl(),
+ true,
+ true,
+ topVer,
+ null,
+ replicate ? DR_BACKUP : DR_NONE,
+ txEntry.conflictExpireTime(),
+ near() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ dhtVer);
// Keep near entry up to date.
if (nearCached != null) {
@@ -602,9 +625,20 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
else if (op == DELETE) {
- cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
- topVer, null, replicate ? DR_BACKUP : DR_NONE,
- near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName());
+ cached.innerRemove(this,
+ eventNodeId(),
+ nodeId,
+ false,
+ false,
+ true,
+ true,
+ topVer,
+ null,
+ replicate ? DR_BACKUP : DR_NONE,
+ near() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ dhtVer);
// Keep near entry up to date.
if (nearCached != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
new file mode 100644
index 0000000..721ba4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
+/**
+ *
+ */
+public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+ implements GridCacheFuture<Map<K, V>> {
+ /** Default max remap count value. */
+ public static final int DFLT_MAX_REMAP_CNT = 3;
+
+ /** Maximum number of attempts to remap key to the same primary node. */
+ protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+
+ /** Context. */
+ protected final GridCacheContext<K, V> cctx;
+
+ /** Keys. */
+ protected Collection<KeyCacheObject> keys;
+
+ /** Read through flag. */
+ protected boolean readThrough;
+
+ /** Force primary flag. */
+ protected boolean forcePrimary;
+
+ /** Future ID. */
+ protected IgniteUuid futId;
+
+ /** Trackable flag. */
+ protected boolean trackable;
+
+ /** Remap count. */
+ protected AtomicInteger remapCnt = new AtomicInteger();
+
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name. */
+ protected String taskName;
+
+ /** Whether to deserialize portable objects. */
+ protected boolean deserializePortable;
+
+ /** Skip values flag. */
+ protected boolean skipVals;
+
+ /** Expiry policy. */
+ protected IgniteCacheExpiryPolicy expiryPlc;
+
+ /** Flag indicating that get should be done on a locked topology version. */
+ protected final boolean canRemap;
+
+ /** */
+ protected final boolean needVer;
+
+ /** */
+ protected final boolean keepCacheObjects;
+
+ /**
+ * @param cctx Context.
+ * @param keys Keys.
+ * @param readThrough Read through flag.
+ * @param forcePrimary If {@code true} then will force network trip to primary node even
+ * if called on backup node.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
+ * @param keepCacheObjects Keep cache objects flag.
+ */
+ protected CacheDistributedGetFutureAdapter(
+ GridCacheContext<K, V> cctx,
+ Collection<KeyCacheObject> keys,
+ boolean readThrough,
+ boolean forcePrimary,
+ @Nullable UUID subjId,
+ String taskName,
+ boolean deserializePortable,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean skipVals,
+ boolean canRemap,
+ boolean needVer,
+ boolean keepCacheObjects
+ ) {
+ super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+
+ assert !F.isEmpty(keys);
+
+ this.cctx = cctx;
+ this.keys = keys;
+ this.readThrough = readThrough;
+ this.forcePrimary = forcePrimary;
+ this.subjId = subjId;
+ this.taskName = taskName;
+ this.deserializePortable = deserializePortable;
+ this.expiryPlc = expiryPlc;
+ this.skipVals = skipVals;
+ this.canRemap = canRemap;
+ this.needVer = needVer;
+ this.keepCacheObjects = keepCacheObjects;
+
+ futId = IgniteUuid.randomUuid();
+ }
+
+ /**
+ * @param map Result map.
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ */
+ @SuppressWarnings("unchecked")
+ protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) {
+ assert needVer;
+ assert skipVals || val != null;
+ assert ver != null;
+
+ map.put(key, new T2<>(skipVals ? true : val, ver));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 9d02705..bdd1140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CI3;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -562,7 +563,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/**
* This method is used internally. Use
- * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean)}
+ * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean)}
* method instead to retrieve DHT value.
*
* @param keys {@inheritDoc}
@@ -574,7 +575,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
@Nullable Collection<? extends K> keys,
boolean forcePrimary,
boolean skipTx,
- @Nullable GridCacheEntryEx entry,
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
@@ -585,7 +585,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
return getAllAsync(keys,
opCtx == null || !opCtx.skipStore(),
- null,
/*don't check local tx. */false,
subjId,
taskName,
@@ -603,9 +602,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param taskName Task name.
* @param expiry Expiry policy.
* @param skipVals Skip values flag.
+ * @param canRemap Can remap flag.
* @return Get future.
*/
- IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync(
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync(
Collection<KeyCacheObject> keys,
boolean readThrough,
@Nullable UUID subjId,
@@ -623,7 +623,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
expiry,
skipVals,
/*keep cache objects*/true,
- canRemap);
+ canRemap,
+ /*need version*/true);
}
/**
@@ -631,18 +632,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param msgId Message ID.
* @param keys Keys to get.
* @param readThrough Read through flag.
- * @param reload Reload flag.
* @param topVer Topology version.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param expiry Expiry policy.
+ * @param skipVals Skip values flag.
* @return DHT future.
*/
public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader,
long msgId,
LinkedHashMap<KeyCacheObject, Boolean> keys,
boolean readThrough,
- boolean reload,
AffinityTopologyVersion topVer,
@Nullable UUID subjId,
int taskNameHash,
@@ -653,7 +653,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
reader,
keys,
readThrough,
- reload,
/*tx*/null,
topVer,
subjId,
@@ -672,6 +671,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
*/
protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
assert ctx.affinityNode();
+ assert !req.reload() : req;
long ttl = req.accessTtl();
@@ -682,7 +682,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
req.messageId(),
req.keys(),
req.readThrough(),
- req.reload(),
req.topologyVersion(),
req.subjectId(),
req.taskNameHash(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..1b2d834 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -163,6 +163,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
* @param topVer Topology version.
* @param threadId Owning thread ID.
* @param ver Lock version.
+ * @param serOrder Version for serializable transactions ordering.
+ * @param serReadVer Optional read entry version for optimistic serializable transaction.
* @param timeout Timeout to acquire lock.
* @param reenter Reentry flag.
* @param tx Tx flag.
@@ -177,10 +179,17 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
AffinityTopologyVersion topVer,
long threadId,
GridCacheVersion ver,
+ @Nullable GridCacheVersion serOrder,
+ @Nullable GridCacheVersion serReadVer,
long timeout,
boolean reenter,
boolean tx,
- boolean implicitSingle) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
+ boolean implicitSingle)
+ throws GridCacheEntryRemovedException, GridDistributedLockCancelledException
+ {
+ assert serReadVer == null || serOrder != null;
+ assert !reenter || serOrder == null;
+
GridCacheMvccCandidate cand;
GridCacheMvccCandidate prev;
GridCacheMvccCandidate owner;
@@ -213,6 +222,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
threadId,
ver,
timeout,
+ serOrder,
reenter,
tx,
implicitSingle,
@@ -235,12 +245,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
val = this.val;
- if (mvcc != null && mvcc.isEmpty())
+ if (mvcc.isEmpty())
mvccExtras(null);
}
// Don't link reentries.
- if (cand != null && !cand.reentry())
+ if (!cand.reentry())
// Link with other candidates in the same thread.
cctx.mvcc().addNext(cctx, cand);
@@ -250,7 +260,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override public boolean tmLock(IgniteInternalTx tx, long timeout)
+ @Override public boolean tmLock(IgniteInternalTx tx,
+ long timeout,
+ @Nullable GridCacheVersion serOrder,
+ GridCacheVersion serReadVer)
throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
if (tx.local()) {
GridDhtTxLocalAdapter dhtTx = (GridDhtTxLocalAdapter)tx;
@@ -262,6 +275,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
tx.topologyVersion(),
tx.threadId(),
tx.xidVersion(),
+ serOrder,
+ serReadVer,
timeout,
/*reenter*/false,
/*tx*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index a67b1de..7108da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -17,16 +17,16 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
@@ -72,9 +73,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
/** */
private UUID reader;
- /** Reload flag. */
- private boolean reload;
-
/** Read through flag. */
private boolean readThrough;
@@ -120,7 +118,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
* @param reader Reader.
* @param keys Keys.
* @param readThrough Read through flag.
- * @param reload Reload flag.
* @param tx Transaction.
* @param topVer Topology version.
* @param subjId Subject ID.
@@ -134,7 +131,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
UUID reader,
LinkedHashMap<KeyCacheObject, Boolean> keys,
boolean readThrough,
- boolean reload,
@Nullable IgniteTxLocalEx tx,
@NotNull AffinityTopologyVersion topVer,
@Nullable UUID subjId,
@@ -152,7 +148,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
this.msgId = msgId;
this.keys = keys;
this.readThrough = readThrough;
- this.reload = reload;
this.tx = tx;
this.topVer = topVer;
this.subjId = subjId;
@@ -291,8 +286,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
return new GridFinishedFuture<Collection<GridCacheEntryInfo>>(
Collections.<GridCacheEntryInfo>emptyList());
- final Collection<GridCacheEntryInfo> infos = new LinkedList<>();
-
String taskName0 = cctx.kernalContext().job().currentTaskName();
if (taskName0 == null)
@@ -302,89 +295,77 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
GridCompoundFuture<Boolean, Boolean> txFut = null;
- for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
- while (true) {
- GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
+ ClusterNode readerNode = cctx.discovery().node(reader);
- try {
- GridCacheEntryInfo info = e.info();
+ if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+ for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
+ while (true) {
+ GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
- // If entry is obsolete.
- if (info == null)
- continue;
+ try {
+ if (e.obsolete())
+ continue;
- boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
+ boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
- if (addReader)
- e.unswap(false);
+ if (addReader)
+ e.unswap(false);
- // Register reader. If there are active transactions for this entry,
- // then will wait for their completion before proceeding.
- // TODO: GG-4003:
- // TODO: What if any transaction we wait for actually removes this entry?
- // TODO: In this case seems like we will be stuck with untracked near entry.
- // TODO: To fix, check that reader is contained in the list of readers once
- // TODO: again after the returned future completes - if not, try again.
- // TODO: Also, why is info read before transactions are complete, and not after?
- IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
+ // Register reader. If there are active transactions for this entry,
+ // then will wait for their completion before proceeding.
+ // TODO: GG-4003:
+ // TODO: What if any transaction we wait for actually removes this entry?
+ // TODO: In this case seems like we will be stuck with untracked near entry.
+ // TODO: To fix, check that reader is contained in the list of readers once
+ // TODO: again after the returned future completes - if not, try again.
+ IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
- if (f != null) {
- if (txFut == null)
- txFut = new GridCompoundFuture<>(CU.boolReducer());
-
- txFut.add(f);
- }
+ if (f != null) {
+ if (txFut == null)
+ txFut = new GridCompoundFuture<>(CU.boolReducer());
- infos.add(info);
+ txFut.add(f);
+ }
- break;
- }
- catch (IgniteCheckedException err) {
- return new GridFinishedFuture<>(err);
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry when getting a DHT value: " + e);
- }
- finally {
- cctx.evicts().touch(e, topVer);
+ break;
+ }
+ catch (IgniteCheckedException err) {
+ return new GridFinishedFuture<>(err);
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry when getting a DHT value: " + e);
+ }
+ finally {
+ cctx.evicts().touch(e, topVer);
+ }
}
}
- }
- if (txFut != null)
- txFut.markInitialized();
+ if (txFut != null)
+ txFut.markInitialized();
+ }
- IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut;
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
if (txFut == null || txFut.isDone()) {
- if (reload && cctx.readThrough() && cctx.store().configured()) {
- fut = cache().reloadAllAsync0(keys.keySet(),
- true,
- skipVals,
+ if (tx == null) {
+ fut = cache().getDhtAllAsync(
+ keys.keySet(),
+ readThrough,
subjId,
- taskName);
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
}
else {
- if (tx == null) {
- fut = cache().getDhtAllAsync(
- keys.keySet(),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true);
- }
- else {
- fut = tx.getAllAsync(cctx,
- keys.keySet(),
- null,
- /*deserialize portable*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough);
- }
+ fut = tx.getAllAsync(cctx,
+ keys.keySet(),
+ /*deserialize portable*/false,
+ skipVals,
+ /*keep cache objects*/true,
+ /*skip store*/!readThrough);
}
}
else {
@@ -393,38 +374,28 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
// transactions to complete.
fut = new GridEmbeddedFuture<>(
txFut,
- new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() {
- @Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) {
+ new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() {
+ @Override public IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> apply(Boolean b, Exception e) {
if (e != null)
throw new GridClosureException(e);
- if (reload && cctx.readThrough() && cctx.store().configured()) {
- return cache().reloadAllAsync0(keys.keySet(),
- true,
- skipVals,
+ if (tx == null) {
+ return cache().getDhtAllAsync(
+ keys.keySet(),
+ readThrough,
subjId,
- taskName);
+ taskName,
+ expiryPlc,
+ skipVals,
+ /*can remap*/true);
}
else {
- if (tx == null) {
- return cache().getDhtAllAsync(
- keys.keySet(),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true);
- }
- else {
- return tx.getAllAsync(cctx,
- keys.keySet(),
- null,
- /*deserialize portable*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough);
- }
+ return tx.getAllAsync(cctx,
+ keys.keySet(),
+ /*deserialize portable*/false,
+ skipVals,
+ /*keep cache objects*/true,
+ /*skip store*/!readThrough);
}
}
}
@@ -432,23 +403,29 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
}
return new GridEmbeddedFuture<>(
- new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() {
- @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) {
+ new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() {
+ @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) {
if (e != null) {
onDone(e);
return Collections.emptyList();
}
else {
- for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) {
- GridCacheEntryInfo info = it.next();
+ Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
- Object v = map.get(info.key());
+ for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
+ T2<CacheObject, GridCacheVersion> val = entry.getValue();
- if (v == null)
- it.remove();
- else
- info.value(skipVals ? null : (CacheObject)v);
+ assert val != null;
+
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ info.cacheId(cctx.cacheId());
+ info.key(entry.getKey());
+ info.value(skipVals ? null : val.get1());
+ info.version(val.get2());
+
+ infos.add(info);
}
return infos;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 4f3e97d..c175b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -380,9 +380,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @return Lock candidate.
* @throws GridCacheEntryRemovedException If entry was removed.
* @throws GridDistributedLockCancelledException If lock is canceled.
+ * @throws IgniteCheckedException If failed.
*/
@Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry)
- throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
+ throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Adding entry: " + entry);
@@ -400,6 +401,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
topVer,
threadId,
lockVer,
+ null,
+ null,
timeout,
/*reenter*/false,
inTx(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c09a611..4ce4759 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -672,9 +672,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (log.isDebugEnabled())
log.debug("Got removed entry when adding lock (will retry): " + entry);
}
- catch (GridDistributedLockCancelledException e) {
+ catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
if (log.isDebugEnabled())
- log.debug("Got lock request for cancelled lock (will fail): " + entry);
+ log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
return new GridDhtFinishedFuture<>(e);
}
@@ -1106,62 +1106,55 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx == null || !tx.isRollbackOnly()) {
GridCacheVersion dhtVer = req.dhtVersion(i);
- try {
- GridCacheVersion ver = e.version();
-
- boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
-
- CacheObject val = null;
-
- if (ret)
- val = e.innerGet(tx,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast.*/false,
- /*unmarshal*/false,
- /*update-metrics*/true,
- /*event notification*/req.returnValue(i),
- /*temporary*/false,
- CU.subjectId(tx, ctx.shared()),
- null,
- tx != null ? tx.resolveTaskName() : null,
- null);
-
- assert e.lockedBy(mappedVer) ||
- (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
- "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() +
- ", entry=" + e +
- ", mappedVer=" + mappedVer + ", ver=" + ver +
- ", tx=" + tx + ", req=" + req +
- ", err=" + err + ']';
-
- boolean filterPassed = false;
-
- if (tx != null && tx.onePhaseCommit()) {
- IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key()));
-
- assert writeEntry != null :
- "Missing tx entry for locked cache entry: " + e;
-
- filterPassed = writeEntry.filtersPassed();
- }
-
- if (ret && val == null)
- val = e.valueBytes(null);
-
- // We include values into response since they are required for local
- // calls and won't be serialized. We are also including DHT version.
- res.addValueBytes(
- ret ? val : null,
- filterPassed,
- ver,
- mappedVer);
- }
- catch (GridCacheFilterFailedException ex) {
- assert false : "Filter should never fail if fail-fast is false.";
+ GridCacheVersion ver = e.version();
+
+ boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
+
+ CacheObject val = null;
+
+ if (ret)
+ val = e.innerGet(tx,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast.*/false,
+ /*unmarshal*/false,
+ /*update-metrics*/true,
+ /*event notification*/req.returnValue(i),
+ /*temporary*/false,
+ CU.subjectId(tx, ctx.shared()),
+ null,
+ tx != null ? tx.resolveTaskName() : null,
+ null);
+
+ assert e.lockedBy(mappedVer) ||
+ (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
+ "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() +
+ ", entry=" + e +
+ ", mappedVer=" + mappedVer + ", ver=" + ver +
+ ", tx=" + tx + ", req=" + req +
+ ", err=" + err + ']';
+
+ boolean filterPassed = false;
- ex.printStackTrace();
+ if (tx != null && tx.onePhaseCommit()) {
+ IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key()));
+
+ assert writeEntry != null :
+ "Missing tx entry for locked cache entry: " + e;
+
+ filterPassed = writeEntry.filtersPassed();
}
+
+ if (ret && val == null)
+ val = e.valueBytes(null);
+
+ // We include values into response since they are required for local
+ // calls and won't be serialized. We are also including DHT version.
+ res.addValueBytes(
+ ret ? val : null,
+ filterPassed,
+ ver,
+ mappedVer);
}
else {
// We include values into response since they are required for local
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 4f8469f..44f34aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -595,7 +595,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (finish(false) || state() == UNKNOWN)
fut.finish();
else
- fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this)));
+ fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(this)));
}
catch (IgniteTxOptimisticCheckedException e) {
if (log.isDebugEnabled())
@@ -627,7 +627,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (finish(false) || state() == UNKNOWN)
fut.finish();
else
- fut.onError(new IgniteCheckedException("Failed to commit transaction: " +
+ fut.onError(new IgniteCheckedException("Failed to rollback transaction: " +
CU.txString(GridDhtTxLocal.this)));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a15a334..d806801 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -57,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridLeanSet;
@@ -429,9 +429,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
catch (GridCacheEntryRemovedException e) {
assert false : "Got entry removed exception while holding transactional lock on entry: " + e;
}
- catch (GridCacheFilterFailedException e) {
- assert false : "Got filter failed exception with fail fast false " + e;
- }
}
}
@@ -472,8 +469,18 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (log.isDebugEnabled())
log.debug("Marking all local candidates as ready: " + this);
- Iterable<IgniteTxEntry> checkEntries = writes;
+ readyLocks(writes);
+
+ if (tx.serializable() && tx.optimistic())
+ readyLocks(reads);
+ locksReady = true;
+ }
+
+ /**
+ * @param checkEntries Entries.
+ */
+ private void readyLocks(Iterable<IgniteTxEntry> checkEntries) {
for (IgniteTxEntry txEntry : checkEntries) {
GridCacheContext cacheCtx = txEntry.context();
@@ -513,8 +520,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
}
}
-
- locksReady = true;
}
/**
@@ -813,12 +818,19 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
this.writes = writes;
this.txNodes = txNodes;
- if (!F.isEmpty(writes)) {
+ boolean ser = tx.serializable() && tx.optimistic();
+
+ if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) {
Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
for (IgniteTxEntry entry : writes)
forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+ if (ser) {
+ for (IgniteTxEntry entry : reads)
+ forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+ }
+
forceKeysFut = forceRebalanceKeys(forceKeys);
}
@@ -847,7 +859,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
IgniteTxEntry e,
Map<Integer, Collection<KeyCacheObject>> map
) {
- if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) {
+ if (retVal ||
+ !F.isEmpty(e.entryProcessors()) ||
+ !F.isEmpty(e.filters()) ||
+ e.serializableReadVersion() != null) {
if (map == null)
map = new HashMap<>();
@@ -906,10 +921,86 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
/**
+ * @param entries Entries.
+ * @return Not null exception if version check failed.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private IgniteCheckedException checkReadConflict(Iterable<IgniteTxEntry> entries)
+ throws IgniteCheckedException {
+ try {
+ for (IgniteTxEntry entry : entries) {
+ GridCacheVersion serReadVer = entry.serializableReadVersion();
+
+ if (serReadVer != null) {
+ entry.cached().unswap();
+
+ if (!entry.cached().checkSerializableReadVersion(serReadVer))
+ return versionCheckError(entry);
+ }
+ }
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert false : "Got removed exception on entry with dht local candidate: " + entries;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param entry Entry.
+ * @return Optimistic version check error.
+ */
+ private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry) {
+ GridCacheContext cctx = entry.context();
+
+ return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " +
+ "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) +
+ ", cache=" + cctx.name() + ']');
+ }
+
+ /**
*
*/
private void prepare0() {
try {
+ if (tx.serializable() && tx.optimistic()) {
+ IgniteCheckedException err0;
+
+ try {
+ err0 = checkReadConflict(writes);
+
+ if (err0 == null)
+ err0 = checkReadConflict(reads);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to check entry version: " + e, e);
+
+ err0 = e;
+ }
+
+ if (err0 != null) {
+ err.compareAndSet(null, err0);
+
+ final GridNearTxPrepareResponse res = createPrepareResponse();
+
+ tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) {
+ try {
+ if (replied.compareAndSet(false, true))
+ sendPrepareResponse(res);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ }
+ }
+ }
+ });
+
+ return;
+ }
+ }
+
// We are holding transaction-level locks for entries here, so we can get next write version.
onEntriesLocked();
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index a68e834..febe9ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -24,11 +24,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -39,8 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -58,6 +53,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -65,83 +61,30 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-
/**
* Colocated get future.
*/
-public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
- implements GridCacheFuture<Map<K, V>> {
+public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
- /** Default max remap count value. */
- public static final int DFLT_MAX_REMAP_CNT = 3;
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static IgniteLogger log;
- /** Maximum number of attempts to remap key to the same primary node. */
- private static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger(IGNITE_NEAR_GET_MAX_REMAPS,
- DFLT_MAX_REMAP_CNT);
-
- /** Context. */
- private final GridCacheContext<K, V> cctx;
-
- /** Keys. */
- private Collection<KeyCacheObject> keys;
-
/** Topology version. */
private AffinityTopologyVersion topVer;
- /** Reload flag. */
- private boolean reload;
-
- /** Read-through flag. */
- private boolean readThrough;
-
- /** Force primary flag. */
- private boolean forcePrimary;
-
- /** Future ID. */
- private IgniteUuid futId;
-
/** Version. */
private GridCacheVersion ver;
- /** Trackable flag. */
- private volatile boolean trackable;
-
- /** Remap count. */
- private AtomicInteger remapCnt = new AtomicInteger();
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name. */
- private String taskName;
-
- /** Whether to deserialize portable objects. */
- private boolean deserializePortable;
-
- /** Expiry policy. */
- private IgniteCacheExpiryPolicy expiryPlc;
-
- /** Skip values flag. */
- private boolean skipVals;
-
- /** Flag indicating whether future can be remapped on a newer topology version. */
- private final boolean canRemap;
-
/**
* @param cctx Context.
* @param keys Keys.
* @param topVer Topology version.
* @param readThrough Read through flag.
- * @param reload Reload flag.
* @param forcePrimary If {@code true} then will force network trip to primary node even
* if called on backup node.
* @param subjId Subject ID.
@@ -149,39 +92,39 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
+ * @param keepCacheObjects Keep cache objects flag.
*/
public GridPartitionedGetFuture(
GridCacheContext<K, V> cctx,
Collection<KeyCacheObject> keys,
AffinityTopologyVersion topVer,
boolean readThrough,
- boolean reload,
boolean forcePrimary,
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean needVer,
+ boolean keepCacheObjects
) {
- super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
-
- assert !F.isEmpty(keys);
+ super(cctx,
+ keys,
+ readThrough,
+ forcePrimary,
+ subjId,
+ taskName,
+ deserializePortable,
+ expiryPlc,
+ skipVals,
+ canRemap,
+ needVer,
+ keepCacheObjects);
- this.cctx = cctx;
- this.keys = keys;
this.topVer = topVer;
- this.readThrough = readThrough;
- this.reload = reload;
- this.forcePrimary = forcePrimary;
- this.subjId = subjId;
- this.deserializePortable = deserializePortable;
- this.taskName = taskName;
- this.expiryPlc = expiryPlc;
- this.skipVals = skipVals;
- this.canRemap = canRemap;
-
- futId = IgniteUuid.randomUuid();
ver = cctx.versions().next();
@@ -351,7 +294,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
-1,
mappedKeys,
readThrough,
- reload,
topVer,
subjId,
taskName == null ? 0 : taskName.hashCode(),
@@ -404,7 +346,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
ver,
mappedKeys,
readThrough,
- reload,
topVer,
subjId,
taskName == null ? 0 : taskName.hashCode(),
@@ -452,10 +393,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
while (true) {
- GridCacheEntryEx entry = null;
+ GridCacheEntryEx entry;
try {
- if (!reload && allowLocRead) {
+ if (allowLocRead) {
try {
entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
colocated.peekEx(key);
@@ -464,18 +405,40 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
if (entry != null) {
boolean isNew = entry.isNewLocked();
- CacheObject v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc);
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+ }
colocated.context().evicts().touch(entry, topVer);
@@ -485,7 +448,16 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
colocated.removeIfObsolete(key);
}
else {
- cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
+ if (needVer)
+ versionedResult(locVals, key, v, ver);
+ else
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ true);
return false;
}
@@ -536,14 +508,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
catch (GridCacheEntryRemovedException ignored) {
// No-op, will retry.
}
- catch (GridCacheFilterFailedException e) {
- if (log.isDebugEnabled())
- log.debug("Filter validation failed for entry: " + e);
-
- colocated.context().evicts().touch(entry, topVer);
-
- break;
- }
}
return remote;
@@ -591,7 +555,16 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
for (GridCacheEntryInfo info : infos) {
assert skipVals == (info.value() == null);
- cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false);
+ if (needVer)
+ versionedResult(map, info.key(), info.value(), info.version());
+ else
+ cctx.addResult(map,
+ info.key(),
+ info.value(),
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
}
return map;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cba6872..4cd9e84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -306,7 +306,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable final Collection<? extends K> keys,
final boolean forcePrimary,
boolean skipTx,
- @Nullable final GridCacheEntryEx entry,
@Nullable UUID subjId,
final String taskName,
final boolean deserializePortable,
@@ -334,7 +333,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
@Override public IgniteInternalFuture<Map<K, V>> apply() {
return getAllAsync0(ctx.cacheKeysView(keys),
- false,
forcePrimary,
subjId0,
taskName,
@@ -920,7 +918,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* Entry point to all public API get methods.
*
* @param keys Keys to remove.
- * @param reload Reload flag.
* @param forcePrimary Force primary flag.
* @param subjId Subject ID.
* @param taskName Task name.
@@ -931,7 +928,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return Get future.
*/
private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys,
- boolean reload,
boolean forcePrimary,
UUID subjId,
String taskName,
@@ -947,7 +943,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
// Optimisation: try to resolve value locally and escape 'get future' creation.
- if (!reload && !forcePrimary) {
+ if (!forcePrimary) {
Map<K, V> locVals = U.newHashMap(keys.size());
boolean success = true;
@@ -997,10 +993,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
catch (GridCacheEntryRemovedException ignored) {
// No-op, retry.
}
- catch (GridCacheFilterFailedException ignored) {
- // No-op, skip the key.
- break;
- }
catch (GridDhtInvalidPartitionException ignored) {
success = false;
@@ -1036,14 +1028,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
keys,
topVer,
!skipStore,
- reload,
forcePrimary,
subjId,
taskName,
deserializePortable,
expiry,
skipVals,
- canRemap);
+ canRemap,
+ false,
+ false);
fut.init();
@@ -1663,6 +1656,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (idx != null) {
GridDhtCacheEntry entry = entries.get(idx);
+
try {
GridCacheVersion ver = entry.version();
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 6d69198..f03b461 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
@@ -68,6 +67,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -188,7 +188,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable final Collection<? extends K> keys,
boolean forcePrimary,
boolean skipTx,
- @Nullable final GridCacheEntryEx entry,
@Nullable UUID subjId,
String taskName,
final boolean deserializePortable,
@@ -212,7 +211,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
return tx.getAllAsync(ctx,
ctx.cacheKeysView(keys),
- entry,
deserializePortable,
skipVals,
false,
@@ -230,7 +228,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
return loadAsync(
ctx.cacheKeysView(keys),
opCtx == null || !opCtx.skipStore(),
- false,
forcePrimary,
topVer,
subjId,
@@ -257,7 +254,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
/**
* @param keys Keys to load.
* @param readThrough Read through flag.
- * @param reload Reload flag.
* @param forcePrimary Force get from primary node flag.
* @param topVer Topology version.
* @param subjId Subject ID.
@@ -265,12 +261,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
+ * @param canRemap Can remap flag.
* @return Loaded values.
*/
public IgniteInternalFuture<Map<K, V>> loadAsync(
@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
- boolean reload,
boolean forcePrimary,
AffinityTopologyVersion topVer,
@Nullable UUID subjId,
@@ -278,7 +274,45 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
- boolean canRemap
+ boolean canRemap) {
+ return loadAsync(keys,
+ readThrough,
+ forcePrimary,
+ topVer, subjId,
+ taskName,
+ deserializePortable,
+ expiryPlc,
+ skipVals,
+ canRemap,
+ false,
+ false);
+ }
+
+ /**
+ * @param keys Keys to load.
+ * @param readThrough Read through flag.
+ * @param forcePrimary Force get from primary node flag.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ * @return Loaded values.
+ */
+ public IgniteInternalFuture<Map<K, V>> loadAsync(
+ @Nullable Collection<KeyCacheObject> keys,
+ boolean readThrough,
+ boolean forcePrimary,
+ AffinityTopologyVersion topVer,
+ @Nullable UUID subjId,
+ String taskName,
+ boolean deserializePortable,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean skipVals,
+ boolean canRemap,
+ boolean needVer,
+ boolean keepCacheObj
) {
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -287,8 +321,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc = expiryPolicy(null);
// Optimisation: try to resolve value locally and escape 'get future' creation.
- if (!reload && !forcePrimary) {
- Map<K, V> locVals = U.newHashMap(keys.size());
+ if (!forcePrimary) {
+ Map<K, V> locVals = null;
boolean success = true;
@@ -304,18 +338,40 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (entry != null) {
boolean isNew = entry.isNewLocked();
- CacheObject v = entry.innerGet(null,
- /*swap*/true,
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc);
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc);
+ }
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null) {
@@ -326,8 +382,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
success = false;
}
- else
- ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
+ else {
+ if (locVals == null)
+ locVals = U.newHashMap(keys.size());
+
+ if (needVer)
+ locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver));
+ else {
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObj,
+ deserializePortable,
+ true);
+ }
+ }
}
else
success = false;
@@ -337,10 +407,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
catch (GridCacheEntryRemovedException ignored) {
// No-op, retry.
}
- catch (GridCacheFilterFailedException ignored) {
- // No-op, skip the key.
- break;
- }
catch (GridDhtInvalidPartitionException ignored) {
success = false;
@@ -377,14 +443,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
keys,
topVer,
readThrough,
- reload,
forcePrimary,
subjId,
taskName,
deserializePortable,
expiryPlc,
skipVals,
- canRemap);
+ canRemap,
+ needVer,
+ keepCacheObj);
fut.init();
@@ -803,10 +870,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (log.isDebugEnabled())
log.debug("Got removed entry when adding lock (will retry): " + entry);
}
- catch (GridDistributedLockCancelledException e) {
+ catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
if (log.isDebugEnabled())
- log.debug("Got lock request for cancelled lock (will ignore): " +
- entry);
+ log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
fut.onError(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 53c2b63..365b46b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -323,7 +323,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
inTx(),
inTx() && tx.implicitSingle(),
false,
- false);
+ false,
+ null);
cand.topologyVersion(topVer.get());
}
@@ -342,7 +343,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
inTx(),
inTx() && tx.implicitSingle(),
false,
- false);
+ false,
+ null);
cand.topologyVersion(topVer.get());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 82054d9..1bf03a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -385,7 +385,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
@Nullable Collection<? extends K> keys,
boolean forcePrimary,
boolean skipTx,
- @Nullable GridCacheEntryEx entry,
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
@@ -406,7 +405,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
return loadAsync(null,
ctx.cacheKeysView(keys),
- false,
forcePrimary,
subjId,
taskName,