You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/09/19 15:41:02 UTC
[06/10] ignite git commit: IGNITE-1525 Return value for cache
operation can be lost with onePhaseCommit
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e67e60f..a5b2202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
@@ -175,6 +177,12 @@ public class IgniteTxHandler {
}
});
+ ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage msg) {
+ processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
+ }
+ });
+
ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
@@ -882,7 +890,7 @@ public class IgniteTxHandler {
* @param nodeId Sender node ID.
* @param req Request.
*/
- protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest req) {
+ protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
@@ -918,14 +926,15 @@ public class IgniteTxHandler {
if (dhtTx != null) {
dhtTx.onePhaseCommit(true);
+ dhtTx.needReturnValue(req.needReturnValue());
- finish(nodeId, dhtTx, req);
+ finish(dhtTx, req);
}
if (nearTx != null) {
nearTx.onePhaseCommit(true);
- finish(nodeId, nearTx, req);
+ finish(nearTx, req);
}
}
}
@@ -950,38 +959,60 @@ public class IgniteTxHandler {
req.deployInfo() != null);
}
- try {
- // Reply back to sender.
- ctx.io().send(nodeId, res, req.policy());
+ if (req.onePhaseCommit()) {
+ IgniteInternalFuture completeFut;
- if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId + ']');
- }
- }
- catch (IgniteCheckedException e) {
- if (e instanceof ClusterTopologyCheckedException) {
- if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId + ']');
- }
- }
- else {
- U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
- "txId=" + req.nearXidVersion() +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId +
- ", err=" + e.getMessage() + ']');
+ IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
+ null : dhtTx.done() ? null : dhtTx.finishFuture();
+
+ final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
+ null : nearTx.done() ? null : nearTx.finishFuture();
+
+ if (dhtFin != null && nearFin != null) {
+ GridCompoundFuture fut = new GridCompoundFuture();
+
+ fut.add(dhtFin);
+ fut.add(nearFin);
+
+ fut.markInitialized();
+
+ completeFut = fut;
}
+ else
+ completeFut = dhtFin != null ? dhtFin : nearFin;
- if (nearTx != null)
- nearTx.rollback();
+ if (completeFut != null) {
+ final GridDhtTxPrepareResponse res0 = res;
+ final GridDhtTxRemote dhtTx0 = dhtTx;
+ final GridNearTxRemote nearTx0 = nearTx;
- if (dhtTx != null)
- dhtTx.rollback();
+ completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ sendReply(nodeId, req, res0, dhtTx0, nearTx0);
+ }
+ });
+ }
+ else
+ sendReply(nodeId, req, res, dhtTx, nearTx);
}
+ else
+ sendReply(nodeId, req, res, dhtTx, nearTx);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ */
+ protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+ final GridDhtTxOnePhaseCommitAckRequest req) {
+ assert nodeId != null;
+ assert req != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Processing dht tx one phase commit ack request [nodeId=" + nodeId + ", req=" + req + ']');
+
+ for (GridCacheVersion ver : req.versions())
+ ctx.tm().removeTxReturn(ver);
}
/**
@@ -1139,12 +1170,10 @@ public class IgniteTxHandler {
}
/**
- * @param nodeId Node ID.
* @param tx Transaction.
* @param req Request.
*/
protected void finish(
- UUID nodeId,
GridDistributedTxRemoteAdapter tx,
GridDhtTxPrepareRequest req) throws IgniteTxHeuristicCheckedException {
assert tx != null : "No transaction for one-phase commit prepare request: " + req;
@@ -1177,6 +1206,52 @@ public class IgniteTxHandler {
}
/**
+ * @param nodeId Node id.
+ * @param req Request.
+ * @param res Response.
+ * @param dhtTx Dht tx.
+ * @param nearTx Near tx.
+ */
+ protected void sendReply(UUID nodeId,
+ GridDhtTxPrepareRequest req,
+ GridDhtTxPrepareResponse res,
+ GridDhtTxRemote dhtTx,
+ GridNearTxRemote nearTx) {
+ try {
+ // Reply back to sender.
+ ctx.io().send(nodeId, res, req.policy());
+
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException) {
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ else {
+ U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
+ "txId=" + req.nearXidVersion() +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId +
+ ", err=" + e.getMessage() + ']');
+ }
+
+ if (nearTx != null)
+ nearTx.rollback();
+
+ if (dhtTx != null)
+ dhtTx.rollback();
+ }
+ }
+
+ /**
* Sends tx finish response to remote node, if response is requested.
*
* @param nodeId Node id that originated finish request.
@@ -1191,7 +1266,26 @@ public class IgniteTxHandler {
if (req.checkCommitted()) {
res.checkCommitted(true);
- if (!committed) {
+ if (committed) {
+ if (req.needReturnValue()) {
+ try {
+ GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
+
+ if (wrapper != null)
+ res.returnValue(wrapper.fut().get());
+ else
+ assert !ctx.discovery().alive(nodeId) : nodeId;
+ }
+ catch (IgniteCheckedException e) {
+ if (txFinishMsgLog.isDebugEnabled()) {
+ txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ }
+ }
+ else {
ClusterTopologyCheckedException cause =
new ClusterTopologyCheckedException("Primary node left grid.");
@@ -1492,8 +1586,7 @@ public class IgniteTxHandler {
* @param req Request.
*/
protected void processCheckPreparedTxRequest(final UUID nodeId,
- final GridCacheTxRecoveryRequest req)
- {
+ final GridCacheTxRecoveryRequest req) {
if (txRecoveryMsgLog.isDebugEnabled()) {
txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() +
", node=" + nodeId + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 637f322..fe69536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -151,9 +151,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** Commit error. */
protected volatile Throwable commitErr;
- /** Need return value. */
- protected boolean needRetVal;
-
/** Implicit transaction result. */
protected GridCacheReturn implicitRes;
@@ -355,13 +352,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @return Flag indicating whether transaction needs return value.
- */
- public boolean needReturnValue() {
- return needRetVal;
- }
-
- /**
* @return {@code True} if transaction participates in a cache that has an interceptor configured.
*/
public boolean hasInterceptor() {
@@ -369,13 +359,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @param needRetVal Need return value flag.
- */
- public void needReturnValue(boolean needRetVal) {
- this.needRetVal = needRetVal;
- }
-
- /**
* @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
* to remote nodes.
*/
@@ -703,7 +686,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txEntry.cached().unswap(false);
IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
- true);
+ true, null);
GridCacheVersion dhtVer = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index f9357f9..a1580a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +50,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLo
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -87,8 +92,11 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ConcurrentLinkedHashMap;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
@@ -123,6 +131,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** Tx salvage timeout (default 3s). */
private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
+ /** One phase commit deferred ack request timeout. */
+ public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+ Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500);
+
+ /** One phase commit deferred ack request buffer size. */
+ private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+ Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
+
/** Version in which deadlock detection introduced. */
public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
@@ -160,7 +176,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
/** Committed local transactions. */
- private final ConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap =
+ private final ConcurrentLinkedHashMap<GridCacheVersion, Object> completedVersHashMap =
new ConcurrentLinkedHashMap<>(
Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
0.75f,
@@ -168,6 +184,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
PER_SEGMENT_Q);
+ /** Pending one phase commit ack requests sender. */
+ private GridDeferredAckMessageSender deferredAckMessageSender;
+
/** Transaction finish synchronizer. */
private GridCacheTxFinishSync txFinishSync;
@@ -209,6 +228,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
for (TxDeadlockFuture fut : deadlockDetectFuts.values())
fut.onNodeLeft(nodeId);
+
+ for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof GridCacheReturnCompletableWrapper &&
+ nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
+ removeTxReturn(entry.getKey());
+ }
}
},
EVT_NODE_FAILED, EVT_NODE_LEFT);
@@ -237,6 +264,33 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
txFinishSync = new GridCacheTxFinishSync<>(cctx);
txHnd = new IgniteTxHandler(cctx);
+
+ deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+ @Override public int getTimeout() {
+ return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
+ }
+
+ @Override public int getBufferSize() {
+ return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+ }
+
+ @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+ GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers);
+
+ cctx.kernalContext().gateway().readLock();
+
+ try {
+ cctx.io().send(nodeId, ackReq, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to send one phase commit ack to backup node [backup=" +
+ nodeId + ']', e);
+ }
+ finally {
+ cctx.kernalContext().gateway().readUnlock();
+ }
+ }
+ };
}
/** {@inheritDoc} */
@@ -898,9 +952,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*/
public void addCommittedTx(IgniteInternalTx tx) {
addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion());
+ }
- if (!tx.local() && !tx.near() && tx.onePhaseCommit())
- addCommittedTx(tx, tx.nearXidVersion(), null);
+ /**
+ * @param tx Committed transaction.
+ */
+ public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) {
+ addCommittedTxReturn(tx.nearXidVersion(), null, ret);
}
/**
@@ -925,7 +983,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (nearXidVer != null)
xidVer = new CommittedVersion(xidVer, nearXidVer);
- Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
+ Object committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
Boolean b = completedVersSorted.putIfAbsent(xidVer, true);
@@ -933,7 +991,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert b == null;
}
- return committed0 == null || committed0;
+ Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+ return committed0 == null || committed;
+ }
+
+ /**
+ * @param xidVer Completed transaction version.
+ * @param nearXidVer Optional near transaction ID.
+ * @param retVal Invoke result.
+ */
+ private void addCommittedTxReturn(
+ GridCacheVersion xidVer,
+ @Nullable GridCacheVersion nearXidVer,
+ GridCacheReturnCompletableWrapper retVal
+ ) {
+ assert retVal != null;
+
+ if (nearXidVer != null)
+ xidVer = new CommittedVersion(xidVer, nearXidVer);
+
+ Object prev = completedVersHashMap.putIfAbsent(xidVer, retVal);
+
+ assert prev == null || Boolean.FALSE.equals(prev) : prev; // Can be rolled back.
}
/**
@@ -945,7 +1025,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
IgniteInternalTx tx,
GridCacheVersion xidVer
) {
- Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
+ Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
Boolean b = completedVersSorted.putIfAbsent(xidVer, false);
@@ -953,7 +1033,47 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert b == null;
}
- return committed0 == null || !committed0;
+ Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+ return committed0 == null || !committed;
+ }
+
+ /**
+ * @param xidVer xidVer Completed transaction version.
+ * @return Tx result.
+ */
+ public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion xidVer) {
+ Object retVal = completedVersHashMap.get(xidVer);
+
+ // Will gain true in regular case or GridCacheReturn in onePhaseCommit case.
+ if (!Boolean.TRUE.equals(retVal)) {
+ assert !Boolean.FALSE.equals(retVal); // Method should be used only after 'committed' checked.
+
+ GridCacheReturnCompletableWrapper res = (GridCacheReturnCompletableWrapper)retVal;
+
+ removeTxReturn(xidVer);
+
+ return res;
+ }
+ else
+ return null;
+ }
+
+ /**
+ * @param xidVer xidVer Completed transaction version.
+ */
+ public void removeTxReturn(GridCacheVersion xidVer) {
+ Object prev = completedVersHashMap.get(xidVer);
+
+ if (Boolean.FALSE.equals(prev)) // Tx can be rolled back.
+ return;
+
+ assert prev instanceof GridCacheReturnCompletableWrapper:
+ prev + " instead of GridCacheReturnCompletableWrapper";
+
+ boolean res = completedVersHashMap.replace(xidVer, prev, true);
+
+ assert res;
}
/**
@@ -1086,7 +1206,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* so we don't do it here.
*/
- Boolean committed = completedVersHashMap.get(tx.xidVersion());
+ Object committed0 = completedVersHashMap.get(tx.xidVersion());
+
+ Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
// 1. Make sure that committed version has been recorded.
if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
@@ -1672,12 +1794,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
boolean committed = false;
- for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) {
+ for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
if (entry.getKey() instanceof CommittedVersion) {
CommittedVersion comm = (CommittedVersion)entry.getKey();
if (comm.nearVer.equals(xidVer)) {
- committed = entry.getValue();
+ committed = !entry.getValue().equals(Boolean.FALSE);
break;
}
@@ -1809,8 +1931,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Not all transactions were found. Need to scan committed versions to check
// if transaction was already committed.
- for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) {
- if (!e.getValue())
+ for (Map.Entry<GridCacheVersion, Object> e : completedVersHashMap.entrySet()) {
+ if (e.getValue().equals(Boolean.FALSE))
continue;
GridCacheVersion ver = e.getKey();
@@ -2137,6 +2259,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param nodeId Node ID to send message to.
+ * @param ver Version to ack.
+ */
+ public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
+ deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+ }
+
+ /**
* @return Collection of active transaction deadlock detection futures.
*/
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index e611723..c3d194b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +31,6 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
-
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
@@ -43,8 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -59,6 +61,7 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -70,7 +73,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final long DURATION = 60_000;
+ protected static final long DURATION = 60_000;
/** */
protected static final int GRID_CNT = 4;
@@ -78,8 +81,8 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
/**
* @return Keys count for the test.
*/
- private int keysCount() {
- return 10_000;
+ protected int keysCount() {
+ return 2_000;
}
/**
@@ -249,12 +252,17 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
+ Random rnd = new Random();
+
while (!finished.get()) {
stopGrid(3);
U.sleep(300);
startGrid(3);
+
+ if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+ awaitPartitionMapExchange();
}
return null;
@@ -456,6 +464,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
}
+
+ checkOnePhaseCommitReturnValuesCleaned();
+ }
+
+ /**
+ *
+ */
+ protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException {
+ U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT);
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ IgniteKernal ignite = (IgniteKernal)grid(i);
+
+ IgniteTxManager tm = ignite.context().cache().context().tm();
+
+ Map completedVersHashMap = U.field(tm, "completedVersHashMap");
+
+ for (Object o : completedVersHashMap.values()) {
+ assertTrue("completedVersHashMap contains" + o.getClass() + " instead of boolean. " +
+ "These values should be replaced by boolean after onePhaseCommit finished. " +
+ "[node=" + i + "]", o instanceof Boolean);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 9204bc8..9bfde27 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.HashSet;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
@@ -88,16 +89,6 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
}
}
- /** {@inheritDoc} */
- @Override public void testGetAndPut() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1525");
- }
-
- /** {@inheritDoc} */
- @Override public void testInvoke() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1525");
- }
-
/**
* @throws Exception If failed.
*/
@@ -217,6 +208,70 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
}
/**
+ *
+ */
+ public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception {
+ ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));
+
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ final int keysCnt = keysCount();
+
+ IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Random rnd = new Random();
+
+ while (!finished.get()) {
+ stopGrid(0);
+
+ U.sleep(300);
+
+ startGrid(0);
+
+ if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+ awaitPartitionMapExchange();
+ }
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Object> fut2 = runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int iter = 0;
+
+ while (!finished.get()) {
+ try {
+ IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+ Integer val = ++iter;
+
+ for (int i = 0; i < keysCnt; i++)
+ cache.invoke(i, new SetEntryProcessor(val));
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+ }
+
+ return null;
+ }
+ });
+
+ try {
+ U.sleep(DURATION);
+ }
+ finally {
+ finished.set(true);
+
+ fut.get();
+ fut2.get();
+ }
+
+ checkOnePhaseCommitReturnValuesCleaned();
+ }
+
+ /**
* Callable to process inside transaction.
*/
private static class ProcessCallable implements Callable<Void> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-client-mode.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-client-mode.properties b/modules/yardstick/config/benchmark-client-mode.properties
index ba5525f..f7c8347 100644
--- a/modules/yardstick/config/benchmark-client-mode.properties
+++ b/modules/yardstick/config/benchmark-client-mode.properties
@@ -70,6 +70,8 @@ CONFIGS="\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx-win.properties b/modules/yardstick/config/benchmark-tx-win.properties
index 73b857d..54a40b1 100644
--- a/modules/yardstick/config/benchmark-tx-win.properties
+++ b/modules/yardstick/config/benchmark-tx-win.properties
@@ -54,6 +54,8 @@ set DRIVER_HOSTS=localhost
:: Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
set CONFIGS=^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx.properties b/modules/yardstick/config/benchmark-tx.properties
index f3dbc24..0d5bb02 100644
--- a/modules/yardstick/config/benchmark-tx.properties
+++ b/modules/yardstick/config/benchmark-tx.properties
@@ -59,6 +59,8 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS}
# Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
CONFIGS="\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,\
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-win.properties b/modules/yardstick/config/benchmark-win.properties
index b6ecd67..b75b5d6 100644
--- a/modules/yardstick/config/benchmark-win.properties
+++ b/modules/yardstick/config/benchmark-win.properties
@@ -59,6 +59,8 @@ set CONFIGS=^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds atomic-put-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds atomic-put-get-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds sql-query-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds sql-query-join-1-backup,^
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark.properties b/modules/yardstick/config/benchmark.properties
index 67ef5ef..cfc1499 100644
--- a/modules/yardstick/config/benchmark.properties
+++ b/modules/yardstick/config/benchmark.properties
@@ -71,6 +71,8 @@ CONFIGS="\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
new file mode 100644
index 0000000..40e563c
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+
+/**
+ * Ignite benchmark that performs invoke operations.
+ */
+public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ int key = nextRandom(args.range());
+
+ cache.getAndPut(key, new SampleValue(key));
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("atomic");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
new file mode 100644
index 0000000..49ae985
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+/**
+ * Ignite benchmark that performs invoke operations.
+ */
+public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+ /** */
+ private IgniteTransactions txs;
+
+ /** */
+ private Callable<Void> clo;
+
+ /** {@inheritDoc} */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+ ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+ txs = ignite().transactions();
+
+ clo = new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int key = nextRandom(args.range());
+
+ cache.getAndPut(key, new SampleValue(key));
+
+ return null;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("tx");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
index 8f05598..64dc6b8 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
@@ -17,12 +17,52 @@
package org.apache.ignite.yardstick.cache;
+import java.util.Map;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
/**
* Ignite benchmark that performs invoke operations.
*/
public class IgniteInvokeTxBenchmark extends IgniteInvokeBenchmark {
+ /** */
+ private IgniteTransactions txs;
+
+ /** */
+ private Callable<Void> clo;
+
+ /** {@inheritDoc} */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+ ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+ txs = ignite().transactions();
+
+ clo = new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int key = nextRandom(args.range());
+
+ cache.invoke(key, new SetValueEntryProcessor(new SampleValue(key)));
+
+ return null;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+ return true;
+ }
+
/** {@inheritDoc} */
@Override protected IgniteCache<Integer, Object> cache() {
return ignite().cache("tx");