You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/12/28 21:32:02 UTC

[ignite] branch master updated: IGNITE-13917 Fixed dumpLongRunningOperations invocation before exchange manager was fully started. Fixes #8619

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 10ad34b  IGNITE-13917 Fixed dumpLongRunningOperations invocation before exchange manager was fully started. Fixes #8619
10ad34b is described below

commit 10ad34bee552bb554ff91828611b5b22f9755e7f
Author: sergeyuttsel <ut...@gmail.com>
AuthorDate: Tue Dec 29 00:31:07 2020 +0300

    IGNITE-13917 Fixed dumpLongRunningOperations invocation before exchange manager was fully started. Fixes #8619
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../cache/GridCachePartitionExchangeManager.java   |  15 +-
 ...dCachePartitionExchangeManagerWarningsTest.java | 216 +++++++++++++++------
 2 files changed, 168 insertions(+), 63 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2a5cdfb..850231a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -191,6 +192,9 @@ import static org.apache.ignite.internal.processors.tracing.SpanType.EXCHANGE_FU
  * Partition exchange manager.
  */
 public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
+    /** Prefix of error message for dumping long running operations. */
+    public static final String FAILED_DUMP_MSG = "Failed to dump debug information: ";
+
     /** @see IgniteSystemProperties#IGNITE_EXCHANGE_HISTORY_SIZE */
     public static final int DFLT_EXCHANGE_HISTORY_SIZE = 1_000;
 
@@ -309,6 +313,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private final ReentrantLock dumpLongRunningOpsLock = new ReentrantLock();
 
+    /** Latch that is used to guarantee that this manager is fully started and all variables are initialized. */
+    private final CountDownLatch startLatch = new CountDownLatch(1);
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -532,6 +539,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         rebalanced = clusterReg.booleanMetric(REBALANCED,
             "True if the cluster has achieved fully rebalanced state. Note that an inactive cluster always has" +
             " this metric in False regardless of the real partitions state.");
+
+        startLatch.countDown();
     }
 
     /**
@@ -2427,6 +2436,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (!dumpLongRunningOpsLock.tryLock())
                 return;
 
+            startLatch.await();
+
             try {
                 if (U.currentTimeMillis() < nextLongRunningOpsDumpTime)
                     return;
@@ -2459,7 +2470,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
         catch (Exception e) {
-            U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
+            U.error(diagnosticLog, FAILED_DUMP_MSG + e, e);
         }
     }
 
@@ -3412,7 +3423,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                         dumpDebugInfo(exchFut);
                                     }
                                     catch (Exception e) {
-                                        U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
+                                        U.error(diagnosticLog, FAILED_DUMP_MSG + e, e);
                                     }
 
                                     nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
index 8f2560d..7c5b4b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
@@ -17,31 +17,39 @@
 
 package org.apache.ignite.internal;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.lifecycle.LifecycleEventType;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -50,17 +58,25 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 
 /**
  * Test exchange manager warnings.
  */
 public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+    /** Long running operations timeout. */
+    private static final String LONG_OPERATIONS_DUMP_TIMEOUT = "1000";
 
-    /** */
+    /** Atomic cache name. */
     private static final String CACHE_NAME = "TEST_CACHE";
 
+    /** Transactional cache name. */
+    private static final String TX_CACHE_NAME = "TX_TEST_CACHE";
+
+    /** Lifecycle bean. */
+    private LifecycleBean lifecycleBean;
+
     /** */
     private String oldLongOpsDumpTimeout;
 
@@ -88,36 +104,61 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
             testLog.clearListeners();
 
         testLog = null;
+
+        lifecycleBean = null;
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new CustomTcpCommunicationSpi());
+
+        if (testLog != null)
+            cfg.setGridLogger(testLog);
+
+        cfg.setLifecycleBeans(lifecycleBean);
+
+        CacheConfiguration atomicCfg = new CacheConfiguration(CACHE_NAME)
+            .setBackups(1)
+            .setAtomicityMode(ATOMIC);
+        CacheConfiguration txCfg = new CacheConfiguration(TX_CACHE_NAME)
+            .setBackups(1)
+            .setAtomicityMode(TRANSACTIONAL);
+
+        cfg.setCacheConfiguration(atomicCfg, txCfg);
+
+        return cfg;
     }
 
     /**
      * @throws Exception If failed.
      */
     @Test
