You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/04 16:03:22 UTC
[14/50] [abbrv] ignite git commit: IGNITE-7764: MVCC: cache API
support. This closes #4725.
IGNITE-7764: MVCC: cache API support. This closes #4725.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f7f834bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f7f834bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f7f834bf
Branch: refs/heads/ignite-5797
Commit: f7f834bfaf8c4170ab852e829554c8ab5b373b77
Parents: 6f39115
Author: AMRepo <an...@gmail.com>
Authored: Fri Sep 28 15:57:24 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Sep 28 15:57:24 2018 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 6 +-
.../communication/GridIoMessageFactory.java | 14 +-
.../processors/cache/GridCacheAdapter.java | 29 +-
.../processors/cache/GridCacheEntryEx.java | 12 +-
.../processors/cache/GridCacheMapEntry.java | 164 +++--
.../cache/GridCacheUpdateTxResult.java | 23 +-
.../cache/IgniteCacheOffheapManager.java | 24 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 48 +-
.../processors/cache/IgniteCacheProxyImpl.java | 2 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 134 +++-
.../dht/GridDhtTxAbstractEnlistFuture.java | 72 +-
.../distributed/dht/GridDhtTxEnlistFuture.java | 147 ++++
.../dht/GridDhtTxQueryAbstractEnlistFuture.java | 83 +++
.../dht/GridDhtTxQueryEnlistFuture.java | 23 +-
.../dht/GridDhtTxQueryResultsEnlistFuture.java | 37 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 4 +
.../dht/GridPartitionedSingleGetFuture.java | 3 +-
.../distributed/dht/NearTxResultHandler.java | 128 ++++
.../dht/colocated/GridDhtColocatedCache.java | 138 ++--
.../GridNearPessimisticTxPrepareFuture.java | 9 +-
.../near/GridNearTxAbstractEnlistFuture.java | 20 +-
.../near/GridNearTxEnlistFuture.java | 683 +++++++++++++++++++
.../near/GridNearTxEnlistRequest.java | 642 +++++++++++++++++
.../near/GridNearTxEnlistResponse.java | 372 ++++++++++
.../cache/distributed/near/GridNearTxLocal.java | 416 +++++++++--
.../GridNearTxQueryAbstractEnlistFuture.java | 36 +
.../near/GridNearTxQueryEnlistFuture.java | 4 +-
.../near/GridNearTxQueryEnlistResponse.java | 3 +-
.../GridNearTxQueryResultsEnlistFuture.java | 5 +-
.../GridNearTxQueryResultsEnlistResponse.java | 2 +-
.../cache/mvcc/MvccProcessorImpl.java | 3 +-
.../cache/mvcc/MvccQueryTrackerImpl.java | 10 +-
.../persistence/GridCacheOffheapManager.java | 15 +-
.../transactions/IgniteTxLocalAdapter.java | 5 +-
.../cache/transactions/IgniteTxManager.java | 6 +-
.../cache/tree/mvcc/data/MvccUpdateDataRow.java | 88 ++-
.../cache/tree/mvcc/data/ResultType.java | 4 +-
.../processors/cache/GridCacheTestEntryEx.java | 7 +-
.../IgniteCacheTxIteratorSelfTest.java | 10 +
...vccAbstractBasicCoordinatorFailoverTest.java | 25 +-
...acheMvccAbstractCoordinatorFailoverTest.java | 21 -
.../mvcc/CacheMvccAbstractFeatureTest.java | 2 +-
.../cache/mvcc/CacheMvccAbstractTest.java | 132 ++--
.../cache/mvcc/CacheMvccTransactionsTest.java | 596 +++++++---------
.../DataStreamProcessorMvccSelfTest.java | 5 +
.../configvariations/ConfigVariations.java | 1 -
.../query/h2/DhtResultSetEnlistFuture.java | 4 +-
.../query/h2/NearResultSetEnlistFuture.java | 3 -
...sactionsCommandsWithMvccEnabledSelfTest.java | 78 +--
...cheMvccSelectForUpdateQueryAbstractTest.java | 2 +
.../mvcc/CacheMvccSqlQueriesAbstractTest.java | 4 +
.../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 15 +-
.../mvcc/MvccRepeatableReadBulkOpsTest.java | 441 ++++++++++++
.../mvcc/MvccRepeatableReadOperationsTest.java | 276 ++++++++
.../testsuites/IgniteCacheMvccSqlTestSuite.java | 6 +
.../ApiParity/IgniteConfigurationParityTest.cs | 5 +-
56 files changed, 4168 insertions(+), 879 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 9512bae..bcb9ef4 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -38,12 +38,14 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.GridCodegenConverter;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -168,7 +170,7 @@ public class MessageCodeGenerator {
// gen.generateAll(true);
-// gen.generateAndWrite(GridNearTxQueryResultsEnlistRequest.class);
+ gen.generateAndWrite(GridNearTxEnlistResponse.class);
// gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 41c75be..389d8c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -110,6 +110,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
@@ -1066,7 +1068,17 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..119] [124..129] [-23..-27] [-36..-55]- this
+ case 159:
+ msg = new GridNearTxEnlistRequest();
+
+ break;
+
+ case 160:
+ msg = new GridNearTxEnlistResponse();
+
+ break;
+
+ // [-3..119] [124..129] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
// [2048..2053] - Snapshots
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 476b083..cf9337b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -154,7 +154,9 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Adapter for different cache implementations.
@@ -1940,6 +1942,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
if (tx == null || tx.implicit()) {
+ assert !ctx.mvccEnabled() || mvccSnapshot != null;
+
Map<KeyCacheObject, EntryGetResult> misses = null;
Set<GridCacheEntryEx> newLocalEntries = null;
@@ -1978,7 +1982,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
boolean skipEntry = readNoEntry;
if (readNoEntry) {
- CacheDataRow row = mvccSnapshot != null ? ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
+ CacheDataRow row = mvccSnapshot != null ?
+ ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
ctx.offheap().read(ctx, key);
if (row != null) {
@@ -3411,7 +3416,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKeys(keys);
- //TODO IGNITE-7764
+ //TODO: IGNITE-9324: add explicit locks support.
MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
IgniteInternalFuture<Boolean> fut = lockAllAsync(keys, timeout);
@@ -3442,7 +3447,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
- //TODO IGNITE-7764
+ //TODO: IGNITE-9324: add explicit locks support.
MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
return lockAllAsync(Collections.singletonList(key), timeout);
@@ -4213,11 +4218,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
op.single(),
ctx.systemTx() ? ctx : null,
- OPTIMISTIC,
- READ_COMMITTED,
+ ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC,
+ ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED,
tCfg.getDefaultTxTimeout(),
!ctx.skipStore(),
- false,
+ ctx.mvccEnabled(),
0,
null
);
@@ -4315,11 +4320,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
op.single(),
ctx.systemTx() ? ctx : null,
- OPTIMISTIC,
- READ_COMMITTED,
+ ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC,
+ ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED,
txCfg.getDefaultTxTimeout(),
!skipStore,
- false,
+ ctx.mvccEnabled(),
0,
null);
@@ -4996,11 +5001,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
op.single(),
ctx.systemTx() ? ctx : null,
- OPTIMISTIC,
- READ_COMMITTED,
+ ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC,
+ ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED,
CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(),
opCtx == null || !opCtx.skipStore(),
- false,
+ ctx.mvccEnabled(),
0,
null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 18fa820..2e96a9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -352,6 +352,8 @@ public interface GridCacheEntryEx {
* @param op Cache operation.
* @param needHistory Whether to collect rows created or affected by the current tx.
* @param noCreate Entry should not be created when enabled, e.g. SQL INSERT.
+ * @param filter Filter.
+ * @param retVal Previous value return flag.
* @return Tuple containing success flag and old value. If success is {@code false},
* then value is {@code null}.
* @throws IgniteCheckedException If storing value failed.
@@ -366,7 +368,9 @@ public interface GridCacheEntryEx {
MvccSnapshot mvccVer,
GridCacheOperation op,
boolean needHistory,
- boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException;
+ boolean noCreate,
+ @Nullable CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* @param tx Cache transaction.
@@ -374,6 +378,8 @@ public interface GridCacheEntryEx {
* @param topVer Topology version.
* @param mvccVer Mvcc version.
* @param needHistory Whether to collect rows created or affected by the current tx.
+ * @param filter Filter.
+ * @param retVal Previous value return flag.
* @return Tuple containing success flag and old value. If success is {@code false},
* then value is {@code null}.
* @throws IgniteCheckedException If storing value failed.
@@ -384,7 +390,9 @@ public interface GridCacheEntryEx {
UUID affNodeId,
AffinityTopologyVersion topVer,
MvccSnapshot mvccVer,
- boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException;
+ boolean needHistory,
+ @Nullable CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* @param tx Transaction adapter.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 8fe559d..f58a3dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType;
import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -1046,7 +1047,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
MvccSnapshot mvccVer,
GridCacheOperation op,
boolean needHistory,
- boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ boolean noCreate,
+ CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert tx != null;
final boolean valid = valid(tx.topologyVersion());
@@ -1087,7 +1090,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert val != null;
res = cctx.offheap().mvccUpdate(
- this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate);
+ this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, retVal);
assert res != null;
@@ -1100,7 +1103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res.resultType() == ResultType.VERSION_MISMATCH)
throw new IgniteSQLException("Mvcc version mismatch.", CONCURRENT_UPDATE);
- else if (noCreate && res.resultType() == ResultType.PREV_NULL)
+ else if (res.resultType() == ResultType.FILTERED || (noCreate && res.resultType() == ResultType.PREV_NULL))
return new GridCacheUpdateTxResult(false);
else if (res.resultType() == ResultType.LOCKED) {
unlockEntry();
@@ -1112,7 +1115,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer,
- op, needHistory, noCreate, resFut));
+ op, needHistory, noCreate, filter, retVal, resFut));
return new GridCacheUpdateTxResult(false, resFut);
}
@@ -1141,17 +1144,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
counters.incrementUpdateCounter(cctx.cacheId(), partition());
}
- if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
+ if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) {
logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
- cctx.cacheId(),
- key,
- val,
- res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
- tx.nearXidVersion(),
- newVer,
- expireTime,
- key.partition(),
- 0L)));
+ cctx.cacheId(),
+ key,
+ val,
+ res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
+ tx.nearXidVersion(),
+ newVer,
+ expireTime,
+ key.partition(),
+ 0L)));
+ }
update(val, expireTime, ttl, newVer, true);
@@ -1172,6 +1176,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) :
new GridCacheUpdateTxResult(false, logPtr);
+ CacheDataRow oldRow = ((MvccUpdateDataRow)res).oldRow();
+
+ if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) {
+ assert oldRow != null;
+
+ updRes.prevValue(oldRow.value());
+ }
+
updRes.mvccHistory(res.history());
return updRes;
@@ -1183,7 +1195,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
UUID affNodeId,
AffinityTopologyVersion topVer,
MvccSnapshot mvccVer,
- boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ boolean needHistory,
+ @Nullable CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert tx != null;
assert mvccVer != null;
@@ -1204,13 +1218,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert newVer != null : "Failed to get write version for tx: " + tx;
- res = cctx.offheap().mvccRemove(this, mvccVer, tx.local(), needHistory);
+ res = cctx.offheap().mvccRemove(this, mvccVer, tx.local(), needHistory, filter, retVal);
assert res != null;
if (res.resultType() == ResultType.VERSION_MISMATCH)
throw new IgniteSQLException("Mvcc version mismatch.", CONCURRENT_UPDATE);
- else if (res.resultType() == ResultType.PREV_NULL)
+ else if (res.resultType() == ResultType.PREV_NULL || res.resultType() == ResultType.FILTERED)
return new GridCacheUpdateTxResult(false);
else if (res.resultType() == ResultType.LOCKED) {
unlockEntry();
@@ -1222,7 +1236,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory,
- resFut));
+ resFut, retVal, filter));
return new GridCacheUpdateTxResult(false, resFut);
}
@@ -1265,6 +1279,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) :
new GridCacheUpdateTxResult(false, logPtr);
+ CacheDataRow oldRow = ((MvccUpdateDataRow)res).oldRow();
+
+ if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) {
+ assert oldRow != null;
+
+ updRes.prevValue(oldRow.value());
+ }
+
updRes.mvccHistory(res.history());
return updRes;
@@ -2264,12 +2286,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (updateMetrics &&
updateRes.outcome().updateReadMetrics() &&
needVal)
- cctx.cache().metrics0().onRead(oldVal != null);
+ cctx.cache().metrics0().onRead(oldVal != null);
if (updateMetrics && INVOKE_NO_OP.equals(updateRes.outcome()) && (transformOp || updateRes.transformed()))
cctx.cache().metrics0().onReadOnlyInvoke(oldVal != null);
else if (updateMetrics && REMOVE_NO_VAL.equals(updateRes.outcome())
- && (transformOp || updateRes.transformed()))
+ && (transformOp || updateRes.transformed()))
cctx.cache().metrics0().onInvokeRemove(oldVal != null);
switch (updateRes.outcome()) {
@@ -3521,11 +3543,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Detach value before index update.
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
- if (val != null) {
- if (cctx.mvccEnabled())
- cctx.offheap().mvccInitialValue(this, val, newVer, expTime);
- else
- storeValue(val, expTime, newVer);
+ if (val != null) {
+ if (cctx.mvccEnabled())
+ cctx.offheap().mvccInitialValue(this, val, newVer, expTime);
+ else
+ storeValue(val, expTime, newVer);
if (deletedUnlocked())
deletedUnlocked(false);
@@ -4157,12 +4179,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
- * Stores value inoffheap.*
+ * Stores value in off-heap.
+ *
* @param val Value.
* @param expireTime Expire time.
* @param ver New entry version.
* @param predicate Optional predicate.
- *
* @return {@code True} if storage was modified.
* @throws IgniteCheckedException If update failed.
*/
@@ -4299,7 +4321,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (detached())
return rawGet();
- for (;;) {
+ for (; ; ) {
GridCacheEntryEx e = cctx.cache().peekEx(key);
if (e == null)
@@ -4806,7 +4828,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (transformed)
cctx.cache().metrics0().onInvokeRemove(hasOldVal);
- } else if (op == READ && transformed)
+ }
+ else if (op == READ && transformed)
cctx.cache().metrics0().onReadOnlyInvoke(hasOldVal);
else {
cctx.cache().metrics0().onWrite();
@@ -4940,6 +4963,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** */
private final GridFutureAdapter<GridCacheUpdateTxResult> resFut;
+ /** Need previous value flag. */
+ private final boolean needVal;
+
+ /** Filter. */
+ private final CacheEntryPredicate filter;
+
/** */
private GridCacheMapEntry entry;
@@ -4950,7 +4979,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
AffinityTopologyVersion topVer,
MvccSnapshot mvccVer,
boolean needHistory,
- GridFutureAdapter<GridCacheUpdateTxResult> resFut) {
+ GridFutureAdapter<GridCacheUpdateTxResult> resFut,
+ boolean retVal,
+ @Nullable CacheEntryPredicate filter) {
this.tx = tx;
this.entry = entry;
this.topVer = topVer;
@@ -4958,6 +4989,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
this.mvccVer = mvccVer;
this.needHistory = needHistory;
this.resFut = resFut;
+ this.needVal = retVal;
+ this.filter = filter;
}
/** {@inheritDoc} */
@@ -4989,8 +5022,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.shared().database().checkpointReadLock();
try {
- res = cctx.offheap().mvccRemove(entry, mvccVer, tx.local(), needHistory);
- } finally {
+ res = cctx.offheap().mvccRemove(entry, mvccVer, tx.local(), needHistory, filter, needVal);
+ }
+ finally {
cctx.shared().database().checkpointReadUnlock();
}
@@ -5001,7 +5035,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return;
}
- else if (res.resultType() == ResultType.PREV_NULL) {
+ else if (res.resultType() == ResultType.PREV_NULL || res.resultType() == ResultType.FILTERED) {
resFut.onDone(new GridCacheUpdateTxResult(false));
return;
@@ -5034,15 +5068,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
- cctx.cacheId(),
- entry.key(),
- null,
- DELETE,
- tx.nearXidVersion(),
- tx.writeVersion(),
- 0,
- entry.key().partition(),
- 0)));
+ cctx.cacheId(),
+ entry.key(),
+ null,
+ DELETE,
+ tx.nearXidVersion(),
+ tx.writeVersion(),
+ 0,
+ entry.key().partition(),
+ 0)));
entry.update(null, 0, 0, newVer, true);
@@ -5209,6 +5243,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** */
private final boolean noCreate;
+ /** Filter. */
+ private final CacheEntryPredicate filter;
+
+ /** Need previous value flag.*/
+ private final boolean needVal;
+
/** */
MvccUpdateLockListener(IgniteInternalTx tx,
GridCacheMapEntry entry,
@@ -5220,6 +5260,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheOperation op,
boolean needHistory,
boolean noCreate,
+ CacheEntryPredicate filter,
+ boolean needVal,
GridFutureAdapter<GridCacheUpdateTxResult> resFut) {
this.tx = tx;
this.entry = entry;
@@ -5231,6 +5273,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
this.op = op;
this.needHistory = needHistory;
this.noCreate = noCreate;
+ this.filter = filter;
+ this.needVal = needVal;
this.resFut = resFut;
}
@@ -5279,8 +5323,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
res = cctx.offheap().mvccUpdate(
- entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate);
- } finally {
+ entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, needVal);
+ }
+ finally {
cctx.shared().database().checkpointReadUnlock();
}
@@ -5329,15 +5374,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
- cctx.cacheId(),
- entry.key(),
- val,
- res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
- tx.nearXidVersion(),
- newVer,
- expireTime,
- entry.key().partition(),
- 0L)));
+ cctx.cacheId(),
+ entry.key(),
+ val,
+ res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE,
+ tx.nearXidVersion(),
+ newVer,
+ expireTime,
+ entry.key().partition(),
+ 0L)));
entry.update(val, expireTime, ttl, newVer, true);
@@ -6007,8 +6052,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteBiTuple<Object, Exception> invokeRes,
boolean readFromStore,
boolean transformed)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
GridCacheContext cctx = entry.context();
final CacheObject oldVal = entry.val;
@@ -6059,7 +6103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else {
- newSysTtl = newTtl = conflictCtx.ttl();
+ newSysTtl = newTtl = conflictCtx.ttl();
newSysExpireTime = newExpireTime = conflictCtx.expireTime();
}
@@ -6166,8 +6210,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteBiTuple<Object, Exception> invokeRes,
boolean readFromStore,
boolean transformed)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
GridCacheContext cctx = entry.context();
CacheObject oldVal = entry.val;
@@ -6264,8 +6307,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private GridCacheVersionConflictContext<?, ?> resolveConflict(
CacheObject newVal,
@Nullable IgniteBiTuple<Object, Exception> invokeRes)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
GridCacheContext cctx = entry.context();
// Cache is conflict-enabled.
@@ -6437,7 +6479,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
catch (Exception e) {
if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
- throw (IgniteException) e;
+ throw (IgniteException)e;
writeObj = invokeEntry.valObj;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index b646cf9..4543dfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -40,14 +40,17 @@ public class GridCacheUpdateTxResult {
private GridLongList mvccWaitTxs;
/** */
- private GridFutureAdapter<GridCacheUpdateTxResult> fut;
+ private GridFutureAdapter<GridCacheUpdateTxResult> fut;
/** */
private WALPointer logPtr;
- /** */
+ /** Mvcc history. */
private List<MvccLinkAwareSearchRow> mvccHistory;
+ /** Previous value. */
+ private CacheObject prevVal;
+
/**
* Constructor.
*
@@ -158,6 +161,22 @@ public class GridCacheUpdateTxResult {
this.mvccHistory = mvccHistory;
}
+ /**
+ *
+ * @return Previous value.
+ */
+ @Nullable public CacheObject prevValue() {
+ return prevVal;
+ }
+
+ /**
+ *
+ * @param prevVal Previous value.
+ */
+ public void prevValue( @Nullable CacheObject prevVal) {
+ this.prevVal = prevVal;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheUpdateTxResult.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index b4b6c9b..f576cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -276,6 +276,8 @@ public interface IgniteCacheOffheapManager {
* @param primary {@code True} if on primary node.
* @param needHistory Flag to collect history.
* @param noCreate Flag indicating that row should not be created if absent.
+ * @param filter Filter.
+ * @param retVal Flag to return previous value.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
@@ -287,13 +289,17 @@ public interface IgniteCacheOffheapManager {
MvccSnapshot mvccSnapshot,
boolean primary,
boolean needHistory,
- boolean noCreate) throws IgniteCheckedException;
+ boolean noCreate,
+ @Nullable CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException;
/**
* @param entry Entry.
* @param mvccSnapshot MVCC snapshot.
* @param primary {@code True} if on primary node.
* @param needHistory Flag to collect history.
+ * @param filter Filter.
+ * @param retVal Flag to return previous value.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
@@ -301,7 +307,9 @@ public interface IgniteCacheOffheapManager {
GridCacheMapEntry entry,
MvccSnapshot mvccSnapshot,
boolean primary,
- boolean needHistory) throws IgniteCheckedException;
+ boolean needHistory,
+ @Nullable CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException;
/**
* @param entry Entry.
@@ -788,9 +796,11 @@ public interface IgniteCacheOffheapManager {
* @param ver Version.
* @param expireTime Expire time.
* @param mvccSnapshot MVCC snapshot.
+ * @param filter Filter.
* @param primary {@code True} if update is executed on primary node.
* @param needHistory Flag to collect history.
* @param noCreate Flag indicating that row should not be created if absent.
+ * @param retVal Flag to return previous value.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
@@ -801,16 +811,20 @@ public interface IgniteCacheOffheapManager {
GridCacheVersion ver,
long expireTime,
MvccSnapshot mvccSnapshot,
+ @Nullable CacheEntryPredicate filter,
boolean primary,
boolean needHistory,
- boolean noCreate) throws IgniteCheckedException;
+ boolean noCreate,
+ boolean retVal) throws IgniteCheckedException;
/**
* @param cctx Cache context.
* @param key Key.
* @param mvccSnapshot MVCC snapshot.
+ * @param filter Filter.
* @param primary {@code True} if update is executed on primary node.
* @param needHistory Flag to collect history.
+ * @param retVal Flag to return previous value.
* @return List of transactions to wait for.
* @throws IgniteCheckedException If failed.
*/
@@ -818,8 +832,10 @@ public interface IgniteCacheOffheapManager {
GridCacheContext cctx,
KeyCacheObject key,
MvccSnapshot mvccSnapshot,
+ @Nullable CacheEntryPredicate filter,
boolean primary,
- boolean needHistory) throws IgniteCheckedException;
+ boolean needHistory,
+ boolean retVal) throws IgniteCheckedException;
/**
* @param cctx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 11e67d3..e0b9c06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -514,7 +514,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
MvccSnapshot mvccSnapshot,
boolean primary,
boolean needHistory,
- boolean noCreate) throws IgniteCheckedException {
+ boolean noCreate,
+ @Nullable CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException {
if (entry.detached() || entry.isNear())
return null;
@@ -526,9 +528,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
ver,
expireTime,
mvccSnapshot,
+ filter,
primary,
needHistory,
- noCreate);
+ noCreate,
+ retVal);
}
/** {@inheritDoc} */
@@ -536,7 +540,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheMapEntry entry,
MvccSnapshot mvccSnapshot,
boolean primary,
- boolean needHistory) throws IgniteCheckedException {
+ boolean needHistory,
+ @Nullable CacheEntryPredicate filter,
+ boolean retVal) throws IgniteCheckedException {
if (entry.detached() || entry.isNear())
return null;
@@ -545,8 +551,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return dataStore(entry.localPartition()).mvccRemove(entry.context(),
entry.key(),
mvccSnapshot,
+ filter,
primary,
- needHistory);
+ needHistory,
+ retVal);
}
/** {@inheritDoc} */
@@ -1848,9 +1856,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheVersion ver,
long expireTime,
MvccSnapshot mvccSnapshot,
+ @Nullable CacheEntryPredicate filter,
boolean primary,
boolean needHistory,
- boolean noCreate) throws IgniteCheckedException {
+ boolean noCreate,
+ boolean retVal) throws IgniteCheckedException {
assert mvccSnapshot != null;
assert primary || !needHistory;
@@ -1866,7 +1876,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
key.valueBytes(coCtx);
val.valueBytes(coCtx);
- MvccUpdateDataRow updateRow = new MvccUpdateDataRow(
+ MvccUpdateDataRow updateRow = new MvccUpdateDataRow(
cctx,
key,
val,
@@ -1875,11 +1885,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
expireTime,
mvccSnapshot,
null,
+ filter,
primary,
false,
needHistory,
// we follow fast update visit flow here if row cannot be created by current operation
- noCreate);
+ noCreate,
+ retVal);
assert cctx.shared().database().checkpointLockIsHeldByThread();
@@ -1890,17 +1902,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (res == ResultType.LOCKED // cannot update locked
|| res == ResultType.VERSION_MISMATCH) // cannot update on write conflict
return updateRow;
- else if (res == ResultType.VERSION_FOUND) {
+ else if (res == ResultType.VERSION_FOUND || // exceptional case
+ res == ResultType.FILTERED || // Operation should be skipped.
+ (res == ResultType.PREV_NULL && noCreate) // No op.
+ ) {
// Do nothing, except cleaning up not needed versions
cleanup(cctx, updateRow.cleanupRows());
return updateRow;
}
- else if (res == ResultType.PREV_NULL && noCreate) {
- cleanup(cctx, updateRow.cleanupRows());
-
- return updateRow;
- }
CacheDataRow oldRow = null;
@@ -1961,8 +1971,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public MvccUpdateResult mvccRemove(GridCacheContext cctx,
KeyCacheObject key,
MvccSnapshot mvccSnapshot,
+ @Nullable CacheEntryPredicate filter,
boolean primary,
- boolean needHistory) throws IgniteCheckedException {
+ boolean needHistory,
+ boolean retVal) throws IgniteCheckedException {
assert mvccSnapshot != null;
assert primary || mvccSnapshot.activeTransactions().size() == 0 : mvccSnapshot;
assert primary || !needHistory;
@@ -1987,10 +1999,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
0,
mvccSnapshot,
null,
+ filter,
primary,
false,
needHistory,
- true);
+ true,
+ retVal);
assert cctx.shared().database().checkpointLockIsHeldByThread();
@@ -2001,7 +2015,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (res == ResultType.LOCKED // cannot update locked
|| res == ResultType.VERSION_MISMATCH) // cannot update on write conflict
return updateRow;
- else if (res == ResultType.VERSION_FOUND) {
+ else if (res == ResultType.VERSION_FOUND || res == ResultType.FILTERED) {
// Do nothing, except cleaning up not needed versions
cleanup(cctx, updateRow.cleanupRows());
@@ -2051,9 +2065,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
0,
mvccSnapshot,
null,
+ null,
true,
true,
false,
+ false,
false);
assert cctx.shared().database().checkpointLockIsHeldByThread();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 225fa81..4989efb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -348,7 +348,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
/** {@inheritDoc} */
@Override public Lock lockAll(final Collection<? extends K> keys) {
- //TODO IGNITE-7764
+ //TODO: IGNITE-9324: add explicit locks support.
MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
return new CacheLockImpl<>(ctx.gate(), delegate, ctx.operationContextPerCall(), keys);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 4480dae..52638c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -60,6 +60,9 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
@@ -188,7 +191,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryEnlistResponse.class, new CI2<UUID, GridNearTxQueryEnlistResponse>() {
@Override public void apply(UUID nodeId, GridNearTxQueryEnlistResponse req) {
- processNearEnlistResponse(nodeId, req);
+ processNearTxQueryEnlistResponse(nodeId, req);
}
});
@@ -216,7 +219,21 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryResultsEnlistResponse.class,
new CI2<UUID, GridNearTxQueryResultsEnlistResponse>() {
@Override public void apply(UUID nodeId, GridNearTxQueryResultsEnlistResponse req) {
- processNearTxEnlistResponse(nodeId, req);
+ processNearTxQueryResultsEnlistResponse(nodeId, req);
+ }
+ });
+
+ ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistRequest.class,
+ new CI2<UUID, GridNearTxEnlistRequest>() {
+ @Override public void apply(UUID nodeId, GridNearTxEnlistRequest req) {
+ processNearTxEnlistRequest(nodeId, req);
+ }
+ });
+
+ ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistResponse.class,
+ new CI2<UUID, GridNearTxEnlistResponse>() {
+ @Override public void apply(UUID nodeId, GridNearTxEnlistResponse msg) {
+ processNearTxEnlistResponse(nodeId, msg);
}
});
@@ -756,17 +773,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
/**
* @param nodeId Node ID.
- * @param res Response.
- */
- private void processNearEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) {
- GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId());
-
- if (fut != null)
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
* @param req Request.
*/
private void processNearLockRequest(UUID nodeId, GridNearLockRequest req) {
@@ -801,7 +807,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
IgniteInternalFuture<?> f;
if (req.firstClientRequest()) {
- for (;;) {
+ for (; ; ) {
if (waitForExchangeFuture(nearNode, req))
return;
@@ -1079,9 +1085,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
if (log.isDebugEnabled()) {
log.debug("Client topology version mismatch, need remap lock request [" +
- "reqTopVer=" + req.topologyVersion() +
- ", locTopVer=" + top.readyTopologyVersion() +
- ", req=" + req + ']');
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.readyTopologyVersion() +
+ ", req=" + req + ']');
}
GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
@@ -1124,7 +1130,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx == null || !tx.init()) {
String msg = "Failed to acquire lock (transaction has been completed): " +
- req.version();
+ req.version();
U.warn(log, msg);
@@ -1401,7 +1407,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
int i = 0;
- for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext();) {
+ for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext(); ) {
GridCacheEntryEx e = it.next();
assert e != null;
@@ -1995,6 +2001,71 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
/**
+ * @param nodeId Node ID.
+ * @param req Request.
+ */
+ private void processNearTxEnlistRequest(UUID nodeId, final GridNearTxEnlistRequest req) {
+ assert nodeId != null;
+ assert req != null;
+
+ ClusterNode nearNode = ctx.discovery().node(nodeId);
+
+ GridDhtTxLocal tx;
+
+ try {
+ tx = initTxTopologyVersion(nodeId,
+ nearNode,
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.firstClientRequest(),
+ req.topologyVersion(),
+ req.threadId(),
+ req.txTimeout(),
+ req.subjectId(),
+ req.taskNameHash());
+ }
+ catch (IgniteCheckedException | IgniteException ex) {
+ GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(req.cacheId(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ ex);
+
+ try {
+ ctx.io().send(nearNode, res, ctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send near enlist response [" +
+ "txId=" + req.version() +
+ ", node=" + nodeId +
+ ", res=" + res + ']', e);
+ }
+
+ return;
+ }
+
+ GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(
+ nodeId,
+ req.version(),
+ req.mvccSnapshot(),
+ req.threadId(),
+ req.futureId(),
+ req.miniId(),
+ tx,
+ req.timeout(),
+ ctx,
+ req.rows(),
+ req.operation(),
+ req.filter(),
+ req.needRes());
+
+ fut.listen(NearTxResultHandler.instance());
+
+ fut.init();
+ }
+
+ /**
* @param nodeId Near node id.
* @param nearNode Near node.
* @param nearLockVer Near lock version.
@@ -2125,7 +2196,30 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param nodeId Node ID.
* @param res Response.
*/
- private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) {
+ private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxEnlistResponse res) {
+ GridNearTxEnlistFuture fut = (GridNearTxEnlistFuture)
+ ctx.mvcc().versionedFuture(res.version(), res.futureId());
+
+ if (fut != null)
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void processNearTxQueryEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) {
+ GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId());
+
+ if (fut != null)
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void processNearTxQueryResultsEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) {
GridNearTxQueryResultsEnlistFuture fut = (GridNearTxQueryResultsEnlistFuture)
ctx.mvcc().versionedFuture(res.version(), res.futureId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index ad164e7..64f966d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -77,11 +78,10 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Abstract future processing transaction enlisting and locking
- * of entries produced with DML and SELECT FOR UPDATE queries.
+ * Abstract future processing transaction enlisting and locking.
*/
-public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapter<Long>
- implements DhtLockFuture<Long> {
+public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAdapter<T>
+ implements DhtLockFuture<T> {
/** Done field updater. */
private static final AtomicIntegerFieldUpdater<GridDhtTxAbstractEnlistFuture> DONE_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridDhtTxAbstractEnlistFuture.class, "done");
@@ -134,9 +134,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
/** */
protected final MvccSnapshot mvccSnapshot;
- /** Processed entries count. */
- protected long cnt;
-
/** New DHT nodes. */
protected Set<UUID> newDhtNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -146,6 +143,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
/** Near lock version. */
protected final GridCacheVersion nearLockVer;
+ /** Filter. */
+ private final CacheEntryPredicate filter;
+
/** Timeout object. */
@GridToStringExclude
protected LockTimeoutObject timeoutObj;
@@ -202,6 +202,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
* @param tx Transaction.
* @param timeout Lock acquisition timeout.
* @param cctx Cache context.
+ * @param filter Filter.
*/
protected GridDhtTxAbstractEnlistFuture(UUID nearNodeId,
GridCacheVersion nearLockVer,
@@ -212,7 +213,8 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
@Nullable int[] parts,
GridDhtTxLocalAdapter tx,
long timeout,
- GridCacheContext<?, ?> cctx) {
+ GridCacheContext<?, ?> cctx,
+ @Nullable CacheEntryPredicate filter) {
assert tx != null;
assert timeout >= 0;
assert nearNodeId != null;
@@ -229,6 +231,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
this.timeout = timeout;
this.tx = tx;
this.parts = parts;
+ this.filter = filter;
lockVer = tx.xidVersion();
@@ -238,12 +241,38 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
}
/**
+ * Gets source to be updated iterator.
+ *
* @return iterator.
* @throws IgniteCheckedException If failed.
*/
protected abstract UpdateSourceIterator<?> createIterator() throws IgniteCheckedException;
/**
+ * Gets query result.
+ *
+ * @return Query result.
+ */
+ protected abstract T result0();
+
+ /**
+ * Gets need previous value flag.
+ *
+ * @return {@code True} if previous value is required.
+ */
+ public boolean needResult() {
+ return false;
+ }
+
+ /**
+ * Entry processed callback.
+ *
+ * @param key Entry key.
+ * @param res Update result.
+ */
+ protected abstract void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res);
+
+ /**
*
*/
public void init() {
@@ -291,14 +320,14 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
boolean added = cctx.mvcc().addFuture(this, futId);
- assert added;
-
if (isDone()) {
cctx.mvcc().removeFuture(futId);
return;
}
+ assert added;
+
if (timeoutObj != null)
cctx.time().addTimeoutObject(timeoutObj);
@@ -310,12 +339,15 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
if (!it.hasNext()) {
U.close(it, log);
- onDone(0L);
+ onDone(result0());
return;
}
- tx.addActiveCache(cctx, false);
+ if(!tx.implicitSingle())
+ tx.addActiveCache(cctx, false);
+ else // Nothing to do for single update.
+ assert tx.txState().cacheIds().contains(cctx.cacheId()) && tx.txState().cacheIds().size() == 1;
this.it = it;
}
@@ -391,7 +423,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
cctx.localNodeId(),
topVer,
mvccSnapshot,
- isMoving(key.partition()));
+ isMoving(key.partition()),
+ filter,
+ needResult());
break;
@@ -407,7 +441,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
mvccSnapshot,
op.cacheOperation(),
isMoving(key.partition()),
- op.noCreate());
+ op.noCreate(),
+ filter,
+ needResult());
break;
@@ -493,7 +529,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
}
if (noPendingRequests()) {
- onDone(cnt);
+ onDone(result0());
return;
}
@@ -569,11 +605,11 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
if (ptr0 != null)
walPtr = ptr0;
+ onEntryProcessed(entry.key(), updRes);
+
if (!updRes.success())
return;
- cnt++;
-
if (op != EnlistOperation.LOCK)
addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId());
}
@@ -980,7 +1016,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
}
/** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) {
+ @Override public boolean onDone(@Nullable T res, @Nullable Throwable err) {
assert res != null || err != null;
if (!DONE_UPD.compareAndSet(this, 0, 1))
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
new file mode 100644
index 0000000..58d6b15
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future processing transaction enlisting and locking of entries produces by cache API operations.
+ */
+public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<GridCacheReturn> implements UpdateSourceIterator<Object> {
+ /** Enlist operation. */
+ private EnlistOperation op;
+
+ /** Source iterator. */
+ private Iterator<Object> it;
+
+ /** Future result. */
+ private GridCacheReturn res;
+
+ /** Need result flag. If {@code True} previous value should be returned as well. */
+ private boolean needRes;
+
+ /**
+ * Constructor.
+ *
+ * @param nearNodeId Near node ID.
+ * @param nearLockVer Near lock version.
+ * @param mvccSnapshot Mvcc snapshot.
+ * @param threadId Thread ID.
+ * @param nearFutId Near future id.
+ * @param nearMiniId Near mini future id.
+ * @param tx Transaction.
+ * @param timeout Lock acquisition timeout.
+ * @param cctx Cache context.
+ * @param rows Collection of rows.
+ * @param op Operation.
+ * @param filter Filter.
+ * @param needRes Return previous value flag.
+ */
+ public GridDhtTxEnlistFuture(UUID nearNodeId,
+ GridCacheVersion nearLockVer,
+ MvccSnapshot mvccSnapshot,
+ long threadId,
+ IgniteUuid nearFutId,
+ int nearMiniId,
+ GridDhtTxLocalAdapter tx,
+ long timeout,
+ GridCacheContext<?, ?> cctx,
+ Collection<Object> rows,
+ EnlistOperation op,
+ @Nullable CacheEntryPredicate filter,
+ boolean needRes) {
+ super(nearNodeId,
+ nearLockVer,
+ mvccSnapshot,
+ threadId,
+ nearFutId,
+ nearMiniId,
+ null,
+ tx,
+ timeout,
+ cctx,
+ filter);
+
+ this.op = op;
+ this.needRes = needRes;
+
+ it = rows.iterator();
+
+ res = new GridCacheReturn(cctx.localNodeId().equals(nearNodeId), false);
+
+ skipNearNodeUpdates = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable protected GridCacheReturn result0() {
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult txRes) {
+ if (needRes && txRes.success())
+ res.set(cctx, txRes.prevValue(), txRes.success(), true);
+ else
+ res.success(txRes.success());
+ }
+
+ /** {@inheritDoc} */
+ public boolean needResult() {
+ return needRes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public EnlistOperation operation() {
+ return op;
+ }
+
+ /** {@inheritDoc} */
+ public boolean hasNextX() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ public Object nextX() {
+ return it.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxEnlistFuture.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
new file mode 100644
index 0000000..0a26d75
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract future processing transaction enlisting and locking of entries produced with DML and SELECT FOR UPDATE
+ * queries.
+ */
+public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstractEnlistFuture<Long> {
+ /** Processed entries count. */
+ protected long cnt;
+
+ /**
+ * Constructor.
+ *
+ * @param nearNodeId Near node ID.
+ * @param nearLockVer Near lock version.
+ * @param mvccSnapshot Mvcc snapshot.
+ * @param threadId Thread ID.
+ * @param nearFutId Near future id.
+ * @param nearMiniId Near mini future id.
+ * @param parts Partitions.
+ * @param tx Transaction.
+ * @param timeout Lock acquisition timeout.
+ * @param cctx Cache context.
+ */
+ protected GridDhtTxQueryAbstractEnlistFuture(UUID nearNodeId,
+ GridCacheVersion nearLockVer,
+ MvccSnapshot mvccSnapshot,
+ long threadId,
+ IgniteUuid nearFutId,
+ int nearMiniId,
+ @Nullable int[] parts,
+ GridDhtTxLocalAdapter tx,
+ long timeout,
+ GridCacheContext<?, ?> cctx) {
+ super(nearNodeId,
+ nearLockVer,
+ mvccSnapshot,
+ threadId,
+ nearFutId,
+ nearMiniId,
+ null,
+ tx,
+ timeout,
+ cctx, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Long result0() {
+ return cnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res) {
+ if(res.success())
+ cnt++;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
index dd30855..ed792f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
-import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -29,9 +28,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
/**
- * Cache lock future.
+ * Cache query lock future.
*/
-public final class GridDhtTxQueryEnlistFuture extends GridDhtTxAbstractEnlistFuture {
+public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture {
/** Involved cache ids. */
private final int[] cacheIds;
@@ -116,24 +115,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxAbstractEnlistFut
}
/** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridDhtTxQueryEnlistFuture future = (GridDhtTxQueryEnlistFuture)o;
-
- return Objects.equals(futId, future.futId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return futId.hashCode();
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxQueryEnlistFuture.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
index b3d15d4..c6140fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.Iterator;
-import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -34,13 +33,11 @@ import org.apache.ignite.lang.IgniteUuid;
* Future processing transaction enlisting and locking of entries
* produces by complex DML queries with reduce step.
*/
-public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEnlistFuture implements UpdateSourceIterator<Object> {
- /** */
- private static final long serialVersionUID = -4933550335145438798L;
- /** */
+public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture implements UpdateSourceIterator<Object> {
+ /** Enlist operation. */
private EnlistOperation op;
- /** */
+ /** Source iterator. */
private Iterator<Object> it;
/**
@@ -91,29 +88,6 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEn
}
/** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridDhtTxQueryResultsEnlistFuture future = (GridDhtTxQueryResultsEnlistFuture)o;
-
- return Objects.equals(futId, future.futId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return futId.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxQueryResultsEnlistFuture.class, this);
- }
-
- /** {@inheritDoc} */
@Override public EnlistOperation operation() {
return op;
}
@@ -127,4 +101,9 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEn
@Override public Object nextX() {
return it.next();
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxQueryResultsEnlistFuture.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 6662a1c..9883f6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -440,6 +440,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
ctx.localNodeId(),
topologyVersion(),
snapshot,
+ false,
+ null,
false);
break;
@@ -456,6 +458,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
snapshot,
op.cacheOperation(),
false,
+ false,
+ null,
false);
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index f5689f9..5d3bef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -389,7 +389,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
boolean skipEntry = readNoEntry;
if (readNoEntry) {
- CacheDataRow row = mvccSnapshot != null ? cctx.offheap().mvccRead(cctx, key, mvccSnapshot) :
+ CacheDataRow row = mvccSnapshot != null ?
+ cctx.offheap().mvccRead(cctx, key, mvccSnapshot) :
cctx.offheap().read(cctx, key);
if (row != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
new file mode 100644
index 0000000..0bc00e1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Response factory.
+ */
+public final class NearTxResultHandler implements CI1<IgniteInternalFuture<GridCacheReturn>> {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** Singleton instance.*/
+ private static final NearTxResultHandler INSTANCE = new NearTxResultHandler();
+
+ /** Constructor. */
+ private NearTxResultHandler() {
+ }
+
+ /**
+ * @return Handler instance.
+ */
+ public static NearTxResultHandler instance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Response factory method.
+ *
+ * @param future Enlist future.
+ * @return Enlist response.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T createResponse(IgniteInternalFuture<?> future) {
+ assert future != null;
+
+ Class<?> clazz = future.getClass();
+
+ if (clazz == GridDhtTxEnlistFuture.class)
+ return (T)createResponse((GridDhtTxEnlistFuture)future);
+ else
+ throw new IllegalStateException();
+ }
+
+ /**
+ * Response factory method.
+ *
+ * @param fut Enlist future.
+ * @return Enlist response.
+ */
+ public static GridNearTxEnlistResponse createResponse(GridDhtTxEnlistFuture fut) {
+ try {
+ GridCacheReturn res = fut.get();
+
+ GridCacheVersion ver = null;
+ IgniteUuid id = null;
+
+ if (fut.hasNearNodeUpdates) {
+ ver = fut.cctx.tm().mappedVersion(fut.nearLockVer);
+
+ id = fut.futId;
+ }
+
+ return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId,
+ fut.nearLockVer, res, ver, id, fut.newDhtNodes);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId, fut.nearLockVer, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteInternalFuture<GridCacheReturn> fut0) {
+ GridDhtTxAbstractEnlistFuture fut = (GridDhtTxAbstractEnlistFuture)fut0;
+
+ GridCacheContext<?, ?> cctx = fut.cctx;
+ GridDhtTxLocal tx = (GridDhtTxLocal)fut.tx;
+ UUID nearNodeId = fut.nearNodeId;
+
+ GridNearTxEnlistResponse res = createResponse(fut);
+
+ try {
+ cctx.io().send(nearNodeId, res, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(fut.log, "Failed to send near enlist response (will rollback transaction) [" +
+ "tx=" + CU.txString(tx) +
+ ", node=" + nearNodeId +
+ ", res=" + res + ']', e);
+
+ try {
+ tx.rollbackDhtLocalAsync();
+ }
+ catch (Throwable e1) {
+ e.addSuppressed(e1);
+ }
+
+ throw new GridClosureException(e);
+ }
+ }
+}