You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/05/18 11:35:45 UTC
[ignite] branch master updated: IGNITE-12989 TxRecovery should log
statuses to TimeBag (#7795)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 902a949 IGNITE-12989 TxRecovery should log statuses to TimeBag (#7795)
902a949 is described below
commit 902a94991c083cac2d829ea5464c1b7028d44c4d
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon May 18 14:35:24 2020 +0300
IGNITE-12989 TxRecovery should log statuses to TimeBag (#7795)
---
.../cache/transactions/IgniteTxManager.java | 61 ++++++++++++++++++++--
...GridExchangeFreeCellularSwitchAbstractTest.java | 6 +++
...ridExchangeFreeCellularSwitchIsolationTest.java | 22 ++++++++
3 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 36f1a4e..189328d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCompute;
@@ -93,6 +94,7 @@ import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedExceptio
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
+import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgnitePair;
@@ -2386,14 +2388,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param tx Transaction.
* @param failedNodeIds Failed nodes IDs.
*/
- public void commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) {
+ public IgniteInternalFuture<Boolean> commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) {
assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
assert !F.isEmpty(tx.transactionNodes()) : tx;
assert tx.nearXidVersion() != null : tx;
// Transaction will be completed by finish message.
if (!tx.markFinalizing(RECOVERY_FINISH))
- return;
+ return null;
GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
cctx,
@@ -2407,6 +2409,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
log.info("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
fut.prepare();
+
+ return fut;
}
/**
@@ -2975,6 +2979,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** */
private final MvccCoordinator mvccCrd;
+ /** Time bag that measures and stores the timings of tx recovery stages. */
+ private final TimeBag timeBag = new TimeBag(log.isInfoEnabled());
+
+ /** Number of prepared transactions passed to recovery. */
+ private final AtomicLong preparedTxCnt = new AtomicLong();
+
+ /** Recovery finished future. */
+ private final GridCompoundFuture<Boolean, ?> doneFut = new GridCompoundFuture<>();
+
/**
* @param node Failed node.
* @param mvccCrd Mvcc coordinator at time of node failure.
@@ -2986,6 +2999,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@Override public void run() {
+ timeBag.finishGlobalStage("Started");
+
try {
cctx.kernalContext().gateway().readLock();
}
@@ -3017,14 +3032,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.
if (tx.originatingNodeId().equals(evtNodeId)) {
if (tx.state() == PREPARED)
- commitIfPrepared(tx, Collections.singleton(evtNodeId));
+ processPrepared(tx, evtNodeId);
else {
IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null) {
prepFut.listen(fut -> {
if (tx.state() == PREPARED)
- commitIfPrepared(tx, Collections.singleton(evtNodeId));
+ processPrepared(tx, evtNodeId);
// If we could not mark tx as rollback, it means that transaction is being committed.
else if (tx.setRollbackOnly())
tx.rollbackAsync();
@@ -3043,6 +3058,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
}
+ timeBag.finishGlobalStage("Initialized");
+
+ doneFut.markInitialized();
+
+ if (log.isInfoEnabled() && preparedTxCnt.get() > 0)
+ doneFut.listen(fut -> finishAndRecordTimings());
+
if (allTxFinFut == null)
return;
@@ -3078,6 +3100,37 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.kernalContext().gateway().readUnlock();
}
}
+
+ /**
+ * @param tx Prepared transaction to recover.
+ * @param failedNode ID of the failed node.
+ */
+ private void processPrepared(IgniteInternalTx tx, UUID failedNode) {
+ IgniteInternalFuture<Boolean> fut = commitIfPrepared(tx, Collections.singleton(failedNode));
+
+ if (fut != null)
+ doneFut.add(fut);
+
+ preparedTxCnt.incrementAndGet();
+ }
+
+ /**
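+ * Finishes the "Finished" global stage and logs the prepared tx count together with all stage timings at info level.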
+ */
+ private void finishAndRecordTimings() {
+ timeBag.finishGlobalStage("Finished");
+
+ StringBuilder timingsToLog = new StringBuilder();
+
+ timingsToLog.append("TxRecovery Status and Timings [txs=").append(preparedTxCnt.get());
+
+ for (String stageTiming : timeBag.stagesTimings())
+ timingsToLog.append(", ").append(stageTiming);
+
+ timingsToLog.append("]");
+
+ log.info(timingsToLog.toString());
+ }
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
index 8795f64..1f5ec6c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -58,6 +59,9 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
/** Replicated cache name. */
protected static final String REPL_CACHE_NAME = "replicated";
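+ /** Listening logger wrapping the test log; set as the grid logger so tests can register log listeners. */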
+ protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
/**
* {@inheritDoc}
*/
@@ -82,6 +86,8 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
cfg.setDataStorageConfiguration(dsCfg);
+ cfg.setGridLogger(listeningLog);
+
return cfg;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
index e45399f..42a4fb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
@@ -37,11 +37,15 @@ import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.testframework.LogListener.matches;
+
/**
*
*/
@@ -71,6 +75,14 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
public void testOnlyAffectedNodesWaitForRecovery() throws Exception {
int nodes = 6;
+ String recoveryStatusMsg = "TxRecovery Status and Timings [txs=";
+
+ LogListener lsnrAny = matches(recoveryStatusMsg).build(); // Any.
+ LogListener lsnrBackup = matches(recoveryStatusMsg).times((nodes / 2) - 1).build(); // Cell 1 (backups).
+ LogListener lsnrNear = matches(recoveryStatusMsg).times((nodes / 2)).build(); // Cell 2 (near).
+
+ listeningLog.registerListener(lsnrAny);
+
startGridsMultiThreaded(nodes);
blockRecoveryMessages();
@@ -240,6 +252,10 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
nearNodes, 1,
vers);
+ assertFalse(lsnrAny.check());
+
+ listeningLog.registerListener(lsnrNear);
+
failed.close(); // Stopping node.
awaitForSwitchOnNodeLeft(failed);
@@ -375,6 +391,10 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
nearNodes, 0,
vers);
+ assertTrue(waitForCondition(lsnrNear::check, 5000));
+
+ listeningLog.registerListener(lsnrBackup);
+
// Partitioned recovery.
for (Ignite ignite : G.allGrids()) {
TestRecordingCommunicationSpi spi =
@@ -411,6 +431,8 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
nearNodes, 0,
vers);
+ assertTrue(waitForCondition(lsnrBackup::check, 5000));
+
for (IgniteInternalFuture<?> fut : futs)
fut.get();