+    @WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = LONG_OPERATIONS_DUMP_TIMEOUT)
     public void testLongRunningCacheFutures() throws Exception {
-        long timeout = 1000;
-
-        System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, Long.toString(timeout));
+        long timeout = Long.parseLong(LONG_OPERATIONS_DUMP_TIMEOUT);
 
         testLog = new CustomTestLogger(false, log, "future");
 
         int longRunFuturesCnt = 1000;
 
-        try (Ignite srv1 = start("srv-1", false, false)) {
-            try (Ignite srv2 = start("srv-2", false, false)) {
-                try (Ignite client = start("client", true, false)) {
-                    try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME)) {
-                        streamer.allowOverwrite(true);
+        startGrids(2);
 
-                        for (int i = 0; i < longRunFuturesCnt; i++)
-                            streamer.addData(i, i);
-                    }
+        Ignite client = startClientGrid(3);
+        try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME)) {
+            streamer.allowOverwrite(true);
 
-                    Thread.sleep(timeout * 2);
-                }
-            }
+            for (int i = 0; i < longRunFuturesCnt; i++)
+                streamer.addData(i, i);
         }
 
+        doSleep(timeout * 2);
+
+        stopAllGrids();
+
         assertTrue("Warnings were not found", testLog.warningsTotal() > 0);
 
         assertTrue("Too much warnings in the logs: " + testLog.warningsTotal(),
@@ -128,10 +169,9 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     @Test
+    @WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = LONG_OPERATIONS_DUMP_TIMEOUT)
     public void testLongRunningTransactions() throws Exception {
-        long timeout = 1000;
-
-        System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, Long.toString(timeout));
+        long timeout = Long.parseLong(LONG_OPERATIONS_DUMP_TIMEOUT);
 
         testLog = new CustomTestLogger(false, log, "transaction");
 
@@ -139,25 +179,25 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
 
         ExecutorService excSvc = Executors.newFixedThreadPool(transactions);
 
-        try (Ignite srv1 = start("srv", false, true)) {
+        try (Ignite srv1 = startGrid(0)) {
             CountDownLatch txStarted = new CountDownLatch(transactions);
 
             CountDownLatch stopTx = new CountDownLatch(1);
 
             for (int i = 0; i < transactions; i++)
-                excSvc.submit(new AsyncTransaction(srv1, CACHE_NAME, i, txStarted, stopTx));
+                excSvc.submit(new AsyncTransaction(srv1, TX_CACHE_NAME, i, txStarted, stopTx));
 
-            if (!txStarted.await(10000, TimeUnit.MILLISECONDS))
+            if (!txStarted.await(10_000, TimeUnit.MILLISECONDS))
                 fail("Unable to start transactions");
 
-            Thread.sleep(timeout * 2);
+            doSleep(timeout * 2);
 
             stopTx.countDown();
         }
         finally {
             excSvc.shutdown();
 
-            if (!excSvc.awaitTermination(10000, TimeUnit.MILLISECONDS))
+            if (!excSvc.awaitTermination(10_000, TimeUnit.MILLISECONDS))
                 fail("Unable to wait for thread pool termination.");
         }
 
@@ -167,42 +207,96 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
             testLog.warningsTotal() < transactions);
     }
 
