You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/04 16:03:52 UTC
[44/50] [abbrv] ignite git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-5797
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --cc modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 722b749,2f7e6c0..afecd17
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@@ -44,19 -43,10 +43,23 @@@ import org.apache.ignite.internal.GridD
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
+ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
+ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
+ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 18a3729,4cb68da..ae369cf
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@@ -61,9 -66,9 +66,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
+ import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.trace.TraceProcessor;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.StripedExecutor;
@@@ -420,21 -439,16 +440,23 @@@ public interface GridKernalContext exte
public DataStructuresProcessor dataStructures();
/**
+ * Gets trace processor.
+ *
+ * @return Trace processor.
+ */
+ public TraceProcessor trace();
+
+ /**
- * Sets segmented flag to {@code true} when node is stopped due to segmentation issues.
+ * Checks whether this node is invalid due to a critical error or not.
+ *
+ * @return {@code True} if this node is invalid, {@code false} otherwise.
*/
- public void markSegmented();
+ public boolean invalid();
/**
- * Gets segmented flag.
+ * Checks whether this node detected its segmentation from the rest of the grid.
*
- * @return {@code True} if network is currently segmented, {@code false} otherwise.
+ * @return {@code True} if this node has segmented, {@code false} otherwise.
*/
public boolean segmented();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 58522b8,a0e3f93..33d1d4c
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@@ -78,9 -85,9 +85,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
+ import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.trace.TraceProcessor;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.StripedExecutor;
@@@ -283,12 -291,16 +292,20 @@@ public class GridKernalContextImpl impl
@GridToStringExclude
private DataStructuresProcessor dataStructuresProc;
+ /** Cache mvcc coordinators. */
+ @GridToStringExclude
+ private MvccProcessor coordProc;
+
+ /** */
+ @GridToStringExclude
+ private IgniteAuthenticationProcessor authProc;
+
/** */
@GridToStringExclude
+ private TraceProcessor traceProc;
+
+ /** */
+ @GridToStringExclude
private List<GridComponent> comps = new LinkedList<>();
/** */
@@@ -581,11 -618,17 +623,19 @@@
else if (comp instanceof PlatformProcessor)
platformProc = (PlatformProcessor)comp;
else if (comp instanceof PoolProcessor)
- poolProc = (PoolProcessor) comp;
+ poolProc = (PoolProcessor)comp;
else if (comp instanceof GridMarshallerMappingProcessor)
mappingProc = (GridMarshallerMappingProcessor)comp;
+ else if (comp instanceof MvccProcessor)
+ coordProc = (MvccProcessor)comp;
+ else if (comp instanceof PdsFoldersResolver)
+ pdsFolderRslvr = (PdsFoldersResolver)comp;
+ else if (comp instanceof GridInternalSubscriptionProcessor)
+ internalSubscriptionProc = (GridInternalSubscriptionProcessor)comp;
+ else if (comp instanceof IgniteAuthenticationProcessor)
+ authProc = (IgniteAuthenticationProcessor)comp;
+ else if (comp instanceof TraceProcessor)
+ traceProc = (TraceProcessor)comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@@ -851,13 -903,8 +910,13 @@@
}
/** {@inheritDoc} */
- @Override public IgniteLogger log(Class<?> cls) {
- return log(cls.getName());
++ @Override public TraceProcessor trace() {
++ return traceProc;
+ }
+
+ /** {@inheritDoc} */
- @Override public void markSegmented() {
- segFlag = true;
+ @Override public IgniteLogger log(String ctgr) {
+ return config().getGridLogger().getLogger(ctgr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 15ddb77,6b1c995..15e0868
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@@ -145,9 -163,9 +163,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
+ import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.trace.TraceProcessor;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions;
import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 70c5eca,fe61aec..10eddec
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@@ -23,7 -23,7 +23,8 @@@ import java.nio.ByteBuffer
import org.apache.ignite.internal.ExecutorAwareMessage;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+ import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
+import org.apache.ignite.internal.processors.trace.IgniteTraceAware;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 2433705,54efb47..a573506
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@@ -141,9 -176,9 +176,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+ import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.GridLongList;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index c8d5546,9283939..2d084a7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@@ -32,12 -33,16 +33,16 @@@ import org.apache.ignite.internal.Inval
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
+ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@@ -354,7 -371,8 +377,9 @@@ public final class GridDhtTxFinishFutur
tx.activeCachesDeploymentEnabled(),
false,
false,
+ tx.mvccSnapshot(),
- tx.filterUpdateCountersForBackupNode(n));
++ tx.filterUpdateCountersForBackupNode(n),
+ tx.nodeTrace() != null ? new EventsTrace() : null);
try {
cctx.io().send(n, req, tx.ioPolicy());
@@@ -458,7 -484,8 +491,9 @@@
updCntrs,
false,
false,
+ mvccSnapshot,
- commit ? null : tx.filterUpdateCountersForBackupNode(n));
++ commit ? null : tx.filterUpdateCountersForBackupNode(n),
+ tx.nodeTrace() != null ? new EventsTrace() : null);
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
@@@ -528,7 -555,8 +563,9 @@@
tx.activeCachesDeploymentEnabled(),
false,
false,
+ mvccSnapshot,
- null);
++ null,
+ tx.nodeTrace() != null ? new EventsTrace() : null);
req.writeVersion(tx.writeVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 9bee5dd,61896b5..96cf37c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@@ -25,8 -25,8 +25,9 @@@ import org.apache.ignite.cache.CacheWri
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@@ -123,7 -135,8 +136,9 @@@ public class GridDhtTxFinishRequest ext
boolean addDepInfo,
boolean retVal,
boolean waitRemoteTxs,
+ MvccSnapshot mvccSnapshot,
- Collection<PartitionUpdateCountersMessage> updCntrs
++ Collection<PartitionUpdateCountersMessage> updCntrs,
+ EventsTrace eventsTrace
) {
super(
xidVer,
@@@ -212,7 -228,8 +233,9 @@@
Collection<Long> updateIdxs,
boolean retVal,
boolean waitRemoteTxs,
+ MvccSnapshot mvccSnapshot,
- Collection<PartitionUpdateCountersMessage> updCntrs
++ Collection<PartitionUpdateCountersMessage> updCntrs,
+ EventsTrace eventsTrace
) {
this(nearNodeId,
futId,
@@@ -238,7 -255,8 +261,9 @@@
addDepInfo,
retVal,
waitRemoteTxs,
+ mvccSnapshot,
- updCntrs);
++ updCntrs,
+ eventsTrace);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index b7f264f,a091d44..751ec3f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@@ -37,9 -38,7 +38,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
- import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 0d80adf,ffa383b..fa301d7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@@ -40,9 -45,8 +45,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
+ import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
+import org.apache.ignite.internal.processors.trace.IgniteTraceAware;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.GridLeanSet;
@@@ -98,9 -109,11 +111,14 @@@ public abstract class GridDhtTxLocalAda
/** Nodes where transactions were started on lock step. */
private Set<ClusterNode> lockTxNodes;
+ /** Enlist or lock future that is currently in progress. */
+ @SuppressWarnings("UnusedDeclaration")
+ @GridToStringExclude
+ protected volatile IgniteInternalFuture<?> lockFut;
+
+ /** */
+ protected EventsTrace eventsTrace;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@@ -846,29 -857,72 +871,95 @@@
}
/**
+ * @return Lock future.
+ */
+ public IgniteInternalFuture<?> lockFuture() {
+ return lockFut;
+ }
+
+ /**
+ * Atomically updates lock future.
+ *
+ * @param oldFut Old future.
+ * @param newFut New future.
+ * @return {@code true} If future was changed.
+ */
+ public boolean updateLockFuture(IgniteInternalFuture<?> oldFut, IgniteInternalFuture<?> newFut) {
+ return LOCK_FUT_UPD.compareAndSet(this, oldFut, newFut);
+ }
+
+ /**
+ * Clears lock future.
+ *
+ * @param cond Clear lock condition.
+ */
+ public void clearLockFuture(@Nullable IgniteInternalFuture cond) {
+ while (true) {
+ IgniteInternalFuture f = lockFut;
+
+ if (f == null
+ || f == ROLLBACK_FUT
+ || (cond != null && f != cond)
+ || updateLockFuture(f, null))
+ return;
+ }
+ }
+
+ /**
+ * @param f Future to finish.
+ * @param err Error.
+ * @param clearLockFut {@code True} if need to clear lock future.
+ * @return Finished future.
+ */
+ public <T> GridFutureAdapter<T> finishFuture(GridFutureAdapter<T> f, Throwable err, boolean clearLockFut) {
+ if (clearLockFut)
+ clearLockFuture(null);
+
+ f.onDone(err);
+
+ return f;
+ }
+
+ /**
+ * Prepare async rollback.
+ *
+ * @return Current lock future or null if it's safe to roll back.
+ */
+ @Nullable public IgniteInternalFuture<?> tryRollbackAsync() {
+ while (true) {
+ final IgniteInternalFuture fut = lockFut;
+
+ if (fut == ROLLBACK_FUT)
+ return null;
+ else if (updateLockFuture(fut, ROLLBACK_FUT))
+ return fut;
+ }
+ }
+
+ /**
+ * @return Node trace.
+ */
+ public EventsTrace nodeTrace() {
+ return eventsTrace;
+ }
+
+ /**
+ * @param eventsTrace Node trace.
+ */
+ public void nodeTrace(EventsTrace eventsTrace) {
+ this.eventsTrace = eventsTrace;
+ }
+
+ /**
+ * @param rmtNodeId Remote node ID.
+ * @param eventsTrace Node trace to collect.
+ */
+ public void collectNodeTrace(UUID rmtNodeId, EventsTrace eventsTrace) {
+ if (this.eventsTrace != null && eventsTrace != null)
+ this.eventsTrace.addRemoteTrace(rmtNodeId, eventsTrace);
+ }
+
+ /**
* @param prepFut Prepare future.
* @return If transaction is finished on prepare step, returns future which is completed after transaction finish.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 4be7c0d,0edf63f..185093d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -859,9 -894,10 +897,11 @@@ public final class GridDhtTxPrepareFutu
prepErr,
null,
tx.onePhaseCommit(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ tx.nodeTrace());
+ res.mvccSnapshot(tx.mvccSnapshot());
+
if (prepErr == null) {
if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor())
addDhtValues(res);
@@@ -1236,235 -1296,269 +1300,271 @@@
return;
if (last) {
- if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
- for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
- if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
- tx.onePhaseCommit(false);
+ if (waitCrdCntrFut != null) {
+ skipInit = true;
- break;
- }
- }
- }
-
- int miniId = 0;
-
- assert tx.transactionNodes() != null;
-
- final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
+ waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccSnapshot>>() {
+ @Override public void apply(IgniteInternalFuture<MvccSnapshot> fut) {
+ try {
+ fut.get();
- // Create mini futures.
- for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
- assert !dhtMapping.empty();
+ sendPrepareRequests();
- ClusterNode n = dhtMapping.primary();
+ markInitialized();
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to get mvcc version for tx [txId=" + tx.nearXidVersion() +
+ ", err=" + e + ']', e);
- assert !n.isLocal();
+ GridNearTxPrepareResponse res = createPrepareResponse(e);
- GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
+ onDone(res, res.error());
+ }
+ }
+ });
+ }
+ else
+ sendPrepareRequests();
+ }
+ }
+ finally {
+ if (!skipInit)
+ markInitialized();
+ }
+ }
- Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
+ /**
+ *
+ */
+ private void sendPrepareRequests() {
+ if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
+ for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+ if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+ tx.onePhaseCommit(false);
- Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
+ break;
+ }
+ }
+ }
- if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
- continue;
+ assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || tx.mvccSnapshot() != null;
- if (tx.remainingTime() == -1)
- return;
+ int miniId = 0;
- MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+ assert tx.transactionNodes() != null;
- add(fut); // Append new future.
+ final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
- assert txNodes != null;
+ // Do not need process active transactions on backups.
+ MvccSnapshot mvccSnapshot = tx.mvccSnapshot();
- GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- futId,
- fut.futureId(),
- tx.topologyVersion(),
- tx,
- timeout,
- dhtWrites,
- nearWrites,
- txNodes,
- tx.nearXidVersion(),
- true,
- tx.onePhaseCommit(),
- tx.subjectId(),
- tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled(),
- tx.storeWriteThrough(),
- retVal,
- tx.nodeTrace() != null ? new EventsTrace() : null);
+ if (mvccSnapshot != null)
+ mvccSnapshot = mvccSnapshot.withoutActiveTransactions();
- int idx = 0;
+ // Create mini futures.
+ for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
+ assert !dhtMapping.empty() || dhtMapping.queryUpdate();
- for (IgniteTxEntry entry : dhtWrites) {
- try {
- GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
+ ClusterNode n = dhtMapping.primary();
- GridCacheContext<?, ?> cacheCtx = cached.context();
+ assert !n.isLocal();
- // Do not invalidate near entry on originating transaction node.
- req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
- cached.readerId(n.id()) != null);
+ GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
- if (cached.isNewLocked()) {
- List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
- tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
+ Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
- // Do not preload if local node is a partition owner.
- if (!owners.contains(cctx.localNode()))
- req.markKeyForPreload(idx);
- }
+ Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- assert false : "Got removed exception on entry with dht local candidate: " + entry;
- }
+ if (!dhtMapping.queryUpdate() && F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
+ continue;
- idx++;
- }
+ MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+
+ add(fut); // Append new future.
+
+ assert req.transactionNodes() != null;
+
+ GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+ futId,
+ fut.futureId(),
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ dhtWrites,
+ nearWrites,
+ this.req.transactionNodes(),
+ tx.nearXidVersion(),
+ true,
+ tx.onePhaseCommit(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ tx.activeCachesDeploymentEnabled(),
+ tx.storeWriteThrough(),
+ retVal,
+ mvccSnapshot,
- tx.filterUpdateCountersForBackupNode(n));
++ tx.filterUpdateCountersForBackupNode(n),
++ tx.nodeTrace() != null ? new EventsTrace() : null);
+
+ req.queryUpdate(dhtMapping.queryUpdate());
+
+ int idx = 0;
+
+ for (IgniteTxEntry entry : dhtWrites) {
+ try {
+ GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
- if (!F.isEmpty(nearWrites)) {
- for (IgniteTxEntry entry : nearWrites) {
- try {
- if (entry.explicitVersion() == null) {
- GridCacheMvccCandidate added = entry.cached().candidate(version());
+ GridCacheContext<?, ?> cacheCtx = cached.context();
- assert added != null : "Missing candidate for cache entry:" + entry;
- assert added.dhtLocal();
+ // Do not invalidate near entry on originating transaction node.
+ req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
+ cached.readerId(n.id()) != null);
- if (added.ownerVersion() != null)
- req.owned(entry.txKey(), added.ownerVersion());
- }
+ if (cached.isNewLocked()) {
+ List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
+ tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- assert false : "Got removed exception on entry with dht local candidate: " + entry;
- }
- }
+ // Do not preload if local node is a partition owner.
+ if (!owners.contains(cctx.localNode()))
+ req.markKeyForPreload(idx);
}
- assert req.transactionNodes() != null;
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ assert false : "Got removed exception on entry with dht local candidate: " + entry;
+ }
+ idx++;
+ }
+
+ if (!F.isEmpty(nearWrites)) {
+ for (IgniteTxEntry entry : nearWrites) {
try {
- cctx.io().send(n, req, tx.ioPolicy());
+ if (entry.explicitVersion() == null) {
+ GridCacheMvccCandidate added = entry.cached().candidate(version());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + n.id() + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- if (!cctx.kernalContext().isStopping()) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + n.id() + ']');
- }
+ assert added != null : "Missing candidate for cache entry:" + entry;
+ assert added.dhtLocal();
- fut.onResult(e);
- }
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + n.id() +
- ", err=" + e + ']');
- }
+ if (added.ownerVersion() != null)
+ req.owned(entry.txKey(), added.ownerVersion());
}
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ assert false : "Got removed exception on entry with dht local candidate: " + entry;
}
}
+ }
- for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
- if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
- if (tx.remainingTime() == -1)
- return;
+ assert req.transactionNodes() != null;
- MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
+ try {
+ cctx.io().send(n, req, tx.ioPolicy());
- add(fut); // Append new future.
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + n.id() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + n.id() + ']');
+ }
- GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- futId,
- fut.futureId(),
- tx.topologyVersion(),
- tx,
- timeout,
- null,
- nearMapping.writes(),
- tx.transactionNodes(),
- tx.nearXidVersion(),
- true,
- tx.onePhaseCommit(),
- tx.subjectId(),
- tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled(),
- tx.storeWriteThrough(),
- retVal,
- tx.nodeTrace() != null ? new EventsTrace() : null);
-
- for (IgniteTxEntry entry : nearMapping.entries()) {
- if (CU.writes().apply(entry)) {
- try {
- if (entry.explicitVersion() == null) {
- GridCacheMvccCandidate added = entry.cached().candidate(version());
+ fut.onResult(e);
+ }
+ }
- assert added != null : "Null candidate for non-group-lock entry " +
- "[added=" + added + ", entry=" + entry + ']';
- assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
- "[added=" + added + ", entry=" + entry + ']';
+ for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+ if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+ if (tx.remainingTime() == -1)
+ return;
- if (added != null && added.ownerVersion() != null)
- req.owned(entry.txKey(), added.ownerVersion());
- }
+ MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
+
+ add(fut); // Append new future.
+
+ GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+ futId,
+ fut.futureId(),
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ null,
+ nearMapping.writes(),
+ tx.transactionNodes(),
+ tx.nearXidVersion(),
+ true,
+ tx.onePhaseCommit(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ tx.activeCachesDeploymentEnabled(),
+ tx.storeWriteThrough(),
+ retVal,
+ mvccSnapshot,
- null);
++ null,
++ tx.nodeTrace() != null ? new EventsTrace() : null);
+
+ for (IgniteTxEntry entry : nearMapping.entries()) {
+ if (CU.writes().apply(entry)) {
+ try {
+ if (entry.explicitVersion() == null) {
+ GridCacheMvccCandidate added = entry.cached().candidate(version());
- break;
- } catch (GridCacheEntryRemovedException ignore) {
- assert false : "Got removed exception on entry with dht local candidate: " + entry;
- }
+ assert added != null : "Null candidate for non-group-lock entry " +
+ "[added=" + added + ", entry=" + entry + ']';
+ assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+ "[added=" + added + ", entry=" + entry + ']';
+
+ if (added != null && added.ownerVersion() != null)
+ req.owned(entry.txKey(), added.ownerVersion());
}
+
+ break;
+ } catch (GridCacheEntryRemovedException ignore) {
+ assert false : "Got removed exception on entry with dht local candidate: " + entry;
}
+ }
+ }
- assert req.transactionNodes() != null;
+ assert req.transactionNodes() != null;
- try {
- cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
+ try {
+ cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.primary().id() + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nearMapping.primary().id() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ if (!cctx.kernalContext().isStopping()) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nearMapping.primary().id() + ']');
}
- catch (IgniteCheckedException e) {
- if (!cctx.kernalContext().isStopping()) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.primary().id() + ']');
- }
- fut.onResult(e);
- }
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.primary().id() +
- ", err=" + e + ']');
- }
- }
+ fut.onResult(e);
+ }
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nearMapping.primary().id() +
+ ", err=" + e + ']');
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 1de6cb1,30e8ceb..e8d6bbd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@@ -143,8 -155,8 +156,10 @@@ public class GridDhtTxPrepareRequest ex
boolean addDepInfo,
boolean storeWriteThrough,
boolean retVal,
+ MvccSnapshot mvccSnapshot,
- Collection<PartitionUpdateCountersMessage> counters) {
++ Collection<PartitionUpdateCountersMessage> counters,
+ EventsTrace eventsTrace
+ ) {
super(tx,
timeout,
null,
@@@ -174,9 -187,7 +191,12 @@@
nearNodeId = tx.nearNodeId();
+ skipCompletedVers = tx.xidVersion() == tx.nearXidVersion();
++
++
+ this.eventsTrace = eventsTrace;
+
+ recordTracePoint(TracePoint.DHT_PREPARE_REQUEST_CREATED);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 87499a8,140c1d5..3f61b1a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@@ -40,10 -42,7 +42,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
- import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
- import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@@ -556,8 -583,8 +586,9 @@@ public class GridNearOptimisticSerializ
tx.subjectId(),
tx.taskNameHash(),
m.clientFirst(),
+ txNodes.size() == 1,
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null);
for (IgniteTxEntry txEntry : writes) {
if (txEntry.op() == TRANSFORM)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index bd7f2a2,06d7a8c..42ec5ca
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@@ -44,7 -48,7 +48,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
+ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@@ -538,8 -573,8 +576,9 @@@ public class GridNearOptimisticTxPrepar
tx.subjectId(),
tx.taskNameHash(),
m.clientFirst(),
+ true,
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
- cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null);
++ cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null));
for (IgniteTxEntry txEntry : m.entries()) {
if (txEntry.op() == TRANSFORM)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 45ff1df,85a48a3..37fd8de
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@@ -33,12 -34,13 +34,15 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
- import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotResponseListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.C1;
@@@ -213,8 -228,10 +232,11 @@@ public class GridNearPessimisticTxPrepa
tx.subjectId(),
tx.taskNameHash(),
false,
+ true,
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
- cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null);
++ cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null));
+
+ req.queryUpdate(m.queryUpdate());
for (IgniteTxEntry txEntry : writes) {
if (txEntry.op() == TRANSFORM)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index aeecf87,4a4d8e3..fd8ca5a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@@ -44,10 -49,9 +49,11 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
+import org.apache.ignite.internal.processors.trace.IgniteTraceAware;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@@ -392,15 -401,12 +413,14 @@@ public final class GridNearTxFinishFutu
fut.getClass() == CheckRemoteTxMiniFuture.class;
}
- /**
- * Initializes future.
- *
- * @param commit Commit flag.
- */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- void finish(boolean commit) {
+ /** {@inheritDoc} */
+ @Override @SuppressWarnings("ForLoopReplaceableByForEach")
+ public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) {
+ tx.recordTracePoint(IgniteTraceAware.TracePoint.TX_COMMIT);
+
+ if (!cctx.mvcc().addFuture(this, futureId()))
+ return;
+
if (tx.onNeedCheckBackup()) {
assert tx.onePhaseCommit();
@@@ -724,8 -806,8 +819,9 @@@
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
+ tx.mvccSnapshot(),
- tx.activeCachesDeploymentEnabled()
+ tx.activeCachesDeploymentEnabled(),
+ cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null
);
// If this is the primary node for the keys.
@@@ -860,7 -947,8 +961,9 @@@
tx.activeCachesDeploymentEnabled(),
!waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
waitRemoteTxs,
+ null,
- null);
++ null,
+ cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null);
finishReq.checkCommitted(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 5fb9991,6b5aa90..56200eb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@@ -24,9 -24,10 +24,11 @@@ import java.util.UUID
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@@ -88,9 -92,8 +93,10 @@@ public class GridNearTxFinishRequest ex
int txSize,
@Nullable UUID subjId,
int taskNameHash,
+ MvccSnapshot mvccSnapshot,
- boolean addDepInfo) {
+ boolean addDepInfo,
+ EventsTrace eventsTrace
+ ) {
super(
xidVer,
futId,
@@@ -115,7 -117,7 +121,9 @@@
explicitLock(explicitLock);
storeEnabled(storeEnabled);
+ this.mvccSnapshot = mvccSnapshot;
++
+ recordTracePoint(TracePoint.NEAR_FINISH_REQUEST_CREATED);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a9a2e99,111f5d2..7be33cc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -66,8 -70,11 +70,12 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+ import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyRollbackOnlyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+ import org.apache.ignite.internal.processors.query.EnlistOperation;
+ import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@@ -225,14 -255,23 +256,27 @@@ public class GridNearTxLocal extends Gr
false,
txSize,
subjId,
- taskNameHash);
+ taskNameHash,
+ ctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null);
+
+ recordTracePoint(TracePoint.TX_CREATE);
+
+ this.lb = lb;
+
mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl();
+ this.mvccOp = mvccOp;
+
initResult();
+
+ trackTimeout = timeout() > 0 && !implicit() && cctx.time().addTimeoutObject(this);
+ }
+
+ /**
+ * @return Mvcc query version tracker.
+ */
+ public MvccQueryTracker mvccQueryTracker() {
+ return mvccTracker;
}
/** {@inheritDoc} */
@@@ -3099,8 -3714,14 +3719,16 @@@
if (!PREP_FUT_UPD.compareAndSet(this, null, fut))
return prepFut;
+ if (trackTimeout) {
+ prepFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ GridNearTxLocal.this.removeTimeoutHandler();
+ }
+ });
+ }
+
+ recordTracePoint(TracePoint.TX_PREPARE);
+
if (timeout == -1) {
fut.onDone(this, timeoutException());
@@@ -3153,55 -3771,45 +3778,49 @@@
if (log.isDebugEnabled())
log.debug("Committing near local tx: " + this);
- if (fastFinish()) {
- state(PREPARING);
- state(PREPARED);
- state(COMMITTING);
+ final NearTxFinishFuture fut;
+ final NearTxFinishFuture fut0 = finishFut;
- cctx.tm().fastFinishTx(this, true);
+ boolean fastFinish;
- state(COMMITTED);
+ if (fut0 != null || !FINISH_FUT_UPD.compareAndSet(this, null, fut = finishFuture(fastFinish = fastFinish(), true)))
+ return chainFinishFuture(finishFut, true, true, false);
- recordTracePoint(TracePoint.TX_END);
-
- return new GridFinishedFuture<>((IgniteInternalTx)this);
- }
-
- final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
-
- GridNearTxFinishFuture fut = commitFut;
-
- if (fut == null &&
- !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true)))
- return commitFut;
-
- cctx.mvcc().addFuture(fut, fut.futureId());
+ if (!fastFinish) {
+ final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
- prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- GridNearTxFinishFuture fut0 = commitFut;
+ prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ try {
+ // Make sure that here are no exceptions.
+ prepareFut.get();
- try {
- // Make sure that here are no exceptions.
- prepareFut.get();
+ fut.finish(true, true, false);
+ }
+ catch (Error | RuntimeException e) {
+ COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
- fut0.finish(true);
- }
- catch (Error | RuntimeException e) {
- COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
+ fut.finish(false, true, false);
- fut0.finish(false);
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
- throw e;
+ if (!(e instanceof NodeStoppingException))
+ fut.finish(false, true, true);
+ else
+ fut.onNodeStop(e);
+ }
}
- catch (IgniteCheckedException e) {
- COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
+ });
+ }
- else
++ else {
++ // TODO is this needed?
++ recordTracePoint(TracePoint.TX_END);
+
- if (!(e instanceof NodeStoppingException))
- fut0.finish(false);
- }
- }
- });
+ fut.finish(true, false, false);
++ }
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 3a31c62,55c809d..3a7a788
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@@ -117,8 -124,8 +125,9 @@@ public class GridNearTxPrepareRequest e
@Nullable UUID subjId,
int taskNameHash,
boolean firstClientReq,
+ boolean allowWaitTopFut,
- boolean addDepInfo
+ boolean addDepInfo,
+ EventsTrace eventsTrace
) {
super(tx,
timeout,
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index a7e9ca3,e9865df..2f259e2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@@ -33,9 -33,9 +33,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse;
+ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.trace.EventsTrace;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 6b19db1,3f35c5f..a7a4bd0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@@ -68,9 -80,9 +80,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
+ import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.trace.TraceProcessor;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.StripedExecutor;
@@@ -351,12 -468,14 +469,19 @@@ public class StandaloneGridKernalContex
}
/** {@inheritDoc} */
+ @Override public TraceProcessor trace() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
- @Override public void markSegmented() { }
+ @Override public MvccProcessor coordinators() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean invalid() {
+ return false;
+ }
/** {@inheritDoc} */
@Override public boolean segmented() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index fb1f1e2,314bb52..7630136
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -347,53 -382,77 +383,78 @@@ public class IgniteTxHandler
}
try {
- if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) {
- if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
- "txId=" + req.version() +
- ", node=" + nearNodeId +
- ", reqTopVer=" + req.topologyVersion() +
- ", locTopVer=" + top.topologyVersion() +
- ", req=" + req + ']');
- }
+ if (top != null ) {
+ boolean retry = false;
- GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
- req.partition(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.version(),
- null,
- null,
- top.topologyVersion(),
- req.onePhaseCommit(),
- req.deployInfo() != null,
- req.nodeTrace());
+ GridDhtTopologyFuture topFut = top.topologyVersionFuture();
- try {
- ctx.io().send(nearNodeId, res, req.policy());
+ if (!req.allowWaitTopologyFuture() && !topFut.isDone()) {
+ retry = true;
if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
- ", node=" + nearNodeId + ']');
+ txPrepareMsgLog.debug("Topology change is in progress, need remap transaction [" +
+ "txId=" + req.version() +
+ ", node=" + nearNode.id() +
+ ", reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.readyTopologyVersion() +
+ ", req=" + req + ']');
}
}
- catch (ClusterTopologyCheckedException ignored) {
+
+ if (!retry && needRemap(req.topologyVersion(), top.readyTopologyVersion(), req)) {
+ retry = true;
+
if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" +
+ txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
"txId=" + req.version() +
- ", node=" + nearNodeId + ']');
+ ", node=" + nearNode.id() +
+ ", reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.readyTopologyVersion() +
+ ", req=" + req + ']');
}
}
- catch (IgniteCheckedException e) {
- U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " +
- "[txId=" + req.version() +
- ", node=" + nearNodeId +
- ", req=" + req + ']', e);
+
+ if (retry) {
+ GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ null,
+ top.lastTopologyChangeVersion(),
+ req.onePhaseCommit(),
- req.deployInfo() != null);
++ req.deployInfo() != null,
++ req.nodeTrace());
+
+ try {
+ ctx.io().send(nearNode, res, req.policy());
+
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
+ ", node=" + nearNode.id() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" +
+ "txId=" + req.version() +
+ ", node=" + nearNode.id() + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " +
+ "[txId=" + req.version() +
+ ", node=" + nearNode.id() +
+ ", req=" + req + ']', e);
+ }
+
+ return new GridFinishedFuture<>(res);
}
- return new GridFinishedFuture<>(res);
+ assert topFut.isDone();
}
tx = new GridDhtTxLocal(
@@@ -788,8 -943,7 +945,8 @@@
req.threadId(),
req.futureId(),
req.miniId(),
- new IgniteCheckedException("Transaction has been already completed."),
- new IgniteTxRollbackCheckedException("Transaction has been already completed or not started yet."));
++ new IgniteTxRollbackCheckedException("Transaction has been already completed or not started yet."),
+ req.nodeTrace());
try {
ctx.io().send(nodeId, res, req.policy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index ae533cb,e4c96b4..ae6c4e9
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@@ -55,8 -55,8 +55,9 @@@ import org.apache.ignite.configuration.
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.trace.IgniteTraceAware;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+ import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;