You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/05/18 11:35:45 UTC

[ignite] branch master updated: IGNITE-12989 TxRecovery should log statuses to TimeBag (#7795)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 902a949  IGNITE-12989 TxRecovery should log statuses to TimeBag (#7795)
902a949 is described below

commit 902a94991c083cac2d829ea5464c1b7028d44c4d
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon May 18 14:35:24 2020 +0300

    IGNITE-12989 TxRecovery should log statuses to TimeBag (#7795)
---
 .../cache/transactions/IgniteTxManager.java        | 61 ++++++++++++++++++++--
 ...GridExchangeFreeCellularSwitchAbstractTest.java |  6 +++
 ...ridExchangeFreeCellularSwitchIsolationTest.java | 22 ++++++++
 3 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 36f1a4e..189328d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCompute;
@@ -93,6 +94,7 @@ import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedExceptio
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
+import org.apache.ignite.internal.util.TimeBag;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgnitePair;
@@ -2386,14 +2388,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param tx Transaction.
      * @param failedNodeIds Failed nodes IDs.
      */
-    public void commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) {
+    public IgniteInternalFuture<Boolean> commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) {
         assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
         assert !F.isEmpty(tx.transactionNodes()) : tx;
         assert tx.nearXidVersion() != null : tx;
 
         // Transaction will be completed by finish message.
         if (!tx.markFinalizing(RECOVERY_FINISH))
-            return;
+            return null;
 
         GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
             cctx,
@@ -2407,6 +2409,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             log.info("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
 
         fut.prepare();
+
+        return fut;
     }
 
     /**
@@ -2975,6 +2979,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         /** */
         private final MvccCoordinator mvccCrd;
 
+        /** Time bag to measure and store tx recovery stages times. */
+        private final TimeBag timeBag = new TimeBag(log.isInfoEnabled());
+
+        /** Prepared tx to be recovered count. */
+        private final AtomicLong preparedTxCnt = new AtomicLong();
+
+        /** Recovery finished future. */
+        private final GridCompoundFuture<Boolean, ?> doneFut = new GridCompoundFuture<>();
+
         /**
          * @param node Failed node.
          * @param mvccCrd Mvcc coordinator at time of node failure.
@@ -2986,6 +2999,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         /** {@inheritDoc} */
         @Override public void run() {
+            timeBag.finishGlobalStage("Started");
+
             try {
                 cctx.kernalContext().gateway().readLock();
             }
@@ -3017,14 +3032,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.
                         if (tx.originatingNodeId().equals(evtNodeId)) {
                             if (tx.state() == PREPARED)
-                                commitIfPrepared(tx, Collections.singleton(evtNodeId));
+                                processPrepared(tx, evtNodeId);
                             else {
                                 IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
 
                                 if (prepFut != null) {
                                     prepFut.listen(fut -> {
                                         if (tx.state() == PREPARED)
-                                            commitIfPrepared(tx, Collections.singleton(evtNodeId));
+                                            processPrepared(tx, evtNodeId);
                                             // If we could not mark tx as rollback, it means that transaction is being committed.
                                         else if (tx.setRollbackOnly())
                                             tx.rollbackAsync();
@@ -3043,6 +3058,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     }
                 }
 
+                timeBag.finishGlobalStage("Initialized");
+
+                doneFut.markInitialized();
+
+                if (log.isInfoEnabled() && preparedTxCnt.get() > 0)
+                    doneFut.listen(fut -> finishAndRecordTimings());
+
                 if (allTxFinFut == null)
                     return;
 
@@ -3078,6 +3100,37 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 cctx.kernalContext().gateway().readUnlock();
             }
         }
+
+        /**
+         * @param tx Tx.
+         * @param failedNode Failed node.
+         */
+        private void processPrepared(IgniteInternalTx tx, UUID failedNode) {
+            IgniteInternalFuture<Boolean> fut = commitIfPrepared(tx, Collections.singleton(failedNode));
+
+            if (fut != null)
+                doneFut.add(fut);
+
+            preparedTxCnt.incrementAndGet();
+        }
+
+        /**
+         *
+         */
+        private void finishAndRecordTimings() {
+            timeBag.finishGlobalStage("Finished");
+
+            StringBuilder timingsToLog = new StringBuilder();
+
+            timingsToLog.append("TxRecovery Status and Timings [txs=").append(preparedTxCnt.get());
+
+            for (String stageTiming : timeBag.stagesTimings())
+                timingsToLog.append(", ").append(stageTiming);
+
+            timingsToLog.append("]");
+
+            log.info(timingsToLog.toString());
+        }
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
index 8795f64..1f5ec6c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -58,6 +59,9 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
     /** Replicated cache name. */
     protected static final String REPL_CACHE_NAME = "replicated";
 
+    /** */
+    protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
     /**
      * {@inheritDoc}
      */
@@ -82,6 +86,8 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
 
         cfg.setDataStorageConfiguration(dsCfg);
 
+        cfg.setGridLogger(listeningLog);
+
         return cfg;
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
index e45399f..42a4fb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
@@ -37,11 +37,15 @@ import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.transactions.Transaction;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.testframework.LogListener.matches;
+
 /**
  *
  */
@@ -71,6 +75,14 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
     public void testOnlyAffectedNodesWaitForRecovery() throws Exception {
         int nodes = 6;
 
+        String recoveryStatusMsg = "TxRecovery Status and Timings [txs=";
+
+        LogListener lsnrAny = matches(recoveryStatusMsg).build(); // Any.
+        LogListener lsnrBackup = matches(recoveryStatusMsg).times((nodes / 2) - 1).build(); // Cell 1 (backups).
+        LogListener lsnrNear = matches(recoveryStatusMsg).times((nodes / 2)).build(); // Cell 2 (near).
+
+        listeningLog.registerListener(lsnrAny);
+
         startGridsMultiThreaded(nodes);
 
         blockRecoveryMessages();
@@ -240,6 +252,10 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
             nearNodes, 1,
             vers);
 
+        assertFalse(lsnrAny.check());
+
+        listeningLog.registerListener(lsnrNear);
+
         failed.close(); // Stopping node.
 
         awaitForSwitchOnNodeLeft(failed);
@@ -375,6 +391,10 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
             nearNodes, 0,
             vers);
 
+        assertTrue(waitForCondition(lsnrNear::check, 5000));
+
+        listeningLog.registerListener(lsnrBackup);
+
         // Partitioned recovery.
         for (Ignite ignite : G.allGrids()) {
             TestRecordingCommunicationSpi spi =
@@ -411,6 +431,8 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
             nearNodes, 0,
             vers);
 
+        assertTrue(waitForCondition(lsnrBackup::check, 5000));
+
         for (IgniteInternalFuture<?> fut : futs)
             fut.get();