You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/13 09:05:37 UTC
[04/21] ignite git commit: ignite-1607 WIP
ignite-1607 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6849ebe1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6849ebe1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6849ebe1
Branch: refs/heads/ignite-1607
Commit: 6849ebe10779265cbd22f1afb35bb40c12529881
Parents: 54bbc75
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 7 10:33:34 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 7 17:31:09 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 5 +-
.../processors/cache/GridCacheMapEntry.java | 5 +-
.../dht/CacheDistributedGetFutureAdapter.java | 203 +++
.../distributed/dht/GridDhtCacheEntry.java | 2 +-
.../dht/GridPartitionedGetFuture.java | 106 +-
.../dht/colocated/GridDhtColocatedCache.java | 1 +
.../distributed/near/GridNearGetFuture.java | 101 +-
...arOptimisticSerializableTxPrepareFuture.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 42 +-
.../cache/transactions/IgniteTxAdapter.java | 6 +-
.../cache/transactions/IgniteTxEntry.java | 3 +-
.../transactions/IgniteTxLocalAdapter.java | 457 ++++---
.../CacheSerializableTransactionsTest.java | 1230 +++++++++++++++---
.../processors/cache/GridCacheTestEntryEx.java | 4 +-
14 files changed, 1629 insertions(+), 538 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index bc36d2c..9106b05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -309,6 +309,7 @@ public interface GridCacheEntryEx {
throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException;
/**
+ * @param tx Cache transaction.
* @param readSwap Flag indicating whether to check swap memory.
* @param unmarshal Unmarshal flag.
* @param updateMetrics If {@code true} then metrics should be updated.
@@ -321,7 +322,9 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap,
+ @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ IgniteInternalTx tx,
+ boolean readSwap,
boolean unmarshal,
boolean updateMetrics,
boolean evt,
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b22f9b4..9378017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -691,6 +691,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** {@inheritDoc} */
@Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ IgniteInternalTx tx,
boolean readSwap,
boolean unmarshal,
boolean updateMetrics,
@@ -700,7 +701,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc)
throws IgniteCheckedException, GridCacheEntryRemovedException {
- return (T2<CacheObject, GridCacheVersion>)innerGet0(null,
+ return (T2<CacheObject, GridCacheVersion>)innerGet0(tx,
readSwap,
false,
evt,
@@ -711,7 +712,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
transformClo,
taskName,
expiryPlc,
- false);
+ true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
new file mode 100644
index 0000000..459362b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
+/**
+ *
+ */
+public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+ implements GridCacheFuture<Map<K, V>> {
+ /** Default max remap count value. */
+ public static final int DFLT_MAX_REMAP_CNT = 3;
+
+ /** Maximum number of attempts to remap key to the same primary node. */
+ protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+
+ /** Context. */
+ protected final GridCacheContext<K, V> cctx;
+
+ /** Keys. */
+ protected Collection<KeyCacheObject> keys;
+
+ /** Reload flag. */
+ protected boolean reload;
+
+ /** Read through flag. */
+ protected boolean readThrough;
+
+ /** Force primary flag. */
+ protected boolean forcePrimary;
+
+ /** Future ID. */
+ protected IgniteUuid futId;
+
+ /** Trackable flag. */
+ protected boolean trackable;
+
+ /** Remap count. */
+ protected AtomicInteger remapCnt = new AtomicInteger();
+
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name. */
+ protected String taskName;
+
+ /** Whether to deserialize portable objects. */
+ protected boolean deserializePortable;
+
+ /** Skip values flag. */
+ protected boolean skipVals;
+
+ /** Expiry policy. */
+ protected IgniteCacheExpiryPolicy expiryPlc;
+
+ /** Flag indicating that get should be done on a locked topology version. */
+ protected final boolean canRemap;
+
+ /** */
+ protected final boolean needVer;
+
+ /** */
+ protected final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC;
+
+ /**
+ * @param cctx Context.
+ * @param keys Keys.
+ * @param readThrough Read through flag.
+ * @param reload Reload flag.
+ * @param forcePrimary If {@code true} then will force network trip to primary node even
+ * if called on backup node.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param resC Closure applied on 'get' result.
+ * @param needVer If {@code true} need provide entry version to result closure.
+ */
+ protected CacheDistributedGetFutureAdapter(
+ GridCacheContext<K, V> cctx,
+ Collection<KeyCacheObject> keys,
+ boolean readThrough,
+ boolean reload,
+ boolean forcePrimary,
+ @Nullable UUID subjId,
+ String taskName,
+ boolean deserializePortable,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean skipVals,
+ boolean canRemap,
+ boolean needVer,
+ @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
+ ) {
+ super(cctx.kernalContext(),
+ resC != null ? new ResultClosureReducer<K, V>(keys.size()) : CU.<K, V>mapsReducer(keys.size()));
+
+ assert !F.isEmpty(keys);
+ assert !needVer || resC != null;
+
+ this.cctx = cctx;
+ this.keys = keys;
+ this.readThrough = readThrough;
+ this.reload = reload;
+ this.forcePrimary = forcePrimary;
+ this.subjId = subjId;
+ this.taskName = taskName;
+ this.deserializePortable = deserializePortable;
+ this.expiryPlc = expiryPlc;
+ this.skipVals = skipVals;
+ this.canRemap = canRemap;
+ this.needVer = needVer;
+ this.resC = resC;
+
+ futId = IgniteUuid.randomUuid();
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ */
+ @SuppressWarnings("unchecked")
+ protected final void resultClosureValue(KeyCacheObject key, Object val, GridCacheVersion ver) {
+ assert resC != null;
+
+ ResultClosureReducer<K, V> rdc = (ResultClosureReducer)reducer();
+
+ assert rdc != null;
+
+ rdc.collect(key);
+
+ resC.apply(key, val, ver);
+ }
+
+ /**
+ *
+ */
+ private static class ResultClosureReducer<K, V> implements IgniteReducer<Map<K, V>, Map<K, V>> {
+ /** */
+ private final ConcurrentHashMap8<KeyCacheObject, Boolean> map;
+
+ /**
+ * @param keys Number of keys.
+ */
+ public ResultClosureReducer(int keys) {
+ this.map = new ConcurrentHashMap8<>(keys);
+ }
+
+ /**
+ * @param key Key.
+ */
+ void collect(KeyCacheObject key) {
+ map.put(key, Boolean.TRUE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean collect(@Nullable Map<K, V> map) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> reduce() {
+ return (Map)map;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 90f6551..0004f02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -199,7 +199,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
if (serReadVer != null) {
if (!serReadVer.equals(this.ver)) {
- if (!(isNewLocked() && serReadVer.equals(IgniteTxEntry.SER_READ_NEW_ENTRY_VER)))
+ if (!((isNew() || deleted()) && serReadVer.equals(IgniteTxEntry.READ_NEW_ENTRY_VER)))
return null;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6b8c2ab..d8456d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -24,11 +24,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -40,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -67,83 +63,25 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-
/**
* Colocated get future.
*/
-public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
- implements GridCacheFuture<Map<K, V>> {
+public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
- /** Default max remap count value. */
- public static final int DFLT_MAX_REMAP_CNT = 3;
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static IgniteLogger log;
- /** Maximum number of attempts to remap key to the same primary node. */
- private static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger(IGNITE_NEAR_GET_MAX_REMAPS,
- DFLT_MAX_REMAP_CNT);
-
- /** Context. */
- private final GridCacheContext<K, V> cctx;
-
- /** Keys. */
- private Collection<KeyCacheObject> keys;
-
/** Topology version. */
private AffinityTopologyVersion topVer;
- /** Reload flag. */
- private boolean reload;
-
- /** Read-through flag. */
- private boolean readThrough;
-
- /** Force primary flag. */
- private boolean forcePrimary;
-
- /** Future ID. */
- private IgniteUuid futId;
-
/** Version. */
private GridCacheVersion ver;
- /** Trackable flag. */
- private volatile boolean trackable;
-
- /** Remap count. */
- private AtomicInteger remapCnt = new AtomicInteger();
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name. */
- private String taskName;
-
- /** Whether to deserialize portable objects. */
- private boolean deserializePortable;
-
- /** Expiry policy. */
- private IgniteCacheExpiryPolicy expiryPlc;
-
- /** Skip values flag. */
- private boolean skipVals;
-
- /** Flag indicating whether future can be remapped on a newer topology version. */
- private final boolean canRemap;
-
- /** */
- private final boolean needVer;
-
- /** */
- private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC;
-
/**
* @param cctx Context.
* @param keys Keys.
@@ -158,6 +96,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
* @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} need provide entry version to result closure.
* @param resC Closure applied on 'get' result.
*/
public GridPartitionedGetFuture(
@@ -176,27 +115,21 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
boolean needVer,
@Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
) {
- super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+ super(cctx,
+ keys,
+ readThrough,
+ reload,
+ forcePrimary,
+ subjId,
+ taskName,
+ deserializePortable,
+ expiryPlc,
+ skipVals,
+ canRemap,
+ needVer,
+ resC);
- assert !F.isEmpty(keys);
- assert !needVer || resC != null;
-
- this.cctx = cctx;
- this.keys = keys;
this.topVer = topVer;
- this.readThrough = readThrough;
- this.reload = reload;
- this.forcePrimary = forcePrimary;
- this.subjId = subjId;
- this.deserializePortable = deserializePortable;
- this.taskName = taskName;
- this.expiryPlc = expiryPlc;
- this.skipVals = skipVals;
- this.canRemap = canRemap;
- this.needVer = needVer;
- this.resC = resC;
-
- futId = IgniteUuid.randomUuid();
ver = cctx.versions().next();
@@ -331,7 +264,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
final int keysSize = keys.size();
- Map<K, V> locVals = U.newHashMap(keysSize);
+ Map<K, V> locVals = resC == null ? U.<K, V>newHashMap(keysSize) : null;
boolean hasRmtNodes = false;
@@ -342,7 +275,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
if (isDone())
return;
- if (!locVals.isEmpty())
+ if (!F.isEmpty(locVals))
add(new GridFinishedFuture<>(locVals));
if (hasRmtNodes) {
@@ -483,6 +416,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
if (needVer) {
T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
/*swap*/true,
/*unmarshal*/true,
/**update-metrics*/false,
@@ -521,7 +455,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
}
else {
if (resC != null)
- resC.apply(key, skipVals ? true : v, ver);
+ resultClosureValue(key, skipVals ? true : v, ver);
else
cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
@@ -628,7 +562,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
for (GridCacheEntryInfo info : infos) {
assert skipVals == (info.value() == null);
- resC.apply(info.key(), skipVals ? true : info.value(), info.version());
+ resultClosureValue(info.key(), skipVals ? true : info.value(), info.version());
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index de82068..6b6352f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -353,6 +353,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (needVer) {
T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
/*swap*/true,
/*unmarshal*/true,
/**update-metrics*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 3b70325..9ed63ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -24,7 +24,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -39,18 +38,17 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -68,83 +66,26 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
/**
*
*/
-public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
- implements GridCacheFuture<Map<K, V>> {
+public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
- /** Default max remap count value. */
- public static final int DFLT_MAX_REMAP_CNT = 3;
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static IgniteLogger log;
- /** Maximum number of attempts to remap key to the same primary node. */
- private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
-
- /** Context. */
- private final GridCacheContext<K, V> cctx;
-
- /** Keys. */
- private Collection<KeyCacheObject> keys;
-
- /** Reload flag. */
- private boolean reload;
-
- /** Read through flag. */
- private boolean readThrough;
-
- /** Force primary flag. */
- private boolean forcePrimary;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Version. */
- private GridCacheVersion ver;
-
/** Transaction. */
private IgniteTxLocalEx tx;
- /** Trackable flag. */
- private boolean trackable;
-
- /** Remap count. */
- private AtomicInteger remapCnt = new AtomicInteger();
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name. */
- private String taskName;
-
- /** Whether to deserialize portable objects. */
- private boolean deserializePortable;
-
- /** Skip values flag. */
- private boolean skipVals;
-
- /** Expiry policy. */
- private IgniteCacheExpiryPolicy expiryPlc;
-
- /** Flag indicating that get should be done on a locked topology version. */
- private final boolean canRemap;
-
/** */
- private final boolean needVer;
-
- /** */
- private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC;
+ private GridCacheVersion ver;
/**
* @param cctx Context.
@@ -159,6 +100,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
+ * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+ * @param needVer If {@code true} need provide entry version to result closure.
+ * @param resC Closure applied on 'get' result.
*/
public GridNearGetFuture(
GridCacheContext<K, V> cctx,
@@ -176,25 +120,24 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
boolean needVer,
@Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
) {
- super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+ super(cctx,
+ keys,
+ readThrough,
+ reload,
+ forcePrimary,
+ subjId,
+ taskName,
+ deserializePortable,
+ expiryPlc,
+ skipVals,
+ canRemap,
+ needVer,
+ resC);
assert !F.isEmpty(keys);
assert !needVer || resC != null;
- this.cctx = cctx;
- this.keys = keys;
- this.readThrough = readThrough;
- this.reload = reload;
- this.forcePrimary = forcePrimary;
this.tx = tx;
- this.subjId = subjId;
- this.taskName = taskName;
- this.deserializePortable = deserializePortable;
- this.expiryPlc = expiryPlc;
- this.skipVals = skipVals;
- this.canRemap = canRemap;
- this.needVer = needVer;
- this.resC = resC;
futId = IgniteUuid.randomUuid();
@@ -474,6 +417,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (isNear) {
if (needVer) {
T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
/*swap*/true,
/*unmarshal*/true,
/**update-metrics*/true,
@@ -520,6 +464,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (needVer) {
T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
/*swap*/true,
/*unmarshal*/true,
/**update-metrics*/false,
@@ -595,7 +540,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
else
- resC.apply(key, v, ver);
+ resultClosureValue(key, v, ver);
}
else {
if (affNode == null) {
@@ -761,7 +706,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
assert skipVals == (info.value() == null);
if (resC != null)
- resC.apply(key, skipVals ? true : val, info.version());
+ resultClosureValue(key, skipVals ? true : val, info.version());
else
cctx.addResult(map, key, val, skipVals, false, deserializePortable, false);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 4a7efb4..e48601d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -570,7 +570,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
// Must lock near entries separately.
if (m.near()) {
try {
- tx.optimisticLockEntries(req.writes());
+ tx.optimisticLockEntries(m.entries());
tx.userPrepare();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 2c2915f..c43cab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -362,7 +363,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
needVer,
c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
@Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
- return true;
+ try {
+ Map<Object, Object> map = f.get();
+
+ if (map != null && map.size() != keys.size()) {
+ for (KeyCacheObject key : keys) {
+ if (!map.containsKey(key))
+ c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER);
+ }
+ }
+
+ return true;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
}
});
}
@@ -383,7 +400,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
c
).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
@Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
- return true;
+ try {
+ Map<Object, Object> map = f.get();
+
+ if (map != null && map.size() != keys.size()) {
+ for (KeyCacheObject key : keys) {
+ if (!map.containsKey(key))
+ c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER);
+ }
+ }
+
+ return true;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
}
});
} else {
@@ -868,7 +901,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
@Nullable Collection<IgniteTxEntry> reads,
@Nullable Collection<IgniteTxEntry> writes,
- Map<UUID, Collection<UUID>> txNodes, boolean last,
+ Map<UUID, Collection<UUID>> txNodes,
+ boolean last,
Collection<UUID> lastBackups
) {
if (state() != PREPARING) {
@@ -896,7 +930,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
try {
// At this point all the entries passed in must be enlisted in transaction because this is an
// optimistic transaction.
- optimisticLockEntries = writes;
+ optimisticLockEntries = optimistic() && serializable() ? F.concat(false, writes, reads) : writes;
userPrepare();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 0286efe..c896f6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -325,7 +325,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
threadId = Thread.currentThread().getId();
- log = U.logger(cctx.kernalContext(), logRef, this);
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, this);
}
/**
@@ -374,7 +375,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
implicitSingle = false;
loc = false;
- log = U.logger(cctx.kernalContext(), logRef, this);
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, this);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f1cd2d4..7929167 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -67,7 +67,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
private static final long serialVersionUID = 0L;
/** Dummy version for non-existing entry read in SERIALIZABLE transaction. */
- public static final GridCacheVersion SER_READ_NEW_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0);
+ public static final GridCacheVersion READ_NEW_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0);
/** Owning transaction. */
@GridToStringExclude
@@ -322,6 +322,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
cp.conflictVer = conflictVer;
cp.expiryPlc = expiryPlc;
cp.flags = flags;
+ cp.serReadVer = serReadVer;
return cp;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2b745ac..76df164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -1268,11 +1269,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
AffinityTopologyVersion topVer = topologyVersion();
+ boolean needReadVer = optimistic() && serializable();
+
// In this loop we cover only read-committed or optimistic transactions.
// Transactions that are pessimistic and not read-committed are covered
// outside of this loop.
for (KeyCacheObject key : keys) {
- if (pessimistic() && !readCommitted() && !skipVals)
+ if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
addActiveCache(cacheCtx);
IgniteTxKey txKey = cacheCtx.txKey(key);
@@ -1370,24 +1373,42 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
GridCacheVersion ver = entry.version();
CacheObject val = null;
+ GridCacheVersion readVer = null;
if (!pessimistic() || readCommitted() && !skipVals) {
IgniteCacheExpiryPolicy accessPlc =
optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
- // This call will check for filter.
- val = entry.innerGet(this,
- /*swap*/true,
- /*no read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /*metrics*/true,
- /*event*/true,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc);
+ if (needReadVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc);
+
+ if (res != null) {
+ val = res.get1();
+ readVer = res.get2();
+ }
+ }
+ else {
+ val = entry.innerGet(this,
+ /*swap*/true,
+ /*no read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc);
+ }
if (val != null) {
cacheCtx.addResult(map,
@@ -1421,8 +1442,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// As optimization, mark as checked immediately
// for non-pessimistic if value is not null.
- if (val != null && !pessimistic())
+ if (val != null && !pessimistic()) {
txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.serializableReadVersion(readVer);
+ }
+ }
}
break; // While.
@@ -1532,9 +1560,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (log.isDebugEnabled())
log.debug("Loading missed values for missed map: " + missedMap);
- final Collection<KeyCacheObject> loaded = U.newHashSet(missedMap.size());
+ final Collection<KeyCacheObject> loaded =
+ readCommitted() ? U.<KeyCacheObject>newHashSet(missedMap.size()) : null;
- final boolean needVer = optimistic() && serializable();
+ final boolean needReadVer = optimistic() && serializable();
return new GridEmbeddedFuture<>(
new C2<Boolean, Exception, Map<K, V>>() {
@@ -1555,27 +1584,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
- // In read-committed mode touch entries that have just been read.
- boolean touch = readCommitted();
+ if (readCommitted()) {
+ assert loaded != null;
- for (KeyCacheObject key : missedMap.keySet()) {
- if (loaded.contains(key))
- continue;
+ Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet());
- GridCacheVersion ver = needVer ? IgniteTxEntry.SER_READ_NEW_ENTRY_VER : null;
+ notFound.removeAll(loaded);
- onLoaded(key,
- null,
- ver,
- cacheCtx,
- map,
- missedMap,
- deserializePortable,
- skipVals,
- keepCacheObjects,
- loaded);
+ // In read-committed mode touch entries that have just been read.
+ for (KeyCacheObject key : notFound) {
+ if (loaded.contains(key))
+ continue;
- if (touch) {
IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
@@ -1596,174 +1616,141 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
missedMap.keySet(),
deserializePortable,
skipVals,
- needVer,
+ needReadVer,
new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
- @Override public void apply(KeyCacheObject key,
- @Nullable Object val,
- @Nullable GridCacheVersion loadVer) {
- onLoaded(key,
- val,
- loadVer,
- cacheCtx,
- map,
- missedMap,
- deserializePortable,
- skipVals,
- keepCacheObjects,
- loaded);
- }
- })
- );
- }
+ /** */
+ private GridCacheVersion nextVer;
- /**
- * @param key Key.
- * @param val Value.
- * @param loadVer Entry version.
- * @param cacheCtx Cache context.
- * @param map Return map.
- * @param missedMap Missed keys.
- * @param deserializePortable Deserialize portable flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects flag.
- * @param loaded Loaded values map.
- */
- private <K, V> void onLoaded(
- KeyCacheObject key,
- @Nullable Object val,
- @Nullable GridCacheVersion loadVer,
- GridCacheContext cacheCtx,
- Map<K, V> map,
- Map<KeyCacheObject, GridCacheVersion> missedMap,
- final boolean deserializePortable,
- boolean skipVals,
- boolean keepCacheObjects,
- Collection<KeyCacheObject> loaded) {
- if (isRollbackOnly()) {
- if (log.isDebugEnabled())
- log.debug("Ignoring loaded value for read because transaction was rolled back: " +
- IgniteTxLocalAdapter.this);
+ @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
+ if (isRollbackOnly()) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring loaded value for read because transaction was rolled back: " +
+ IgniteTxLocalAdapter.this);
- return;
- }
+ return;
+ }
- GridCacheVersion ver = missedMap.get(key);
+ GridCacheVersion ver = missedMap.get(key);
- if (ver == null) {
- if (log.isDebugEnabled())
- log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
+ if (ver == null) {
+ if (log.isDebugEnabled())
+ log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
- return;
- }
+ return;
+ }
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
- CacheObject visibleVal = cacheVal;
+ CacheObject visibleVal = cacheVal;
- IgniteTxKey txKey = cacheCtx.txKey(key);
+ IgniteTxKey txKey = cacheCtx.txKey(key);
- IgniteTxEntry txEntry = entry(txKey);
+ IgniteTxEntry txEntry = entry(txKey);
- if (txEntry != null) {
- if (!readCommitted())
- txEntry.readValue(cacheVal);
+ if (txEntry != null) {
+ if (!readCommitted())
+ txEntry.readValue(cacheVal);
- if (!F.isEmpty(txEntry.entryProcessors()))
- visibleVal = txEntry.applyEntryProcessors(visibleVal);
- }
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ visibleVal = txEntry.applyEntryProcessors(visibleVal);
+ }
- // In pessimistic mode we hold the lock, so filter validation
- // should always be valid.
- if (pessimistic())
- ver = null;
+ // In pessimistic mode we hold the lock, so filter validation
+ // should always be valid.
+ if (pessimistic())
+ ver = null;
- // Initialize next version.
- GridCacheVersion nextVer = cctx.versions().next(topologyVersion());
+ // Initialize next version.
+ if (nextVer == null)
+ nextVer = cctx.versions().next(topologyVersion());
- while (true) {
- assert txEntry != null || readCommitted() || skipVals;
+ while (true) {
+ assert txEntry != null || readCommitted() || skipVals;
- GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+ GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
- try {
- // Must initialize to true since even if filter didn't pass,
- // we still record the transaction value.
- boolean set;
+ try {
+ // Must initialize to true since even if filter didn't pass,
+ // we still record the transaction value.
+ boolean set;
- try {
- set = e.versionedValue(cacheVal, ver, nextVer);
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction getAll method " +
- "(will try again): " + e);
+ try {
+ set = e.versionedValue(cacheVal, ver, nextVer);
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction getAll method " +
+ "(will try again): " + e);
- if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
- U.error(log, "Inconsistent transaction state (entry got removed while " +
- "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
+ if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
+ U.error(log, "Inconsistent transaction state (entry got removed while " +
+ "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
- setRollbackOnly();
+ setRollbackOnly();
- return;
- }
+ return;
+ }
- if (txEntry != null)
- txEntry.cached(entryEx(cacheCtx, txKey));
+ if (txEntry != null)
+ txEntry.cached(entryEx(cacheCtx, txKey));
- continue; // While loop.
- }
+ continue; // While loop.
+ }
- // In pessimistic mode, we should always be able to set.
- assert set || !pessimistic();
+ // In pessimistic mode, we should always be able to set.
+ assert set || !pessimistic();
- if (readCommitted() || skipVals) {
- cacheCtx.evicts().touch(e, topologyVersion());
+ if (readCommitted() || skipVals) {
+ cacheCtx.evicts().touch(e, topologyVersion());
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializePortable,
- false);
- }
- }
- else {
- assert txEntry != null;
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
+ }
+ else {
+ assert txEntry != null;
- txEntry.setAndMarkValid(cacheVal);
+ txEntry.setAndMarkValid(cacheVal);
- if (optimistic() && serializable()) {
- assert loadVer != null;
+ if (needReadVer) {
+ assert loadVer != null;
- txEntry.serializableReadVersion(loadVer);
- }
+ txEntry.serializableReadVersion(loadVer);
+ }
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializePortable,
- false);
- }
- }
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
+ }
- if (val != null)
- loaded.add(key);
+ if (readCommitted())
+ loaded.add(key);
- if (log.isDebugEnabled())
- log.debug("Set value loaded from store into entry from transaction [set=" + set +
- ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+ if (log.isDebugEnabled())
+ log.debug("Set value loaded from store into entry from transaction [set=" + set +
+ ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
- break; // While loop.
- }
- catch (IgniteCheckedException ex) {
- throw new IgniteException("Failed to put value for cache entry: " + e, ex);
- }
- }
+ break; // While loop.
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+ }
+ }
+ }
+ })
+ );
}
/** {@inheritDoc} */
@@ -2066,14 +2053,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Checks filter for non-pessimistic transactions.
*
- * @param cached Cached entry.
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
* @param filter Filter to check.
* @return {@code True} if passed or pessimistic.
- * @throws IgniteCheckedException If failed.
*/
- private <K, V> boolean filter(GridCacheEntryEx cached,
- CacheEntryPredicate[] filter) throws IgniteCheckedException {
- return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter);
+ private boolean filter(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ CacheEntryPredicate[] filter) {
+ return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter);
}
/**
@@ -2097,7 +2088,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param skipStore Skip store flag.
* @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
*/
- protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
+ private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
final GridCacheContext cacheCtx,
Collection<?> keys,
@Nullable GridCacheEntryEx cached,
@@ -2108,7 +2099,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
@Nullable Object[] invokeArgs,
boolean retval,
boolean lockOnly,
- CacheEntryPredicate[] filter,
+ final CacheEntryPredicate[] filter,
final GridCacheReturn ret,
Collection<KeyCacheObject> enlisted,
@Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
@@ -2117,6 +2108,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
) {
assert cached == null || keys.size() == 1;
assert cached == null || F.first(keys).equals(cached.key());
+ assert retval || invokeMap == null;
try {
addActiveCache(cacheCtx);
@@ -2131,6 +2123,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
Set<KeyCacheObject> missedForLoad = null;
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needReadVer = (retval || hasFilters) && (optimistic() && serializable());
+
try {
// Set transform flag for transaction.
if (invokeMap != null)
@@ -2210,24 +2205,40 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
", locNodeId=" + cctx.localNodeId() + ']');
CacheObject old = null;
-
- boolean readThrough = !skipStore && !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ GridCacheVersion readVer = null;
if (optimistic() && !implicit()) {
try {
- // Should read through if filter is specified.
- old = entry.innerGet(this,
- /*swap*/false,
- /*read-through*/readThrough && cacheCtx.loadPreviousValue(),
- /*fail-fast*/false,
- /*unmarshal*/retval,
- /*metrics*/retval,
- /*events*/retval,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null);
+ if (needReadVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
+ /*swap*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null);
+
+ if (res != null) {
+ old = res.get1();
+ readVer = res.get2();
+ }
+ }
+ else {
+ old = entry.innerGet(this,
+ /*swap*/false,
+ /*read-through*/false,
+ /*fail-fast*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null);
+ }
}
catch (ClusterTopologyCheckedException e) {
entry.context().evicts().touch(entry, topologyVersion());
@@ -2243,12 +2254,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
else
old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
- if (!filter(entry, filter)) {
+ if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
skipped = skip(skipped, cacheKey);
ret.set(cacheCtx, old, false);
- if (!readCommitted() && old != null) {
+ if (!readCommitted()) {
// Enlist failed filters as reads for non-read-committed mode,
// so future ops will get the same values.
txEntry = addEntry(READ,
@@ -2265,9 +2276,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
skipStore);
txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.serializableReadVersion(readVer);
+ }
}
- if (readCommitted() || old == null)
+ if (readCommitted())
cacheCtx.evicts().touch(entry, topologyVersion());
break; // While.
@@ -2298,7 +2315,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.markValid();
if (old == null) {
- boolean load = retval && !readThrough;
+ boolean load = retval || hasFilters;
if (load) {
if (missedForLoad == null)
@@ -2317,6 +2334,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
else {
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.serializableReadVersion(readVer);
+ }
+
if (retval && !transform)
ret.set(cacheCtx, old, true);
else {
@@ -2362,7 +2385,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
else {
if (entryProcessor == null && txEntry.op() == TRANSFORM)
throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
- "transaction after transform closure is applied): " + key);
+ "transaction after EntryProcessor is applied): " + key);
GridCacheEntryEx entry = txEntry.cached();
@@ -2371,7 +2394,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
boolean del = txEntry.op() == DELETE && rmv;
if (!del) {
- if (!filter(entry, filter)) {
+ if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
skipped = skip(skipped, cacheKey);
ret.set(cacheCtx, v, false);
@@ -2439,7 +2462,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
missedForLoad,
deserializePortables(cacheCtx),
/*skip values*/false,
- false,
+ needReadVer,
new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
@Override public void apply(KeyCacheObject key,
@Nullable Object val,
@@ -2451,6 +2474,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
assert e != null;
+ if (needReadVer) {
+ assert loadVer != null;
+
+ e.serializableReadVersion(loadVer);
+ }
+
CacheObject cacheVal = cacheCtx.toCacheObject(val);
if (e.op() == TRANSFORM) {
@@ -2470,8 +2499,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
addInvokeResult(e, cacheVal, ret, ver);
}
- else
- ret.set(cacheCtx, cacheVal, true);
+ else {
+ boolean success = hasFilters ? isAll(e.context(), key, cacheVal, filter) : true;
+
+ ret.set(cacheCtx, cacheVal, success);
+ }
}
});
@@ -2491,6 +2523,31 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param filter Filter.
+ * @return {@code True} if filter passed.
+ */
+ private boolean isAll(GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ CacheEntryPredicate[] filter) {
+ GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
+ @Nullable @Override public CacheObject peekVisibleValue() {
+ return rawGet();
+ }
+ };
+
+ for (CacheEntryPredicate p0 : filter) {
+ if (p0 != null && !p0.apply(e))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* Post lock processing for put or remove.
*
* @param cacheCtx Context.