-    /**
-     * Start Ignite node.
-     *
-     * @param instanceName Ignite instance name.
-     * @param clientMode Client mode flag.
-     * @param transactional Transactional cache flag.
-     * @throws Exception If failed.
-     */
-    private Ignite start(String instanceName, boolean clientMode, boolean transactional) throws Exception {
-        return Ignition.start(getConfiguration(instanceName, clientMode, transactional));
-    }
+    @Test
+    public void testDumpLongRunningOperationsWaitForFullyInitializedExchangeManager() throws Exception {
+        long waitingTimeout = 5_000;
+
+        PrintStream errStream = System.err;
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch waitLatch = new CountDownLatch(1);
+
+        try {
+            // GridCachePartitionExchangeManager#dumpLongRunningOperations() uses diagnostic log,
+            // which can be non-initialized, and so, error messages are logged into standard error output stream.
+            ByteArrayOutputStream testOut = new ByteArrayOutputStream(16 * 1024);
+            System.setErr(new PrintStream(testOut));
+
+            AtomicReference<IgniteInternalFuture<?>> dumpOpsFut = new AtomicReference<>();
+            IgniteInternalFuture<Ignite> startFut = null;
+
+            // Lifecycle bean allows to register DatabaseLifecycleListener and trigger dumpLongRunningOperations
+            // before GridCachePartitionExchangeManager is started.
+            lifecycleBean = new LifecycleBean() {
+                /** Ignite instance. */
+                @IgniteInstanceResource
+                IgniteEx ignite;
+
+                /** {@inheritDoc} */
+                @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteException {
+                    if (evt == LifecycleEventType.BEFORE_NODE_START) {
+                        ignite.context().internalSubscriptionProcessor()
+                            .registerDatabaseListener(new DatabaseLifecycleListener() {
+                                @Override public void onInitDataRegions(
+                                    IgniteCacheDatabaseSharedManager mgr
+                                ) throws IgniteCheckedException {
+                                    dumpOpsFut.set(
+                                        GridTestUtils.runAsync(
+                                            () -> ignite.context().cache().context().exchange().dumpLongRunningOperations(1_000)));
+
+                                    // Let's allow to check that dumpLongRunningOperations is triggered.
+                                    startLatch.countDown();
+
+                                    // Wait for the check
+                                    try {
+                                        if (!waitLatch.await(waitingTimeout * 3, TimeUnit.MILLISECONDS))
+                                            throw new IgniteCheckedException("Failed to wait for a check of dumpLongRunningOperations");
+                                    }
+                                    catch (InterruptedException e) {
+                                        throw new IgniteCheckedException(e);
+                                    }
+                                }
+                            });
+                    }
+                }
+            };
 
-    /**
-     * Create Ignite configuration.
-     *
-     * @param instanceName Ignite instance name.
-     * @param clientMode Client mode flag.
-     * @param transactional Transactional cache flag.
-     * @throws Exception If failed.
-     */
-    private IgniteConfiguration getConfiguration(String instanceName, boolean clientMode, boolean transactional) throws Exception {
-        CacheConfiguration cacheCfg = new CacheConfiguration(CACHE_NAME)
-            .setBackups(1)
-            .setAtomicityMode(transactional ? CacheAtomicityMode.TRANSACTIONAL : CacheAtomicityMode.ATOMIC);
+            startFut = GridTestUtils.runAsync(new Callable<Ignite>() {
+                @Override public Ignite call() throws Exception {
+                    return startGrid(0);
+                }
+            });
 
-        IgniteConfiguration cfg = getConfiguration()
-            .setCacheConfiguration(cacheCfg)
-            .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER))
-            .setCommunicationSpi(new CustomTcpCommunicationSpi())
-            .setIgniteInstanceName(instanceName)
-            .setClientMode(clientMode);
+            assertTrue("Server node did not start in " + waitingTimeout + " ms.",
+                startLatch.await(waitingTimeout, TimeUnit.MILLISECONDS));
 
-        if (testLog != null)
-            cfg.setGridLogger(testLog);
+            // Check that dumpLongRunningOperations did not produce any error.
+            if (GridTestUtils.waitForCondition(() -> dumpOpsFut.get().isDone(), waitingTimeout)) {
+                // Check that error output stream does not contain NullPointerException.
+                String output = testOut.toString();
 
-        return cfg;
+                assertTrue("Unexpected error [err=" + output + ']', output.isEmpty());
+            }
+
+            // Unblock starting the node.
+            waitLatch.countDown();
+
+            assertTrue(
+                "Dumping log running operations is not completed yet.",
+                GridTestUtils.waitForCondition(() -> dumpOpsFut.get().isDone(), waitingTimeout));
+
+            // Check that error output stream does not contain any error.
+            String output = testOut.toString();
+
+            assertTrue("Unexpected error [err=" + output + ']', output.isEmpty());
+
+            startFut.get(waitingTimeout, TimeUnit.MILLISECONDS);
+        }
+        finally {
+            startLatch.countDown();
+            waitLatch.countDown();
+
+            System.setErr(errStream);
+        }
     }
 
     /**