You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/05/07 08:42:25 UTC
[ignite] branch master updated: IGNITE-11626
InitNewCoordinatorFuture should be reported in diagnostic output
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e5a16f8 IGNITE-11626 InitNewCoordinatorFuture should be reported in diagnostic output
e5a16f8 is described below
commit e5a16f85fab927860b803cdc2e04aab5246a0ac1
Author: mstepachev <ma...@gmail.com>
AuthorDate: Tue May 7 11:41:54 2019 +0300
IGNITE-11626 InitNewCoordinatorFuture should be reported in diagnostic output
Signed-off-by: Ivan Rakov <ir...@apache.org>
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 12 ++-
.../dht/preloader/InitNewCoordinatorFuture.java | 27 +++++-
...LongRunningInitNewCrdFutureDiagnosticsTest.java | 100 +++++++++++++++++++++
.../apache/ignite/testframework/LogListener.java | 18 ++++
.../test/ListeningTestLoggerTest.java | 5 +-
5 files changed, 156 insertions(+), 6 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 33a5a8c..0ec1b7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4908,14 +4908,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** {@inheritDoc} */
@Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
if (!isDone()) {
- ClusterNode crd;
- Set<UUID> remaining;
+ final ClusterNode crd;
+ final Set<UUID> remaining;
+ final InitNewCoordinatorFuture newCrdFut;
synchronized (mux) {
crd = this.crd;
remaining = new HashSet<>(this.remaining);
+ newCrdFut = this.newCrdFut;
}
+ if(newCrdFut != null)
+ newCrdFut.addDiagnosticRequest(diagCtx);
+
if (crd != null) {
if (!crd.isLocal()) {
diagCtx.exchangeInfo(crd.id(), initialVersion(), "Exchange future waiting for coordinator " +
@@ -4938,7 +4943,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return "GridDhtPartitionsExchangeFuture [topVer=" + initialVersion() +
", evt=" + (firstDiscoEvt != null ? IgniteUtils.gridEventName(firstDiscoEvt.type()) : -1) +
", evtNode=" + (firstDiscoEvt != null ? firstDiscoEvt.eventNode() : null) +
- ", done=" + isDone() + ']';
+ ", done=" + isDone() +
+ ", newCrdFut=" + this.newCrdFut + ']';
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 5909a05..06e33ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -29,6 +29,8 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
@@ -47,7 +49,10 @@ import static org.apache.ignite.internal.processors.cache.GridCachePartitionExch
/**
*
*/
-public class InitNewCoordinatorFuture extends GridCompoundFuture {
+public class InitNewCoordinatorFuture extends GridCompoundFuture implements IgniteDiagnosticAware {
+ /** */
+ private final ClusterNode locNode;
+
/** */
private GridDhtPartitionsFullMessage fullMsg;
@@ -80,6 +85,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
*/
InitNewCoordinatorFuture(GridCacheSharedContext cctx) {
this.log = cctx.logger(getClass());
+ this.locNode = cctx.localNode();
}
/**
@@ -346,4 +352,23 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
if (done)
restoreStateFut.onDone();
}
+
+ /** {@inheritDoc} */
+ @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
+ if (!isDone()) {
+ synchronized (this) {
+ diagCtx.exchangeInfo(locNode.id(), initTopVer, "InitNewCoordinatorFuture waiting for " +
+ "GridDhtPartitionsSingleMessages from nodes=" + awaited);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "InitNewCoordinatorFuture [" +
+ "initTopVer=" + initTopVer +
+ ", awaited=" + awaited +
+ ", joinedNodes=" + joinedNodes +
+ ']';
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLongRunningInitNewCrdFutureDiagnosticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLongRunningInitNewCrdFutureDiagnosticsTest.java
new file mode 100644
index 0000000..a08f529
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLongRunningInitNewCrdFutureDiagnosticsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test class for diagnostics of long running {@link InitNewCoordinatorFuture}.
+ */
+public class GridLongRunningInitNewCrdFutureDiagnosticsTest extends GridCommonAbstractTest {
+ /** Node with diagnostic logger. */
+ private static final int NODE_WITH_DIAGNOSTIC_LOG = 2;
+
+ /** Test logger. */
+ private final ListeningTestLogger log = new ListeningTestLogger(false, GridAbstractTest.log);
+
+ /** Test recording communication spi. */
+ private TestRecordingCommunicationSpi testRecordingCommSpi;
+
+ /** */
+ private static final String DIAGNOSTIC_MESSAGE = "InitNewCoordinatorFuture waiting for GridDhtPartitionsSingleMessages from nodes=";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ testRecordingCommSpi = new TestRecordingCommunicationSpi();
+
+ return super.getConfiguration(igniteInstanceName)
+ .setGridLogger(log)
+ .setConsistentId(igniteInstanceName)
+ .setCommunicationSpi(testRecordingCommSpi);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ log.clearListeners();
+
+ stopAllGrids();
+ }
+
+ /** */
+ @Test
+ public void receiveDiagnosticLogForLongRunningFuture() throws Exception {
+ LogListener lsnr = expectLogEvent(DIAGNOSTIC_MESSAGE, 1);
+
+ startGrid(0);
+ startGrid(1);
+ startGrid(NODE_WITH_DIAGNOSTIC_LOG);
+ startGrid(3);
+
+ testRecordingCommSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(1));
+ testRecordingCommSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(NODE_WITH_DIAGNOSTIC_LOG));
+ testRecordingCommSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(3));
+
+ stopGrid(0); // It makes PME future on node#1
+ stopGrid(1); // It makes new crd init future inside PME future on node#2
+
+ assertTrue(lsnr.check(getTestTimeout()));
+ }
+
+ /**
+ * @param evtMsg Event message.
+ * @param cnt Number of expected events.
+ */
+ private LogListener expectLogEvent(String evtMsg, int cnt) {
+ LogListener lsnr = LogListener.matches(s -> s.startsWith(evtMsg)).atLeast(cnt).build();
+
+ log.registerListener(lsnr);
+
+ return lsnr;
+ }
+
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
index 0aca805..45845c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
@@ -70,6 +70,24 @@ public abstract class LogListener implements Consumer<String> {
public abstract boolean check();
/**
+ * Checks that all conditions are met with timeout.
+ *
+ * @return {@code True} if all conditions are met.
+ */
+ public boolean check(long millis) throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+
+ while (startTime + millis >= System.currentTimeMillis()) {
+ if (check())
+ return true;
+
+ Thread.sleep(1000);
+ }
+
+ return check();
+ }
+
+ /**
* Reset listener state.
*/
public abstract void reset();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java
index 0be0a00..d7b2d89 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.testframework.test;
+import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -171,7 +172,7 @@ public class ListeningTestLoggerTest extends GridCommonAbstractTest {
log.info("Ignored message.");
log.info("Target message.");
- assertThrowsWithCause(lsnr::check, AssertionError.class);
+ assertThrowsWithCause((Callable<Object>)lsnr::check, AssertionError.class);
// Check custom exception.
LogListener lsnr2 = LogListener.matches(msg -> {
@@ -183,7 +184,7 @@ public class ListeningTestLoggerTest extends GridCommonAbstractTest {
log.info("1");
log.info("2");
- assertThrowsWithCause(lsnr2::check, IllegalStateException.class);
+ assertThrowsWithCause((Callable<Object>)lsnr2::check, IllegalStateException.class);
}
/**