You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/12/28 21:32:02 UTC
[ignite] branch master updated: IGNITE-13917 Fixed
dumpLongRunningOperations invocation before exchange manager was fully
started. Fixes #8619
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 10ad34b IGNITE-13917 Fixed dumpLongRunningOperations invocation before exchange manager was fully started. Fixes #8619
10ad34b is described below
commit 10ad34bee552bb554ff91828611b5b22f9755e7f
Author: sergeyuttsel <ut...@gmail.com>
AuthorDate: Tue Dec 29 00:31:07 2020 +0300
IGNITE-13917 Fixed dumpLongRunningOperations invocation before exchange manager was fully started. Fixes #8619
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../cache/GridCachePartitionExchangeManager.java | 15 +-
...dCachePartitionExchangeManagerWarningsTest.java | 216 +++++++++++++++------
2 files changed, 168 insertions(+), 63 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2a5cdfb..850231a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -191,6 +192,9 @@ import static org.apache.ignite.internal.processors.tracing.SpanType.EXCHANGE_FU
* Partition exchange manager.
*/
public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
+ /** Prefix of error message for dumping long running operations. */
+ public static final String FAILED_DUMP_MSG = "Failed to dump debug information: ";
+
/** @see IgniteSystemProperties#IGNITE_EXCHANGE_HISTORY_SIZE */
public static final int DFLT_EXCHANGE_HISTORY_SIZE = 1_000;
@@ -309,6 +313,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private final ReentrantLock dumpLongRunningOpsLock = new ReentrantLock();
+ /** Latch that is used to guarantee that this manager is fully started and all variables are initialized. */
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -532,6 +539,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
rebalanced = clusterReg.booleanMetric(REBALANCED,
"True if the cluster has achieved fully rebalanced state. Note that an inactive cluster always has" +
" this metric in False regardless of the real partitions state.");
+
+ startLatch.countDown();
}
/**
@@ -2427,6 +2436,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (!dumpLongRunningOpsLock.tryLock())
return;
+ startLatch.await();
+
try {
if (U.currentTimeMillis() < nextLongRunningOpsDumpTime)
return;
@@ -2459,7 +2470,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
catch (Exception e) {
- U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
+ U.error(diagnosticLog, FAILED_DUMP_MSG + e, e);
}
}
@@ -3412,7 +3423,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
dumpDebugInfo(exchFut);
}
catch (Exception e) {
- U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
+ U.error(diagnosticLog, FAILED_DUMP_MSG + e, e);
}
nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
index 8f2560d..7c5b4b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCachePartitionExchangeManagerWarningsTest.java
@@ -17,31 +17,39 @@
package org.apache.ignite.internal;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -50,17 +58,25 @@ import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
/**
* Test exchange manager warnings.
*/
public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ /** Long running operations timeout. */
+ private static final String LONG_OPERATIONS_DUMP_TIMEOUT = "1000";
- /** */
+ /** Atomic cache name. */
private static final String CACHE_NAME = "TEST_CACHE";
+ /** Transactional cache name. */
+ private static final String TX_CACHE_NAME = "TX_TEST_CACHE";
+
+ /** Lifecycle bean. */
+ private LifecycleBean lifecycleBean;
+
/** */
private String oldLongOpsDumpTimeout;
@@ -88,36 +104,61 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
testLog.clearListeners();
testLog = null;
+
+ lifecycleBean = null;
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new CustomTcpCommunicationSpi());
+
+ if (testLog != null)
+ cfg.setGridLogger(testLog);
+
+ cfg.setLifecycleBeans(lifecycleBean);
+
+ CacheConfiguration atomicCfg = new CacheConfiguration(CACHE_NAME)
+ .setBackups(1)
+ .setAtomicityMode(ATOMIC);
+ CacheConfiguration txCfg = new CacheConfiguration(TX_CACHE_NAME)
+ .setBackups(1)
+ .setAtomicityMode(TRANSACTIONAL);
+
+ cfg.setCacheConfiguration(atomicCfg, txCfg);
+
+ return cfg;
}
/**
* @throws Exception If failed.
*/
@Test
+ @WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = LONG_OPERATIONS_DUMP_TIMEOUT)
public void testLongRunningCacheFutures() throws Exception {
- long timeout = 1000;
-
- System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, Long.toString(timeout));
+ long timeout = Long.parseLong(LONG_OPERATIONS_DUMP_TIMEOUT);
testLog = new CustomTestLogger(false, log, "future");
int longRunFuturesCnt = 1000;
- try (Ignite srv1 = start("srv-1", false, false)) {
- try (Ignite srv2 = start("srv-2", false, false)) {
- try (Ignite client = start("client", true, false)) {
- try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME)) {
- streamer.allowOverwrite(true);
+ startGrids(2);
- for (int i = 0; i < longRunFuturesCnt; i++)
- streamer.addData(i, i);
- }
+ Ignite client = startClientGrid(3);
+ try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME)) {
+ streamer.allowOverwrite(true);
- Thread.sleep(timeout * 2);
- }
- }
+ for (int i = 0; i < longRunFuturesCnt; i++)
+ streamer.addData(i, i);
}
+ doSleep(timeout * 2);
+
+ stopAllGrids();
+
assertTrue("Warnings were not found", testLog.warningsTotal() > 0);
assertTrue("Too much warnings in the logs: " + testLog.warningsTotal(),
@@ -128,10 +169,9 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
* @throws Exception If failed.
*/
@Test
+ @WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = LONG_OPERATIONS_DUMP_TIMEOUT)
public void testLongRunningTransactions() throws Exception {
- long timeout = 1000;
-
- System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, Long.toString(timeout));
+ long timeout = Long.parseLong(LONG_OPERATIONS_DUMP_TIMEOUT);
testLog = new CustomTestLogger(false, log, "transaction");
@@ -139,25 +179,25 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
ExecutorService excSvc = Executors.newFixedThreadPool(transactions);
- try (Ignite srv1 = start("srv", false, true)) {
+ try (Ignite srv1 = startGrid(0)) {
CountDownLatch txStarted = new CountDownLatch(transactions);
CountDownLatch stopTx = new CountDownLatch(1);
for (int i = 0; i < transactions; i++)
- excSvc.submit(new AsyncTransaction(srv1, CACHE_NAME, i, txStarted, stopTx));
+ excSvc.submit(new AsyncTransaction(srv1, TX_CACHE_NAME, i, txStarted, stopTx));
- if (!txStarted.await(10000, TimeUnit.MILLISECONDS))
+ if (!txStarted.await(10_000, TimeUnit.MILLISECONDS))
fail("Unable to start transactions");
- Thread.sleep(timeout * 2);
+ doSleep(timeout * 2);
stopTx.countDown();
}
finally {
excSvc.shutdown();
- if (!excSvc.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ if (!excSvc.awaitTermination(10_000, TimeUnit.MILLISECONDS))
fail("Unable to wait for thread pool termination.");
}
@@ -167,42 +207,96 @@ public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbs
testLog.warningsTotal() < transactions);
}
- /**
- * Start Ignite node.
- *
- * @param instanceName Ignite instance name.
- * @param clientMode Client mode flag.
- * @param transactional Transactional cache flag.
- * @throws Exception If failed.
- */
- private Ignite start(String instanceName, boolean clientMode, boolean transactional) throws Exception {
- return Ignition.start(getConfiguration(instanceName, clientMode, transactional));
- }
+ @Test
+ public void testDumpLongRunningOperationsWaitForFullyInitializedExchangeManager() throws Exception {
+ long waitingTimeout = 5_000;
+
+ PrintStream errStream = System.err;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch waitLatch = new CountDownLatch(1);
+
+ try {
+ // GridCachePartitionExchangeManager#dumpLongRunningOperations() uses diagnostic log,
+ // which can be non-initialized, and so, error messages are logged into standard error output stream.
+ ByteArrayOutputStream testOut = new ByteArrayOutputStream(16 * 1024);
+ System.setErr(new PrintStream(testOut));
+
+ AtomicReference<IgniteInternalFuture<?>> dumpOpsFut = new AtomicReference<>();
+ IgniteInternalFuture<Ignite> startFut = null;
+
+ // Lifecycle bean allows to register DatabaseLifecycleListener and trigger dumpLongRunningOperations
+ // before GridCachePartitionExchangeManager is started.
+ lifecycleBean = new LifecycleBean() {
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ /** {@inheritDoc} */
+ @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteException {
+ if (evt == LifecycleEventType.BEFORE_NODE_START) {
+ ignite.context().internalSubscriptionProcessor()
+ .registerDatabaseListener(new DatabaseLifecycleListener() {
+ @Override public void onInitDataRegions(
+ IgniteCacheDatabaseSharedManager mgr
+ ) throws IgniteCheckedException {
+ dumpOpsFut.set(
+ GridTestUtils.runAsync(
+ () -> ignite.context().cache().context().exchange().dumpLongRunningOperations(1_000)));
+
+ // Let's allow to check that dumpLongRunningOperations is triggered.
+ startLatch.countDown();
+
+ // Wait for the check
+ try {
+ if (!waitLatch.await(waitingTimeout * 3, TimeUnit.MILLISECONDS))
+ throw new IgniteCheckedException("Failed to wait for a check of dumpLongRunningOperations");
+ }
+ catch (InterruptedException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+ });
+ }
+ }
+ };
- /**
- * Create Ignite configuration.
- *
- * @param instanceName Ignite instance name.
- * @param clientMode Client mode flag.
- * @param transactional Transactional cache flag.
- * @throws Exception If failed.
- */
- private IgniteConfiguration getConfiguration(String instanceName, boolean clientMode, boolean transactional) throws Exception {
- CacheConfiguration cacheCfg = new CacheConfiguration(CACHE_NAME)
- .setBackups(1)
- .setAtomicityMode(transactional ? CacheAtomicityMode.TRANSACTIONAL : CacheAtomicityMode.ATOMIC);
+ startFut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override public Ignite call() throws Exception {
+ return startGrid(0);
+ }
+ });
- IgniteConfiguration cfg = getConfiguration()
- .setCacheConfiguration(cacheCfg)
- .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER))
- .setCommunicationSpi(new CustomTcpCommunicationSpi())
- .setIgniteInstanceName(instanceName)
- .setClientMode(clientMode);
+ assertTrue("Server node did not start in " + waitingTimeout + " ms.",
+ startLatch.await(waitingTimeout, TimeUnit.MILLISECONDS));
- if (testLog != null)
- cfg.setGridLogger(testLog);
+ // Check that dumpLongRunningOperations did not produce any error.
+ if (GridTestUtils.waitForCondition(() -> dumpOpsFut.get().isDone(), waitingTimeout)) {
+ // Check that error output stream does not contain NullPointerException.
+ String output = testOut.toString();
- return cfg;
+ assertTrue("Unexpected error [err=" + output + ']', output.isEmpty());
+ }
+
+ // Unblock starting the node.
+ waitLatch.countDown();
+
+ assertTrue(
+ "Dumping log running operations is not completed yet.",
+ GridTestUtils.waitForCondition(() -> dumpOpsFut.get().isDone(), waitingTimeout));
+
+ // Check that error output stream does not contain any error.
+ String output = testOut.toString();
+
+ assertTrue("Unexpected error [err=" + output + ']', output.isEmpty());
+
+ startFut.get(waitingTimeout, TimeUnit.MILLISECONDS);
+ }
+ finally {
+ startLatch.countDown();
+ waitLatch.countDown();
+
+ System.setErr(errStream);
+ }
}
/**