You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/01 05:55:22 UTC
[12/21] ignite git commit: IGNITE-7764: MVCC: cache API support. This
closes #4725.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 4d5fa13..a83a93f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,8 +42,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
@@ -61,6 +59,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -68,6 +70,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
@@ -186,13 +189,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keyCheck)
validateCacheKey(key);
- GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.mvccEnabled() ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx);
final CacheOperationContext opCtx = ctx.operationContextPerCall();
final boolean recovery = opCtx != null && opCtx.recovery();
- if (tx != null && !tx.implicit() && !skipTx) {
+ // Get operation bypass Tx in Mvcc mode.
+ if (!ctx.mvccEnabled() && tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
@@ -230,6 +234,26 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
subjId = ctx.subjectIdPerCall(subjId, opCtx);
+ MvccSnapshot mvccSnapshot = null;
+ MvccQueryTracker mvccTracker = null;
+
+ if (ctx.mvccEnabled()) {
+ try {
+ if (tx != null)
+ mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+ else {
+ mvccTracker = MvccUtils.mvccTracker(ctx, null);
+
+ mvccSnapshot = mvccTracker.snapshot();
+ }
+
+ assert mvccSnapshot != null;
+ }
+ catch (IgniteCheckedException ex) {
+ return new GridFinishedFuture<>(ex);
+ }
+ }
+
GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
ctx.toCacheKeyObject(key),
topVer,
@@ -243,10 +267,21 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
needVer,
/*keepCacheObjects*/false,
opCtx != null && opCtx.recovery(),
- null);
+ mvccSnapshot);
fut.init();
+ if(mvccTracker != null){
+ final MvccQueryTracker mvccTracker0 = mvccTracker;
+
+ fut.listen(new CI1<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> future) {
+ if(future.isDone())
+ mvccTracker0.onDone();
+ }
+ });
+ }
+
return (IgniteInternalFuture<V>)fut;
}
@@ -270,13 +305,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keyCheck)
validateCacheKeys(keys);
- GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = (ctx.mvccEnabled()) ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx);
final CacheOperationContext opCtx = ctx.operationContextPerCall();
- if (tx != null && !tx.implicit() && !skipTx) {
+ if (!ctx.mvccEnabled() && tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
- @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx,
+ AffinityTopologyVersion readyTopVer) {
return tx.getAllAsync(ctx,
readyTopVer,
ctx.cacheKeysView(keys),
@@ -290,14 +327,34 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}, opCtx, /*retry*/false);
}
- AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
-
subjId = ctx.subjectIdPerCall(subjId, opCtx);
- return loadAsync(
+ MvccSnapshot mvccSnapshot = null;
+ MvccQueryTracker mvccTracker = null;
+
+ if (ctx.mvccEnabled()) {
+ try {
+ if (tx != null)
+ mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+ else {
+ mvccTracker = MvccUtils.mvccTracker(ctx, null);
+
+ mvccSnapshot = mvccTracker.snapshot();
+ }
+
+ assert mvccSnapshot != null;
+ }
+ catch (IgniteCheckedException ex) {
+ return new GridFinishedFuture(ex);
+ }
+ }
+
+ AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+
+ IgniteInternalFuture<Map<K, V>> fut = loadAsync(
ctx.cacheKeysView(keys),
opCtx == null || !opCtx.skipStore(),
- forcePrimary,
+ forcePrimary ,
topVer,
subjId,
taskName,
@@ -305,46 +362,23 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
recovery,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
skipVals,
- needVer);
- }
-
- /**
- * @param keys Keys to load.
- * @param readThrough Read through flag.
- * @param forcePrimary Force get from primary node flag.
- * @param topVer Topology version.
- * @param subjId Subject ID.
- * @param taskName Task name.
- * @param deserializeBinary Deserialize binary flag.
- * @param expiryPlc Expiry policy.
- * @param skipVals Skip values flag.
- * @param needVer Need version.
- * @return Loaded values.
- */
- private IgniteInternalFuture<Map<K, V>> loadAsync(
- @Nullable Collection<KeyCacheObject> keys,
- boolean readThrough,
- boolean forcePrimary,
- AffinityTopologyVersion topVer,
- @Nullable UUID subjId,
- String taskName,
- boolean deserializeBinary,
- boolean recovery,
- @Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals,
- boolean needVer) {
- return loadAsync(keys,
- readThrough,
- forcePrimary,
- topVer, subjId,
- taskName,
- deserializeBinary,
- recovery,
- expiryPlc,
- skipVals,
needVer,
false,
- null);
+ mvccSnapshot);
+
+ if(mvccTracker != null){
+ final MvccQueryTracker mvccTracker0 = mvccTracker;
+
+ fut.listen(new CI1<IgniteInternalFuture<Map<K, V>>>() {
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteInternalFuture<Map<K, V>> future) {
+ if(future.isDone())
+ mvccTracker0.onDone();
+ }
+ });
+ }
+
+ return fut;
}
/**
@@ -445,7 +479,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
for (KeyCacheObject key : keys) {
if (readNoEntry) {
- CacheDataRow row = ctx.offheap().read(ctx, key);
+ CacheDataRow row = mvccSnapshot != null ?
+ ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
+ ctx.offheap().read(ctx, key);
if (row != null) {
long expireTime = row.expireTime();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index b167f26..85a48a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -287,10 +287,15 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
boolean queryMapped = false;
- for (GridDistributedTxMapping m : F.view(tx.mappings().mappings(), CU.FILTER_QUERY_MAPPING)) {
+ assert !tx.implicitSingle() || tx.queryEnlisted(); // Non-mvcc implicit-single tx goes fast commit way.
+
+ Collection<GridDistributedTxMapping> txMappings = !tx.implicitSingle() ? tx.mappings().mappings()
+ : Collections.singleton(tx.mappings().singleMapping());
+
+ for (GridDistributedTxMapping m : F.view(txMappings, CU.FILTER_QUERY_MAPPING)) {
GridDistributedTxMapping nodeMapping = mappings.get(m.primary().id());
- if(nodeMapping == null)
+ if (nodeMapping == null)
mappings.put(m.primary().id(), m);
txMapping.addMapping(F.asList(m.primary()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index f484bd6..11f98ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -42,10 +42,10 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -53,11 +53,8 @@ import org.jetbrains.annotations.Nullable;
/**
*
*/
-public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundIdentityFuture<Long> implements
- GridCacheVersionedFuture<Long> {
- /** */
- private static final long serialVersionUID = -6069985059301497282L;
-
+public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoundIdentityFuture<T> implements
+ GridCacheVersionedFuture<T> {
/** Done field updater. */
private static final AtomicIntegerFieldUpdater<GridNearTxAbstractEnlistFuture> DONE_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, "done");
@@ -117,10 +114,11 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
* @param cctx Cache context.
* @param tx Transaction.
* @param timeout Timeout.
+ * @param rdc Compound future reducer.
*/
public GridNearTxAbstractEnlistFuture(
- GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout) {
- super(CU.longReducer());
+ GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout, @Nullable IgniteReducer<T, T> rdc) {
+ super(rdc);
assert cctx != null;
assert tx != null;
@@ -300,8 +298,6 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
throw new IgniteCheckedException("Future is done.");
}
-
-
/**
*/
private void mapOnTopology() {
@@ -359,7 +355,7 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
}
/** {@inheritDoc} */
- @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<Long> fut) {
+ @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<T> fut) {
if (ex != null || !EX_UPD.compareAndSet(this, null, err))
ex.addSuppressed(err);
@@ -367,7 +363,7 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
}
/** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err, boolean cancelled) {
+ @Override public boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancelled) {
if (!DONE_UPD.compareAndSet(this, 0, 1))
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
new file mode 100644
index 0000000..8d85bd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxEnlistFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxResultHandler.createResponse;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * A future tracking requests for remote nodes transaction enlisting and locking produces by cache API operations.
+ */
+public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridCacheReturn> {
+ /** Default batch size. */
+ public static final int DFLT_BATCH_SIZE = 1024;
+
+ /** SkipCntr field updater. */
+ private static final AtomicIntegerFieldUpdater<GridNearTxEnlistFuture> SKIP_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(GridNearTxEnlistFuture.class, "skipCntr");
+
+ /** Marker object. */
+ private static final Object FINISHED = new Object();
+
+ /** Source iterator. */
+ @GridToStringExclude
+ private final UpdateSourceIterator<?> it;
+
+ /** Batch size. */
+ private int batchSize;
+
+ /** */
+ private AtomicInteger batchCntr = new AtomicInteger();
+
+ /** */
+ @SuppressWarnings("unused")
+ @GridToStringExclude
+ private volatile int skipCntr;
+
+ /** Future result. */
+ @GridToStringExclude
+ private volatile GridCacheReturn res;
+
+ /** */
+ private final Map<UUID, Batch> batches = new ConcurrentHashMap<>();
+
+ /** Row extracted from iterator but not yet used. */
+ private Object peek;
+
+ /** Topology locked flag. */
+ private boolean topLocked;
+
+ /** Ordered batch sending flag. */
+ private final boolean sequential;
+
+ /** Filter. */
+ private final CacheEntryPredicate filter;
+
+ /** Need previous value flag. */
+ private final boolean needRes;
+
+ /**
+ * @param cctx Cache context.
+ * @param tx Transaction.
+ * @param timeout Timeout.
+ * @param it Rows iterator.
+ * @param batchSize Batch size.
+ * @param sequential Sequential locking flag.
+ * @param filter Filter.
+ * @param needRes Need previous value flag.
+ */
+ public GridNearTxEnlistFuture(GridCacheContext<?, ?> cctx,
+ GridNearTxLocal tx,
+ long timeout,
+ UpdateSourceIterator<?> it,
+ int batchSize,
+ boolean sequential,
+ @Nullable CacheEntryPredicate filter,
+ boolean needRes) {
+ super(cctx, tx, timeout, null);
+
+ this.it = it;
+ this.batchSize = batchSize > 0 ? batchSize : DFLT_BATCH_SIZE;
+ this.sequential = sequential;
+ this.filter = filter;
+ this.needRes = needRes;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void map(boolean topLocked) {
+ this.topLocked = topLocked;
+
+ sendNextBatches(null);
+ }
+
+ /**
+ * Continue iterating the data rows and form new batches.
+ *
+ * @param nodeId Node that is ready for a new batch.
+ */
+ private void sendNextBatches(@Nullable UUID nodeId) {
+ try {
+ Collection<Batch> next = continueLoop(nodeId);
+
+ if (next == null)
+ return;
+
+ boolean first = (nodeId != null);
+
+ for (Batch batch : next) {
+ ClusterNode node = batch.node();
+
+ sendBatch(node, batch, first);
+
+ if (!node.isLocal())
+ first = false;
+ }
+ }
+ catch (Throwable e) {
+ onDone(e);
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+
+ /**
+ * Iterate data rows and form batches.
+ *
+ * @param nodeId Id of node acknowledged the last batch.
+ * @return Collection of newly completed batches.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Collection<Batch> continueLoop(@Nullable UUID nodeId) throws IgniteCheckedException {
+ if (nodeId != null)
+ batches.remove(nodeId);
+
+ // Accumulate number of batches released since we got here.
+ // Let only one thread do the looping.
+ if (isDone() || SKIP_UPD.getAndIncrement(this) != 0)
+ return null;
+
+ ArrayList<Batch> res = null;
+ Batch batch = null;
+
+ boolean flush = false;
+
+ EnlistOperation op = it.operation();
+
+ while (true) {
+ while (hasNext0()) {
+ checkCompleted();
+
+ Object cur = next0();
+
+ KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
+
+ List<ClusterNode> nodes = cctx.affinity().nodesByKey(key, topVer);
+
+ ClusterNode node;
+
+ if (F.isEmpty(nodes) || ((node = nodes.get(0)) == null))
+ throw new ClusterTopologyCheckedException("Failed to get primary node " +
+ "[topVer=" + topVer + ", key=" + key + ']');
+
+ tx.markQueryEnlisted(null);
+
+ if (!sequential)
+ batch = batches.get(node.id());
+ else if (batch != null && !batch.node().equals(node))
+ res = markReady(res, batch);
+
+ if (batch == null)
+ batches.put(node.id(), batch = new Batch(node));
+
+ if (batch.ready()) {
+ // Can't advance further at the moment.
+ batch = null;
+
+ peek = cur;
+
+ it.beforeDetach();
+
+ flush = true;
+
+ break;
+ }
+
+ batch.add(op.isDeleteOrLock() ? key : cur,
+ op != EnlistOperation.LOCK && cctx.affinityNode() && (cctx.isReplicated() || nodes.indexOf(cctx.localNode()) > 0));
+
+ if (batch.size() == batchSize)
+ res = markReady(res, batch);
+ }
+
+ if (SKIP_UPD.decrementAndGet(this) == 0)
+ break;
+
+ skipCntr = 1;
+ }
+
+ if (flush)
+ return res;
+
+ // No data left - flush incomplete batches.
+ for (Batch batch0 : batches.values()) {
+ if (!batch0.ready()) {
+ if (res == null)
+ res = new ArrayList<>();
+
+ batch0.ready(true);
+
+ res.add(batch0);
+ }
+ }
+
+ if (batches.isEmpty())
+ onDone(this.res);
+
+ return res;
+ }
+
+ /** */
+ private Object next0() {
+ if (!hasNext0())
+ throw new NoSuchElementException();
+
+ Object cur;
+
+ if ((cur = peek) != null)
+ peek = null;
+ else
+ cur = it.next();
+
+ return cur;
+ }
+
+ /** */
+ private boolean hasNext0() {
+ if (peek == null && !it.hasNext())
+ peek = FINISHED;
+
+ return peek != FINISHED;
+ }
+
+ /**
+ * Add batch to batch collection if it is ready.
+ *
+ * @param batches Collection of batches.
+ * @param batch Batch to be added.
+ */
+ private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) {
+ if (!batch.ready()) {
+ batch.ready(true);
+
+ if (batches == null)
+ batches = new ArrayList<>();
+
+ batches.add(batch);
+ }
+
+ return batches;
+ }
+
+ /**
+ * @param primaryId Primary node id.
+ * @param rows Rows.
+ * @param dhtVer Dht version assigned at primary node.
+ * @param dhtFutId Dht future id assigned at primary node.
+ */
+ private void processBatchLocalBackupKeys(UUID primaryId, List<Object> rows, GridCacheVersion dhtVer,
+ IgniteUuid dhtFutId) {
+ assert dhtVer != null;
+ assert dhtFutId != null;
+
+ EnlistOperation op = it.operation();
+
+ assert op != EnlistOperation.LOCK;
+
+ boolean keysOnly = op.isDeleteOrLock();
+
+ final ArrayList<KeyCacheObject> keys = new ArrayList<>(rows.size());
+ final ArrayList<Message> vals = keysOnly ? null : new ArrayList<>(rows.size());
+
+ for (Object row : rows) {
+ if (keysOnly)
+ keys.add(cctx.toCacheKeyObject(row));
+ else {
+ keys.add(cctx.toCacheKeyObject(((IgniteBiTuple)row).getKey()));
+ vals.add(cctx.toCacheObject(((IgniteBiTuple)row).getValue()));
+ }
+ }
+
+ try {
+ GridDhtTxRemote dhtTx = cctx.tm().tx(dhtVer);
+
+ if (dhtTx == null) {
+ dhtTx = new GridDhtTxRemote(cctx.shared(),
+ cctx.localNodeId(),
+ dhtFutId,
+ primaryId,
+ lockVer,
+ topVer,
+ dhtVer,
+ null,
+ cctx.systemTx(),
+ cctx.ioPolicy(),
+ PESSIMISTIC,
+ REPEATABLE_READ,
+ false,
+ tx.remainingTime(),
+ -1,
+ this.tx.subjectId(),
+ this.tx.taskNameHash(),
+ false);
+
+ dhtTx.mvccSnapshot(new MvccSnapshotWithoutTxs(mvccSnapshot.coordinatorVersion(),
+ mvccSnapshot.counter(), MVCC_OP_COUNTER_NA, mvccSnapshot.cleanupVersion()));
+
+ dhtTx = cctx.tm().onCreated(null, dhtTx);
+
+ if (dhtTx == null || !cctx.tm().onStarted(dhtTx)) {
+ throw new IgniteTxRollbackCheckedException("Failed to update backup " +
+ "(transaction has been completed): " + dhtVer);
+ }
+ }
+
+ dhtTx.mvccEnlistBatch(cctx, it.operation(), keys, vals, mvccSnapshot.withoutActiveTransactions());
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+
+ return;
+ }
+
+ sendNextBatches(primaryId);
+ }
+
+ /**
+ *
+ * @param node Node.
+ * @param batch Batch.
+ * @param first First mapping flag.
+ */
+ private void sendBatch(ClusterNode node, Batch batch, boolean first) throws IgniteCheckedException {
+ updateMappings(node);
+
+ boolean clientFirst = first && cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
+
+ int batchId = batchCntr.incrementAndGet();
+
+ if (node.isLocal())
+ enlistLocal(batchId, node.id(), batch);
+ else
+ sendBatch(batchId, node.id(), batch, clientFirst);
+ }
+
+ /**
+ * Send batch request to remote data node.
+ *
+ * @param batchId Id of a batch mini-future.
+ * @param nodeId Node id.
+ * @param batchFut Mini-future for the batch.
+ * @param clientFirst {@code true} if originating node is client and it is a first request to any data node.
+ */
+ private void sendBatch(int batchId, UUID nodeId, Batch batchFut, boolean clientFirst) throws IgniteCheckedException {
+ assert batchFut != null;
+
+ GridNearTxEnlistRequest req = new GridNearTxEnlistRequest(cctx.cacheId(),
+ threadId,
+ futId,
+ batchId,
+ tx.subjectId(),
+ topVer,
+ lockVer,
+ mvccSnapshot,
+ clientFirst,
+ remainingTime(),
+ tx.remainingTime(),
+ tx.taskNameHash(),
+ batchFut.rows(),
+ it.operation(),
+ needRes,
+ filter
+ );
+
+ sendRequest(req, nodeId);
+ }
+
+ /**
+ * @param req Request.
+ * @param nodeId Remote node ID
+ * @throws IgniteCheckedException if failed to send.
+ */
+ private void sendRequest(GridCacheMessage req, UUID nodeId) throws IgniteCheckedException {
+ IgniteInternalFuture<?> txSync = cctx.tm().awaitFinishAckAsync(nodeId, tx.threadId());
+
+ if (txSync == null || txSync.isDone())
+ cctx.io().send(nodeId, req, cctx.ioPolicy());
+ else
+ txSync.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> future) {
+ try {
+ cctx.io().send(nodeId, req, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ GridNearTxEnlistFuture.this.onDone(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Enlist batch of entries to the transaction on local node.
+ *
+ * @param batchId Id of a batch mini-future.
+ * @param nodeId Node id.
+ * @param batch Batch.
+ */
+ private void enlistLocal(int batchId, UUID nodeId, Batch batch) throws IgniteCheckedException {
+ Collection<Object> rows = batch.rows();
+
+ GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(nodeId,
+ lockVer,
+ mvccSnapshot,
+ threadId,
+ futId,
+ batchId,
+ tx,
+ remainingTime(),
+ cctx,
+ rows,
+ it.operation(),
+ filter,
+ needRes);
+
+ updateLocalFuture(fut);
+
+ fut.listen(new CI1<IgniteInternalFuture<GridCacheReturn>>() {
+ @Override public void apply(IgniteInternalFuture<GridCacheReturn> fut) {
+ try {
+ clearLocalFuture((GridDhtTxAbstractEnlistFuture)fut);
+
+ GridNearTxEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
+
+ if (checkResponse(nodeId, res, fut.error()))
+ sendNextBatches(nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ checkResponse(nodeId, null, e);
+ }
+ finally {
+ CU.unwindEvicts(cctx);
+ }
+ }
+ });
+
+ fut.init();
+ }
+
+ /**
+ * @param nodeId Sender node id.
+ * @param res Response.
+ */
+ public void onResult(UUID nodeId, GridNearTxEnlistResponse res) {
+ if (checkResponse(nodeId, res, res.error())) {
+
+ Batch batch = batches.get(nodeId);
+
+ if (batch != null && !F.isEmpty(batch.localBackupRows()) && res.dhtFutureId() != null)
+ processBatchLocalBackupKeys(nodeId, batch.localBackupRows(), res.dhtVersion(), res.dhtFutureId());
+ else
+ sendNextBatches(nodeId);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ if (batches.keySet().contains(nodeId)) {
+ if (log.isDebugEnabled())
+ log.debug("Found unacknowledged batch for left node [nodeId=" + nodeId + ", fut=" +
+ this + ']');
+
+ ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " +
+ "(primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
+
+ topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
+
+ processFailure(topEx, null);
+
+ batches.remove(nodeId);
+
+ if (batches.isEmpty()) // Wait for all pending requests.
+ onDone();
+
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
+ ", fut=" + this + ']');
+
+ return false;
+ }
+
+ /**
+ * @param nodeId Originating node ID.
+ * @param res Response.
+ * @param err Exception.
+ * @return {@code True} if future was completed by this call.
+ */
+ @SuppressWarnings("unchecked")
+ public boolean checkResponse(UUID nodeId, GridNearTxEnlistResponse res, Throwable err) {
+ assert res != null || err != null : this;
+
+ if (err == null && res.error() != null)
+ err = res.error();
+
+ if (X.hasCause(err, ClusterTopologyCheckedException.class))
+ tx.removeMapping(nodeId);
+
+ if (err != null)
+ processFailure(err, null);
+
+ if (ex != null) {
+ batches.remove(nodeId);
+
+ if (batches.isEmpty()) // Wait for all pending requests.
+ onDone();
+
+ return false;
+ }
+
+ assert res != null;
+
+ this.res = res.result();
+
+ assert this.res != null && (this.res.emptyResult() || needRes || !this.res.success());
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearTxEnlistFuture.class, this, super.toString());
+ }
+
+ /**
+ * A batch of rows
+ */
+ private static class Batch {
+ /** Node ID. */
+ @GridToStringExclude
+ private final ClusterNode node;
+
+ /** Rows. */
+ private List<Object> rows = new ArrayList<>();
+
+ /** Local backup rows. */
+ private List<Object> locBkpRows;
+
+ /** Readiness flag. Set when batch is full or no new rows are expected. */
+ private boolean ready;
+
+ /**
+ * @param node Cluster node.
+ */
+ private Batch(ClusterNode node) {
+ this.node = node;
+ }
+
+ /**
+ * @return Node.
+ */
+ public ClusterNode node() {
+ return node;
+ }
+
+ /**
+ * Adds a row.
+ *
+ * @param row Row.
+ * @param localBackup {@code true}, when the row key has local backup.
+ */
+ public void add(Object row, boolean localBackup) {
+ rows.add(row);
+
+ if (localBackup) {
+ if (locBkpRows == null)
+ locBkpRows = new ArrayList<>();
+
+ locBkpRows.add(row);
+ }
+ }
+
+ /**
+ * @return number of rows.
+ */
+ public int size() {
+ return rows.size();
+ }
+
+ /**
+ * @return Collection of rows.
+ */
+ public Collection<Object> rows() {
+ return rows;
+ }
+
+ /**
+ * @return Collection of local backup rows.
+ */
+ public List<Object> localBackupRows() {
+ return locBkpRows;
+ }
+
+ /**
+ * @return Readiness flag.
+ */
+ public boolean ready() {
+ return ready;
+ }
+
+ /**
+ * Sets readiness flag.
+ *
+ * @param ready Flag value.
+ */
+ public void ready(boolean ready) {
+ this.ready = ready;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
new file mode 100644
index 0000000..1d87023
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Request to enlist into transaction and acquire locks for entries produced with Cache API operations.
+ *
+ * One request per batch of entries is used.
+ */
+public class GridNearTxEnlistRequest extends GridCacheIdMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long threadId;
+
+ /** Future id. */
+ private IgniteUuid futId;
+
+ /** */
+ private boolean clientFirst;
+
+ /** */
+ private int miniId;
+
+ /** */
+ private UUID subjId;
+
+ /** */
+ private AffinityTopologyVersion topVer;
+
+ /** */
+ private GridCacheVersion lockVer;
+
+ /** Mvcc snapshot. */
+ private MvccSnapshot mvccSnapshot;
+
+ /** */
+ private long timeout;
+
+ /** */
+ private long txTimeout;
+
+ /** */
+ private int taskNameHash;
+
+ /** Rows to enlist. */
+ @GridDirectTransient
+ private Collection<Object> rows;
+
+ /** Serialized rows keys. */
+ @GridToStringExclude
+ private KeyCacheObject[] keys;
+
+ /** Serialized rows values. */
+ @GridToStringExclude
+ private CacheObject[] values;
+
+ /** Enlist operation. */
+ private EnlistOperation op;
+
+ /** Filter. */
+ @GridToStringExclude
+ private CacheEntryPredicate filter;
+
+ /** Need previous value flag. */
+ private boolean needRes;
+
+ /**
+ * Default constructor.
+ */
+ public GridNearTxEnlistRequest() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheId Cache id.
+ * @param threadId Thread id.
+ * @param futId Future id.
+ * @param miniId Mini-future id.
+ * @param subjId Transaction subject id.
+ * @param topVer Topology version.
+ * @param lockVer Lock version.
+ * @param mvccSnapshot Mvcc snapshot.
+ * @param clientFirst First client request flag.
+ * @param timeout Timeout.
+ * @param txTimeout Tx timeout.
+ * @param taskNameHash Task name hash.
+ * @param rows Rows.
+ * @param op Operation.
+ * @param filter Filter.
+ */
+ GridNearTxEnlistRequest(int cacheId,
+ long threadId,
+ IgniteUuid futId,
+ int miniId,
+ UUID subjId,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion lockVer,
+ MvccSnapshot mvccSnapshot,
+ boolean clientFirst,
+ long timeout,
+ long txTimeout,
+ int taskNameHash,
+ Collection<Object> rows,
+ EnlistOperation op,
+ boolean needRes,
+ @Nullable CacheEntryPredicate filter) {
+ this.txTimeout = txTimeout;
+ this.filter = filter;
+ this.cacheId = cacheId;
+ this.threadId = threadId;
+ this.futId = futId;
+ this.miniId = miniId;
+ this.subjId = subjId;
+ this.topVer = topVer;
+ this.lockVer = lockVer;
+ this.mvccSnapshot = mvccSnapshot;
+ this.clientFirst = clientFirst;
+ this.timeout = timeout;
+ this.taskNameHash = taskNameHash;
+ this.rows = rows;
+ this.op = op;
+ this.needRes = needRes;
+ }
+
+ /**
+ * @return Thread id.
+ */
+ public long threadId() {
+ return threadId;
+ }
+
+ /**
+ * @return Future id.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public int miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Subject id.
+ */
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Lock version.
+ */
+ public GridCacheVersion version() {
+ return lockVer;
+ }
+
+ /**
+ * @return MVCC snapshot.
+ */
+ public MvccSnapshot mvccSnapshot() {
+ return mvccSnapshot;
+ }
+
+ /**
+ * @return Timeout milliseconds.
+ */
+ public long timeout() {
+ return timeout;
+ }
+
+ /**
+ * @return Tx timeout milliseconds.
+ */
+ public long txTimeout() {
+ return txTimeout;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return {@code True} if this is the first client request.
+ */
+ public boolean firstClientRequest() {
+ return clientFirst;
+ }
+
+ /**
+ * @return Collection of rows.
+ */
+ public Collection<Object> rows() {
+ return rows;
+ }
+
+ /**
+ * @return Operation.
+ */
+ public EnlistOperation operation() {
+ return op;
+ }
+
+ /**
+ * @return Need result flag.
+ */
+ public boolean needRes() {
+ return needRes;
+ }
+
+ /**
+ * @return Filter.
+ */
+ public CacheEntryPredicate filter() {
+ return filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+ CacheObjectContext objCtx = cctx.cacheObjectContext();
+
+ if (rows != null && keys == null) {
+ keys = new KeyCacheObject[rows.size()];
+
+ int i = 0;
+
+ boolean keysOnly = op.isDeleteOrLock();
+
+ values = keysOnly ? null : new CacheObject[keys.length];
+
+ for (Object row : rows) {
+ Object key, val = null;
+
+ if (keysOnly)
+ key = row;
+ else {
+ key = ((IgniteBiTuple)row).getKey();
+ val = ((IgniteBiTuple)row).getValue();
+ }
+
+ assert key != null && (keysOnly || val != null) : "key=" + key + ", val=" + val;
+
+ KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+ assert key0 != null;
+
+ key0.prepareMarshal(objCtx);
+
+ keys[i] = key0;
+
+ if (!keysOnly) {
+ CacheObject val0 = cctx.toCacheObject(val);
+
+ assert val0 != null;
+
+ val0.prepareMarshal(objCtx);
+
+ values[i] = val0;
+ }
+
+ i++;
+ }
+ }
+
+ if (filter != null)
+ filter.prepareMarshal(cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (keys != null) {
+ rows = new ArrayList<>(keys.length);
+
+ CacheObjectContext objCtx = ctx.cacheContext(cacheId).cacheObjectContext();
+
+ for (int i = 0; i < keys.length; i++) {
+ keys[i].finishUnmarshal(objCtx, ldr);
+
+ if (op.isDeleteOrLock())
+ rows.add(keys[i]);
+ else {
+ if (values[i] != null)
+ values[i].finishUnmarshal(objCtx, ldr);
+
+ rows.add(new IgniteBiTuple<>(keys[i], values[i]));
+ }
+ }
+
+ keys = null;
+ values = null;
+ }
+
+ if (filter != null)
+ filter.finishUnmarshal(ctx.cacheContext(cacheId), ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeBoolean("clientFirst", clientFirst))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMessage("filter", filter))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeObjectArray("keys", keys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("lockVer", lockVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeInt("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("needRes", needRes))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
+ if (!writer.writeLong("threadId", threadId))
+ return false;
+
+ writer.incrementState();
+
+ case 15:
+ if (!writer.writeLong("timeout", timeout))
+ return false;
+
+ writer.incrementState();
+
+ case 16:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 17:
+ if (!writer.writeLong("txTimeout", txTimeout))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeObjectArray("values", values, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ clientFirst = reader.readBoolean("clientFirst");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ filter = reader.readMessage("filter");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ keys = reader.readObjectArray("keys", MessageCollectionItemType.MSG, KeyCacheObject.class);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ lockVer = reader.readMessage("lockVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ miniId = reader.readInt("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ mvccSnapshot = reader.readMessage("mvccSnapshot");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ needRes = reader.readBoolean("needRes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ byte opOrd;
+
+ opOrd = reader.readByte("op");
+
+ if (!reader.isLastRead())
+ return false;
+
+ op = EnlistOperation.fromOrdinal(opOrd);
+
+ reader.incrementState();
+
+ case 12:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
+ threadId = reader.readLong("threadId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
+ timeout = reader.readLong("timeout");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 16:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 17:
+ txTimeout = reader.readLong("txTimeout");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ values = reader.readObjectArray("values", MessageCollectionItemType.MSG, CacheObject.class);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearTxEnlistRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 19;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 159;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearTxEnlistRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
new file mode 100644
index 0000000..4f4bbb6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.ExceptionAware;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A response to {@link GridNearTxEnlistRequest}.
+ */
+public class GridNearTxEnlistResponse extends GridCacheIdMessage implements ExceptionAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Error. */
+ @GridDirectTransient
+ private Throwable err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /** Mini future id. */
+ private int miniId;
+
+ /** Result. */
+ private GridCacheReturn res;
+
+ /** */
+ private GridCacheVersion lockVer;
+
+ /** */
+ private GridCacheVersion dhtVer;
+
+ /** */
+ private IgniteUuid dhtFutId;
+
+ /** New DHT nodes involved into transaction. */
+ @GridDirectCollection(UUID.class)
+ private Collection<UUID> newDhtNodes;
+
+ /**
+ * Default constructor.
+ */
+ public GridNearTxEnlistResponse() {
+ // No-op.
+ }
+
+ /**
+ * Constructor for normal result.
+ *
+ * @param cacheId Cache id.
+ * @param futId Future id.
+ * @param miniId Mini future id.
+ * @param lockVer Lock version.
+ * @param res Result.
+ * @param dhtVer Dht version.
+ * @param dhtFutId Dht future id.
+ * @param newDhtNodes New DHT nodes involved into transaction.
+ */
+ public GridNearTxEnlistResponse(int cacheId,
+ IgniteUuid futId,
+ int miniId,
+ GridCacheVersion lockVer,
+ GridCacheReturn res,
+ GridCacheVersion dhtVer,
+ IgniteUuid dhtFutId,
+ Set<UUID> newDhtNodes) {
+ this.cacheId = cacheId;
+ this.futId = futId;
+ this.miniId = miniId;
+ this.lockVer = lockVer;
+ this.res = res;
+ this.dhtVer = dhtVer;
+ this.dhtFutId = dhtFutId;
+ this.newDhtNodes = newDhtNodes;
+ }
+
+ /**
+ * Constructor for error result.
+ *
+ * @param cacheId Cache id.
+ * @param futId Future id.
+ * @param miniId Mini future id.
+ * @param lockVer Lock version.
+ * @param err Error.
+ */
+ public GridNearTxEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer,
+ Throwable err) {
+ this.cacheId = cacheId;
+ this.futId = futId;
+ this.miniId = miniId;
+ this.lockVer = lockVer;
+ this.err = err;
+ }
+
+ /**
+ * @return Loc version.
+ */
+ public GridCacheVersion version() {
+ return lockVer;
+ }
+
+ /**
+ * @return Future id.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future id.
+ */
+ public int miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Result.
+ */
+ public GridCacheReturn result() {
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Throwable error() {
+ return err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /**
+ * @return Dht version.
+ */
+ public GridCacheVersion dhtVersion() {
+ return dhtVer;
+ }
+
+ /**
+ * @return Dht future id.
+ */
+ public IgniteUuid dhtFutureId() {
+ return dhtFutId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 11;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeIgniteUuid("dhtFutId", dhtFutId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMessage("dhtVer", dhtVer))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("lockVer", lockVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeInt("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeCollection("newDhtNodes", newDhtNodes, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeMessage("res", res))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ dhtFutId = reader.readIgniteUuid("dhtFutId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ dhtVer = reader.readMessage("dhtVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ lockVer = reader.readMessage("lockVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ miniId = reader.readInt("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ newDhtNodes = reader.readCollection("newDhtNodes", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ res = reader.readMessage("res");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearTxEnlistResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 160;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ if (err != null && errBytes == null)
+ errBytes = U.marshal(ctx.marshaller(), err);
+
+ if (res != null)
+ res.prepareMarshal(cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ if (errBytes != null)
+ err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+ if (res != null)
+ res.finishUnmarshal(cctx, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearTxEnlistResponse.class, this);
+ }
+}