You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/23 10:41:09 UTC
[07/12] ignite git commit: IGNITE-1525 Return value for cache
operation can be lost with onePhaseCommit
IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b72d18d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b72d18d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b72d18d
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 9b72d18dd94ec1383653f00474c102804c02790a
Parents: c3eff6b
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Sep 19 18:07:20 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Sep 19 18:07:20 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 12 +
.../communication/GridIoMessageFactory.java | 6 +
.../GridCacheReturnCompletableWrapper.java | 101 +++++++++
.../cache/GridDeferredAckMessageSender.java | 219 ++++++++++++++++++
.../GridDistributedTxRemoteAdapter.java | 59 +++--
.../distributed/dht/GridDhtTxFinishFuture.java | 12 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 33 ++-
.../dht/GridDhtTxFinishResponse.java | 52 ++++-
.../dht/GridDhtTxOnePhaseCommitAckRequest.java | 134 +++++++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 6 +-
.../dht/GridDhtTxPrepareRequest.java | 93 +++++---
.../cache/distributed/dht/GridDhtTxRemote.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 227 +++++--------------
...arOptimisticSerializableTxPrepareFuture.java | 4 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 7 +-
.../GridNearPessimisticTxPrepareFuture.java | 4 +-
.../near/GridNearTxFinishFuture.java | 112 +++++++--
.../cache/transactions/IgniteTxAdapter.java | 46 +++-
.../cache/transactions/IgniteTxHandler.java | 163 ++++++++++---
.../transactions/IgniteTxLocalAdapter.java | 19 +-
.../cache/transactions/IgniteTxManager.java | 154 ++++++++++++-
.../IgniteCachePutRetryAbstractSelfTest.java | 39 +++-
...gniteCachePutRetryTransactionalSelfTest.java | 75 +++++-
.../config/benchmark-client-mode.properties | 2 +
.../config/benchmark-tx-win.properties | 2 +
.../yardstick/config/benchmark-tx.properties | 2 +
.../yardstick/config/benchmark-win.properties | 2 +
modules/yardstick/config/benchmark.properties | 2 +
.../cache/IgniteGetAndPutBenchmark.java | 41 ++++
.../cache/IgniteGetAndPutTxBenchmark.java | 70 ++++++
.../cache/IgniteInvokeTxBenchmark.java | 40 ++++
31 files changed, 1405 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 7c428a6..ab6403f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -290,6 +290,18 @@ public final class IgniteSystemProperties {
public static final String IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT = "IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT";
/**
+ * One phase commit deferred ack request timeout.
+ */
+ public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+ "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT";
+
+ /**
+ * One phase commit deferred ack request buffer size.
+ */
+ public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+ "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE";
+
+ /**
* If this property set then debug console will be opened for H2 indexing SPI.
*/
public static final String IGNITE_H2_DEBUG_CONSOLE = "IGNITE_H2_DEBUG_CONSOLE";
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5f60215..8b8a734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
@@ -160,6 +161,11 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -27:
+ msg = new GridDhtTxOnePhaseCommitAckRequest();
+
+ break;
+
case -26:
msg = new TxLockList();
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
new file mode 100644
index 0000000..8ceaf71
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holder for a {@link GridCacheReturn} that may be requested before it has been computed.
+ * <p>
+ * The single state field {@code o} moves through the following states:
+ * <ul>
+ * <li>{@code null} - neither initialized nor requested yet;</li>
+ * <li>{@link GridFutureAdapter} - requested through {@link #fut()} before initialization;</li>
+ * <li>{@link GridCacheReturn} - initialized (terminal state).</li>
+ * </ul>
+ */
+public class GridCacheReturnCompletableWrapper {
+    /** Completable wrapper upd. */
+    private static final AtomicReferenceFieldUpdater<GridCacheReturnCompletableWrapper, Object> COMPLETABLE_WRAPPER_UPD =
+        AtomicReferenceFieldUpdater.newUpdater(GridCacheReturnCompletableWrapper.class, Object.class, "o");
+
+    /** Current state: {@code null}, a pending {@link GridFutureAdapter} or the final {@link GridCacheReturn}. */
+    private volatile Object o;
+
+    /** Node id. */
+    private final UUID nodeId;
+
+    /**
+     * @param nodeId Node id.
+     */
+    public GridCacheReturnCompletableWrapper(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return ID of node initiated tx or {@code null} if this node is local.
+     */
+    @Nullable public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Marks as initialized.
+     * <p>
+     * If a future was already handed out by {@link #fut()}, it is completed with {@code ret}
+     * before the state is switched to the value itself.
+     *
+     * @param ret Return.
+     * @throws IllegalStateException If the wrapper has already been initialized.
+     */
+    public void initialize(GridCacheReturn ret) {
+        while (true) {
+            final Object obj = this.o;
+
+            if (obj == null) {
+                if (COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, ret))
+                    return;
+
+                // Lost the race (e.g. fut() installed a future meanwhile): re-read the state and retry.
+            }
+            else if (obj instanceof GridFutureAdapter) {
+                ((GridFutureAdapter)obj).onDone(ret);
+
+                boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, obj, ret);
+
+                assert res;
+
+                return;
+            }
+            else
+                throw new IllegalStateException("GridCacheReturnCompletableWrapper can't be reinitialized");
+        }
+    }
+
+    /**
+     * Allows wait for properly initialized value.
+     *
+     * @return Finished future if the value is already initialized, otherwise a future
+     *      that is completed by {@link #initialize(GridCacheReturn)}.
+     */
+    public IgniteInternalFuture<GridCacheReturn> fut() {
+        while (true) {
+            final Object obj = this.o;
+
+            if (obj instanceof GridCacheReturn)
+                return new GridFinishedFuture<>((GridCacheReturn)obj);
+
+            if (obj instanceof IgniteInternalFuture)
+                return (IgniteInternalFuture)obj;
+
+            if (obj != null)
+                throw new IllegalStateException();
+
+            GridFutureAdapter<GridCacheReturn> fut = new GridFutureAdapter<>();
+
+            // Return the future we installed rather than re-reading the field: a concurrent
+            // initialize() may already have completed this future and replaced the field
+            // with the GridCacheReturn, which would make a cast of the re-read value fail.
+            if (COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, fut))
+                return fut;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
new file mode 100644
index 0000000..7145dc2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Base class that batches cache versions to be acknowledged, one buffer per destination node.
+ * <p>
+ * Versions passed to {@link #sendDeferredAckMessage(UUID, GridCacheVersion)} are collected in a per-node
+ * buffer. The buffer is handed to {@link #finish(UUID, ConcurrentLinkedDeque8)} either when its size
+ * exceeds {@link #getBufferSize()} or when the timeout object registered for the buffer fires.
+ */
+public abstract class GridDeferredAckMessageSender {
+ /** Deferred message buffers. */
+ private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
+
+ /** Timeout processor. */
+ private GridTimeoutProcessor time;
+
+ /** Closure processor. */
+ public GridClosureProcessor closure;
+
+ /**
+ * @param time Time.
+ * @param closure Closure.
+ */
+ public GridDeferredAckMessageSender(GridTimeoutProcessor time,
+ GridClosureProcessor closure) {
+ this.time = time;
+ this.closure = closure;
+ }
+
+ /**
+ * @return Value added to the current time (as returned by {@code U.currentTimeMillis()}) to compute
+ * the end time of a newly created buffer.
+ */
+ public abstract int getTimeout();
+
+ /**
+ * @return Number of buffered versions which, once exceeded, makes the adding thread flush the buffer.
+ */
+ public abstract int getBufferSize();
+
+ /**
+ * Called when a buffer is flushed (size threshold exceeded, timeout fired or {@link #stop()} invoked).
+ *
+ * @param nodeId Node ID the flushed buffer was created for.
+ * @param vers Versions accumulated in the flushed buffer.
+ */
+ public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+
+ /**
+ * Flushes every buffer currently registered in the buffers map.
+ */
+ public void stop() {
+ for (DeferredAckMessageBuffer buf : deferredAckMsgBuffers.values())
+ buf.finish0();
+ }
+
+ /**
+ * @param nodeId Node ID to send message to.
+ * @param ver Version to ack.
+ */
+ public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+ while (true) {
+ DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
+
+ if (buf == null) {
+ buf = new DeferredAckMessageBuffer(nodeId);
+
+ DeferredAckMessageBuffer old = deferredAckMsgBuffers.putIfAbsent(nodeId, buf);
+
+ if (old == null) {
+ // We have successfully added buffer to map.
+ time.addTimeoutObject(buf);
+ }
+ else
+ buf = old;
+ }
+
+ if (!buf.add(ver))
+ // Some thread is sending filled up buffer, we can remove it.
+ deferredAckMsgBuffers.remove(nodeId, buf);
+ else
+ break;
+ }
+ }
+
+ /**
+ * Deferred message buffer.
+ * <p>
+ * Adding threads hold the read lock while appending; the flushing thread takes the write lock,
+ * so a flush starts only after all in-progress additions have left the read-locked section.
+ */
+ private class DeferredAckMessageBuffer extends ReentrantReadWriteLock implements GridTimeoutObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Filled atomic flag: switched to {@code true} once, by the thread (timeout or add) that takes over the flush. */
+ private AtomicBoolean guard = new AtomicBoolean(false);
+
+ /** Versions. */
+ private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Timeout ID. */
+ private final IgniteUuid timeoutId;
+
+ /** End time. */
+ private final long endTime;
+
+ /**
+ * @param nodeId Node ID to send message to.
+ */
+ private DeferredAckMessageBuffer(UUID nodeId) {
+ this.nodeId = nodeId;
+
+ timeoutId = IgniteUuid.fromUuid(nodeId);
+
+ endTime = U.currentTimeMillis() + getTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return timeoutId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ if (guard.compareAndSet(false, true)) {
+ closure.runLocalSafe(new Runnable() {
+ @Override public void run() {
+ writeLock().lock();
+
+ try {
+ finish0();
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Adds deferred request to buffer.
+ * <p>
+ * If this addition makes the buffer exceed {@link #getBufferSize()} and the calling thread wins
+ * the guard flag, the calling thread flushes the buffer and removes its timeout object.
+ *
+ * @param ver Version to send.
+ * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
+ */
+ public boolean add(GridCacheVersion ver) {
+ readLock().lock();
+
+ boolean snd = false;
+
+ try {
+ if (guard.get())
+ return false;
+
+ vers.add(ver);
+
+ if (vers.sizex() > getBufferSize() && guard.compareAndSet(false, true))
+ snd = true;
+ }
+ finally {
+ readLock().unlock();
+ }
+
+ if (snd) {
+ // Wait all threads in read lock to finish.
+ writeLock().lock();
+
+ try {
+ finish0();
+
+ time.removeTimeoutObject(this);
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Sends deferred notification message and removes this buffer from pending responses map.
+ */
+ private void finish0() {
+ finish(nodeId, vers);
+
+ deferredAckMsgBuffers.remove(nodeId, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 9d9862a..4adfa8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -448,7 +450,25 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap();
+ GridCacheReturnCompletableWrapper wrapper = null;
+
if (!F.isEmpty(writeMap)) {
+ GridCacheReturn ret = null;
+
+ if (!near() && !local() && onePhaseCommit()) {
+ if (needReturnValue()) {
+ ret = new GridCacheReturn(null, cctx.localNodeId().equals(otherNodeId()), true, null, true);
+
+ UUID origNodeId = otherNodeId(); // Originating node.
+
+ cctx.tm().addCommittedTxReturn(this,
+ wrapper = new GridCacheReturnCompletableWrapper(
+ !cctx.localNodeId().equals(origNodeId) ? origNodeId : null));
+ }
+ else
+ cctx.tm().addCommittedTx(this, this.nearXidVersion(), null);
+ }
+
// Register this transaction as completed prior to write-phase to
// ensure proper lock ordering for removed entries.
cctx.tm().addCommittedTx(this);
@@ -457,13 +477,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
batchStoreCommit(writeMap().values());
- // Node that for near transactions we grab all entries.
- for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
- GridCacheContext cacheCtx = txEntry.context();
+ try {
+ // Node that for near transactions we grab all entries.
+ for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
+ GridCacheContext cacheCtx = txEntry.context();
- boolean replicate = cacheCtx.isDrEnabled();
+ boolean replicate = cacheCtx.isDrEnabled();
- try {
while (true) {
try {
GridCacheEntryEx cached = txEntry.cached();
@@ -486,7 +506,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
txEntry.cached().unswap(false);
IgniteBiTuple<GridCacheOperation, CacheObject> res =
- applyTransformClosures(txEntry, false);
+ applyTransformClosures(txEntry, false, ret);
GridCacheOperation op = res.get1();
CacheObject val = res.get2();
@@ -672,21 +692,26 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
}
- catch (Throwable ex) {
- // In case of error, we still make the best effort to commit,
- // as there is no way to rollback at this point.
- err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
- "(all transaction entries will be invalidated): " + CU.txString(this), ex);
+ }
+ catch (Throwable ex) {
+ // In case of error, we still make the best effort to commit,
+ // as there is no way to rollback at this point.
+ err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
+ "(all transaction entries will be invalidated): " + CU.txString(this), ex);
- U.error(log, "Commit failed.", err);
+ U.error(log, "Commit failed.", err);
- uncommit();
+ uncommit();
- state(UNKNOWN);
+ state(UNKNOWN);
- if (ex instanceof Error)
- throw (Error)ex;
- }
+ if (ex instanceof Error)
+ throw (Error)ex;
+
+ }
+ finally {
+ if (wrapper != null)
+ wrapper.initialize(ret);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d2e26b4..ac2ab41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -351,7 +351,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ false,
+ false);
try {
cctx.io().send(n, req, tx.ioPolicy());
@@ -448,7 +450,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.subjectId(),
tx.taskNameHash(),
tx.activeCachesDeploymentEnabled(),
- updCntrs);
+ updCntrs,
+ false,
+ false);
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
@@ -516,7 +520,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ false,
+ false);
req.writeVersion(tx.writeVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 2d98e0d..c618a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -46,6 +46,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+ /** */
+ public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
+
/** Near node ID. */
private UUID nearNodeId;
@@ -141,7 +144,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
int txSize,
@Nullable UUID subjId,
int taskNameHash,
- boolean addDepInfo
+ boolean addDepInfo,
+ boolean retVal,
+ boolean waitRemoteTxs
) {
super(
xidVer,
@@ -172,6 +177,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
this.sysInvalidate = sysInvalidate;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+
+ needReturnValue(retVal);
+ waitRemoteTransactions(waitRemoteTxs);
}
/**
@@ -224,11 +232,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
@Nullable UUID subjId,
int taskNameHash,
boolean addDepInfo,
- Collection<Long> updateIdxs
+ Collection<Long> updateIdxs,
+ boolean retVal,
+ boolean waitRemoteTxs
) {
this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
- subjId, taskNameHash, addDepInfo);
+ subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -339,6 +349,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
}
+ /**
+ * Tests the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit of {@code flags}.
+ *
+ * @return Flag indicating whether transaction needs return value.
+ */
+ public boolean needReturnValue() {
+ return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * Sets or clears the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit of {@code flags}.
+ *
+ * @param retVal Need return value.
+ */
+ public void needReturnValue(boolean retVal) {
+ if (retVal)
+ flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+ else
+ flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 78dc16f..0618172 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -19,9 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -51,6 +52,9 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** Flag indicating if this is a check-committed response. */
private boolean checkCommitted;
+ /** Cache return value. */
+ private GridCacheReturn retVal;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -112,6 +116,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
if (checkCommittedErr != null && checkCommittedErrBytes == null)
checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr);
+
+ if (retVal != null && retVal.cacheId() != 0) {
+ GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+ assert cctx != null : retVal.cacheId();
+
+ retVal.prepareMarshal(cctx);
+ }
}
/** {@inheritDoc} */
@@ -121,6 +133,28 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
if (checkCommittedErrBytes != null && checkCommittedErr == null)
checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+ if (retVal != null && retVal.cacheId() != 0) {
+ GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+ assert cctx != null : retVal.cacheId();
+
+ retVal.finishUnmarshal(cctx, ldr);
+ }
+ }
+
+ /**
+ * @param retVal Return value.
+ */
+ public void returnValue(GridCacheReturn retVal) {
+ this.retVal = retVal;
+ }
+
+ /**
+ * @return Return value.
+ */
+ public GridCacheReturn returnValue() {
+ return retVal;
}
/** {@inheritDoc} */
@@ -161,6 +195,12 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
writer.incrementState();
+ case 8:
+ if (!writer.writeMessage("retVal", retVal))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -201,6 +241,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
+ case 8:
+ retVal = reader.readMessage("retVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDhtTxFinishResponse.class);
@@ -213,6 +261,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 9;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
new file mode 100644
index 0000000..0c8ae69
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * One Phase Commit Near transaction ack request.
+ * <p>
+ * Carries a collection of {@link GridCacheVersion}s and is registered under direct type {@code -27}.
+ */
+public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Versions carried by this request (near tx xid versions, per the constructor contract). */
+ @GridToStringInclude
+ @GridDirectCollection(GridCacheVersion.class)
+ protected Collection<GridCacheVersion> vers;
+
+ /**
+ * Default constructor.
+ */
+ public GridDhtTxOnePhaseCommitAckRequest() {
+ // No-op.
+ }
+
+ /**
+ *
+ * @param vers Near Tx xid Versions.
+ */
+ public GridDhtTxOnePhaseCommitAckRequest(Collection<GridCacheVersion> vers) {
+ this.vers = vers;
+ }
+
+ /**
+ * @return Versions passed to the constructor or read from the wire by {@code readFrom}.
+ */
+ public Collection<GridCacheVersion> versions() {
+ return vers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxOnePhaseCommitAckRequest.class, this, super.toString());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ vers = reader.readCollection("vers", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtTxOnePhaseCommitAckRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -27;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ec73bff..1dbda69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1245,7 +1245,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.onePhaseCommit(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ retVal);
int idx = 0;
@@ -1356,7 +1357,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.onePhaseCommit(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ retVal);
for (IgniteTxEntry entry : nearMapping.entries()) {
if (CU.writes().apply(entry)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 1cdc96f..a8f2087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -52,6 +52,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final long serialVersionUID = 0L;
+ /** Bit of {@code flags} that is set when the transaction needs a return value. */
+ public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
+
/** Near node ID (taken from {@code tx.nearNodeId()} in the constructor). */
private UUID nearNodeId;
@@ -100,6 +103,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Preload keys. */
private BitSet preloadKeys;
+ /** Bit flags of this request; see {@link #NEED_RETURN_VALUE_FLAG_MASK}. */
+ private byte flags;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -118,6 +124,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param txNodes Transaction nodes mapping.
* @param nearXidVer Near transaction ID.
* @param last {@code True} if this is last prepare request for node.
+ * @param retVal Need return value flag.
* @param addDepInfo Deployment info flag.
*/
public GridDhtTxPrepareRequest(
@@ -134,7 +141,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean onePhaseCommit,
UUID subjId,
int taskNameHash,
- boolean addDepInfo) {
+ boolean addDepInfo,
+ boolean retVal) {
super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo);
assert futId != null;
@@ -149,12 +157,31 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ needReturnValue(retVal);
+
invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size());
nearNodeId = tx.nearNodeId();
}
/**
+ * Tests the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit of {@code flags}.
+ *
+ * @return Flag indicating whether transaction needs return value.
+ */
+ public boolean needReturnValue() {
+ return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * Sets or clears the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit of {@code flags}; the other bits
+ * are left as they are.
+ *
+ * @param retVal Need return value.
+ */
+ public void needReturnValue(boolean retVal) {
+ if (retVal)
+ flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+ else
+ flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+ }
+
+ /**
* @return {@code True} if this is last prepare request for node.
*/
public boolean last() {
@@ -348,78 +375,84 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
switch (writer.state()) {
case 23:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 24:
- if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 25:
- if (!writer.writeBoolean("last", last))
+ if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
case 26:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
case 27:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 28:
- if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 29:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
+ if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 30:
- if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
case 31:
- if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 32:
- if (!writer.writeBitSet("preloadKeys", preloadKeys))
+ if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 33:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
case 34:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 35:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 36:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -442,7 +475,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
switch (reader.state()) {
case 23:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -450,7 +483,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 24:
- invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -458,7 +491,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 25:
- last = reader.readBoolean("last");
+ invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
return false;
@@ -466,7 +499,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 26:
- miniId = reader.readIgniteUuid("miniId");
+ last = reader.readBoolean("last");
if (!reader.isLastRead())
return false;
@@ -474,7 +507,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 27:
- nearNodeId = reader.readUuid("nearNodeId");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -482,7 +515,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 28:
- nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -490,7 +523,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 29:
- nearXidVer = reader.readMessage("nearXidVer");
+ nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -498,7 +531,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 30:
- ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
+ nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
return false;
@@ -506,7 +539,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 31:
- ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
+ ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -514,7 +547,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 32:
- preloadKeys = reader.readBitSet("preloadKeys");
+ ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -522,7 +555,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 33:
- subjId = reader.readUuid("subjId");
+ preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
return false;
@@ -530,7 +563,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 34:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -538,6 +571,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 35:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 36:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -557,6 +598,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 36;
+ return 37;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index dc27eb1..6ad20c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -189,9 +189,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
+ concurrency,
+ isolation,
+ invalidate,
timeout,
txSize,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1e45fa7..30a3d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -29,9 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
@@ -60,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
+import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -82,7 +80,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -102,11 +99,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
@@ -144,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
/** Pending */
- private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
+ private GridDeferredAckMessageSender deferredUpdateMessageSender;
/** */
private GridNearAtomicCache<K, V> near;
@@ -240,6 +235,53 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
+ deferredUpdateMessageSender = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) { // Replaces the former per-node DeferredResponseBuffer.
+ @Override public int getTimeout() { // Timeout taken from the cache's deferred update response constant.
+ return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+ }
+
+ @Override public int getBufferSize() { // Buffer size taken from the cache's deferred update response constant.
+ return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
+ }
+
+ @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) { // Sends all given versions to nodeId in one response message.
+ GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+ vers, ctx.deploymentEnabled());
+
+ try {
+ ctx.kernalContext().gateway().readLock(); // Released in the finally block below.
+
+ try {
+ ctx.io().send(nodeId, msg, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+ catch (IllegalStateException ignored) { // Treated as "node is stopping"; presumably raised by readLock() - TODO confirm.
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
+ "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) { // Target node left: only logged at debug level.
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to send deferred DHT update response, node left [" +
+ "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+ }
+ }
+ catch (IgniteCheckedException e) { // Any other send failure is logged as an error and not rethrown.
+ U.error(log, "Failed to send deferred DHT update response to remote node [" +
+ "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
+ }
+ }
+ };
+
CacheMetricsImpl m = new CacheMetricsImpl(ctx);
if (ctx.dht().near() != null)
@@ -405,8 +447,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void stop() {
- for (DeferredResponseBuffer buf : pendingResponses.values())
- buf.finish();
+ deferredUpdateMessageSender.stop();
}
/**
@@ -3208,28 +3249,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param ver Version to ack.
*/
private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
- while (true) {
- DeferredResponseBuffer buf = pendingResponses.get(nodeId);
-
- if (buf == null) {
- buf = new DeferredResponseBuffer(nodeId);
-
- DeferredResponseBuffer old = pendingResponses.putIfAbsent(nodeId, buf);
-
- if (old == null) {
- // We have successfully added buffer to map.
- ctx.time().addTimeoutObject(buf);
- }
- else
- buf = old;
- }
-
- if (!buf.addResponse(ver))
- // Some thread is sending filled up buffer, we can remove it.
- pendingResponses.remove(nodeId, buf);
- else
- break;
- }
+ deferredUpdateMessageSender.sendDeferredAckMessage(nodeId, ver);
}
/**
@@ -3452,149 +3472,4 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return Collections.emptyList();
}
}
-
- /**
- * Deferred response buffer.
- */
- private class DeferredResponseBuffer extends ReentrantReadWriteLock implements GridTimeoutObject {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Filled atomic flag. */
- private AtomicBoolean guard = new AtomicBoolean(false);
-
- /** Response versions. */
- private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
-
- /** Node ID. */
- private final UUID nodeId;
-
- /** Timeout ID. */
- private final IgniteUuid timeoutId;
-
- /** End time. */
- private final long endTime;
-
- /**
- * @param nodeId Node ID to send message to.
- */
- private DeferredResponseBuffer(UUID nodeId) {
- this.nodeId = nodeId;
-
- timeoutId = IgniteUuid.fromUuid(nodeId);
-
- endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return timeoutId;
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- return endTime;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- if (guard.compareAndSet(false, true)) {
- ctx.closures().runLocalSafe(new Runnable() {
- @Override public void run() {
- writeLock().lock();
-
- try {
- finish();
- }
- finally {
- writeLock().unlock();
- }
- }
- });
- }
- }
-
- /**
- * Adds deferred response to buffer.
- *
- * @param ver Version to send.
- * @return {@code True} if response was handled, {@code false} if this buffer is filled and cannot be used.
- */
- public boolean addResponse(GridCacheVersion ver) {
- readLock().lock();
-
- boolean snd = false;
-
- try {
- if (guard.get())
- return false;
-
- respVers.add(ver);
-
- if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
- snd = true;
- }
- finally {
- readLock().unlock();
- }
-
- if (snd) {
- // Wait all threads in read lock to finish.
- writeLock().lock();
-
- try {
- finish();
-
- ctx.time().removeTimeoutObject(this);
- }
- finally {
- writeLock().unlock();
- }
- }
-
- return true;
- }
-
- /**
- * Sends deferred notification message and removes this buffer from pending responses map.
- */
- private void finish() {
- GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
- respVers, ctx.deploymentEnabled());
-
- try {
- ctx.kernalContext().gateway().readLock();
-
- try {
- ctx.io().send(nodeId, msg, ctx.ioPolicy());
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
- ", node=" + nodeId + ']');
- }
- }
- finally {
- ctx.kernalContext().gateway().readUnlock();
- }
- }
- catch (IllegalStateException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send deferred DHT update response, node left [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send deferred DHT update response to remote node [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
- }
-
- pendingResponses.remove(nodeId, this);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index d251528..4cbfb27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -526,7 +526,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
) {
GridCacheContext cacheCtx = entry.context();
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+ List<ClusterNode> nodes = cacheCtx.isLocal() ?
+ cacheCtx.affinity().nodes(entry.key(), topVer) :
+ cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
txMapping.addMapping(nodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e17a76c..91cfbda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -27,7 +27,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
@@ -599,9 +598,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
GridCacheEntryEx cached0 = entry.cached();
if (cached0.isDht())
- nodes = cacheCtx.affinity().nodes(cached0.partition(), topVer);
+ nodes = cacheCtx.topology().nodes(cached0.partition(), topVer);
else
- nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+ nodes = cacheCtx.isLocal() ?
+ cacheCtx.affinity().nodes(entry.key(), topVer) :
+ cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
txMapping.addMapping(nodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 34b8281..5c09398 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -193,7 +193,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
GridCacheContext cacheCtx = txEntry.context();
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
+ List<ClusterNode> nodes = cacheCtx.isLocal() ?
+ cacheCtx.affinity().nodes(txEntry.key(), topVer) :
+ cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
ClusterNode primary = F.first(nodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index bb5d482..46604c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -34,6 +34,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
@@ -76,6 +78,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0");
/** Product version compared against a remote backup's version before a deferred one-phase-commit ack is sent to it. */
+ public static final IgniteProductVersion ACK_DHT_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.6.8");
+
+ /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -251,6 +256,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
assert f.node().id().equals(nodeId);
+ if (res.returnValue() != null)
+ tx.implicitSingleResult(res.returnValue());
+
f.onDhtFinishResponse(res);
}
}
@@ -432,6 +440,50 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
catch (IgniteCheckedException e) {
onDone(e);
}
+ finally {
+ if (commit &&
+ tx.onePhaseCommit() &&
+ !tx.writeMap().isEmpty()) // Readonly operations require no ack.
+ ackBackup();
+ }
+ }
+
+ /**
+ * Acknowledges the backup node of a one-phase-commit transaction.
+ * <p>
+ * Does nothing when there are no mappings, when the transaction does not need a return value or
+ * is not implicit, when there is no single mapping, or when the mapped node has no backups.
+ * Otherwise the single backup is looked up through discovery:
+ * <ul>
+ * <li>backup not found - nothing is done;</li>
+ * <li>backup is the local node - {@code cctx.tm().removeTxReturn(...)} is called for the
+ * transaction's xid version;</li>
+ * <li>backup is remote - {@code cctx.tm().sendDeferredAckResponse(...)} is called, but only if the
+ * comparison with {@link #ACK_DHT_ONE_PHASE_SINCE} passes (presumably: the backup runs at least
+ * that version - TODO confirm {@code compareToIgnoreTimestamp} semantics).</li>
+ * </ul>
+ */
+ private void ackBackup() {
+ if (mappings.empty())
+ return;
+
+ if (!tx.needReturnValue() || !tx.implicit())
+ return; // GridCacheReturn was not saved at backup.
+
+ GridDistributedTxMapping mapping = mappings.singleMapping();
+
+ if (mapping != null) {
+ UUID nodeId = mapping.node().id();
+
+ Collection<UUID> backups = tx.transactionNodes().get(nodeId);
+
+ if (!F.isEmpty(backups)) {
+ assert backups.size() == 1 : backups; // Exactly one backup is expected here.
+
+ UUID backupId = F.first(backups);
+
+ ClusterNode backup = cctx.discovery().node(backupId);
+
+ // Nothing to do if backup has left the grid.
+ if (backup == null) {
+ // No-op.
+ }
+ else if (backup.isLocal())
+ cctx.tm().removeTxReturn(tx.xidVersion());
+ else {
+ if (ACK_DHT_ONE_PHASE_SINCE.compareToIgnoreTimestamp(backup.version()) <= 0)
+ cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
+ }
+ }
+ }
}
/**
@@ -475,23 +527,48 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
readyNearMappingFromBackup(mapping);
if (committed) {
- if (tx.syncMode() == FULL_SYNC) {
- GridCacheVersion nearXidVer = tx.nearXidVersion();
+ try {
+ if (tx.needReturnValue() && tx.implicit()) {
+ GridCacheReturnCompletableWrapper wrapper =
+ cctx.tm().getCommittedTxReturn(tx.xidVersion());
- assert nearXidVer != null : tx;
+ assert wrapper != null : tx.xidVersion();
- IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+ GridCacheReturn retVal = wrapper.fut().get();
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- mini.onDone(tx);
- }
- });
+ assert retVal != null;
+
+ tx.implicitSingleResult(retVal);
+ }
- return;
+ if (tx.syncMode() == FULL_SYNC) {
+ GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+ assert nearXidVer != null : tx;
+
+ IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ mini.onDone(tx);
+ }
+ });
+
+ return;
+ }
+
+ mini.onDone(tx);
}
+ catch (IgniteCheckedException e) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near finish fut, failed to finish [" +
+ "txId=" + tx.nearXidVersion() +
+ ", node=" + backup.id() +
+ ", err=" + e + ']');
+ }
- mini.onDone(tx);
+ mini.onDone(e);
+ }
}
else {
ClusterTopologyCheckedException cause =
@@ -504,7 +581,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else {
- GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+ GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId(), false);
// Preserve old behavior, otherwise response is not sent.
if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
@@ -765,9 +842,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @param miniId Mini future ID.
+ * @param waitRemoteTxs Wait for remote txs.
* @return Finish request.
*/
- private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+ private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) {
GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
cctx.localNodeId(),
futureId(),
@@ -791,7 +869,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
0,
null,
0,
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
+ waitRemoteTxs);
finishReq.checkCommitted(true);
@@ -872,9 +952,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(mini);
- GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
-
- req.waitRemoteTransactions(true);
+ GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
for (UUID backupId : backups) {
ClusterNode backup = cctx.discovery().node(backupId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb2989e..18c3011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -151,6 +152,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
@GridToStringExclude
protected GridCacheSharedContext<?, ?> cctx;
+ /** Need return value. */
+ protected boolean needRetVal;
+
/**
* End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
* assigned to this transaction at the end of write phase.
@@ -695,6 +699,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/**
+ * @return Flag indicating whether transaction needs return value.
+ */
+ public boolean needReturnValue() {
+ return needRetVal;
+ }
+
+ /**
+ * @param needRetVal Need return value flag.
+ */
+ public void needReturnValue(boolean needRetVal) {
+ this.needRetVal = needRetVal;
+ }
+
+ /**
* Gets remaining allowed transaction time.
*
* @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out.
@@ -1285,7 +1303,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
if (intercept || !F.isEmpty(e.entryProcessors()))
e.cached().unswap(false);
- IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
+ IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false, null);
GridCacheContext cacheCtx = e.context();
@@ -1443,13 +1461,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/**
* @param txEntry Entry to process.
* @param metrics {@code True} if metrics should be updated.
+ * @param ret Optional return value to initialize.
* @return Tuple containing transformation results.
* @throws IgniteCheckedException If failed to get previous value for transform.
* @throws GridCacheEntryRemovedException If entry was concurrently deleted.
*/
protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
IgniteTxEntry txEntry,
- boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
+ boolean metrics,
+ @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException {
GridCacheContext cacheCtx = txEntry.context();
assert cacheCtx != null;
@@ -1457,8 +1477,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
if (isSystemInvalidate())
return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
- if (F.isEmpty(txEntry.entryProcessors()))
+ if (F.isEmpty(txEntry.entryProcessors())) {
+ if (ret != null)
+ ret.value(cacheCtx, txEntry.value(), txEntry.keepBinary());
+
return F.t(txEntry.op(), txEntry.value());
+ }
else {
T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue();
@@ -1508,17 +1532,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached());
+ Object procRes = null;
+ Exception err = null;
+
try {
EntryProcessor<Object, Object, Object> processor = t.get1();
- processor.process(invokeEntry, t.get2());
+ procRes = processor.process(invokeEntry, t.get2());
val = invokeEntry.getValue();
key = invokeEntry.key();
}
- catch (Exception ignore) {
- // No-op.
+ catch (Exception e) {
+ err = e;
+ }
+
+ if (ret != null) {
+ if (err != null || procRes != null)
+ ret.addEntryProcessResult(txEntry.context(), txEntry.key(), null, procRes, err, keepBinary);
+ else
+ ret.invokeResult(true);
}
modified |= invokeEntry.modified();