You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/05/07 08:42:25 UTC

[ignite] branch master updated: IGNITE-11626 InitNewCoordinatorFuture should be reported in diagnostic output

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e5a16f8  IGNITE-11626 InitNewCoordinatorFuture should be reported in diagnostic output
e5a16f8 is described below

commit e5a16f85fab927860b803cdc2e04aab5246a0ac1
Author: mstepachev <ma...@gmail.com>
AuthorDate: Tue May 7 11:41:54 2019 +0300

    IGNITE-11626 InitNewCoordinatorFuture should be reported in diagnostic output
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../preloader/GridDhtPartitionsExchangeFuture.java |  12 ++-
 .../dht/preloader/InitNewCoordinatorFuture.java    |  27 +++++-
 ...LongRunningInitNewCrdFutureDiagnosticsTest.java | 100 +++++++++++++++++++++
 .../apache/ignite/testframework/LogListener.java   |  18 ++++
 .../test/ListeningTestLoggerTest.java              |   5 +-
 5 files changed, 156 insertions(+), 6 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 33a5a8c..0ec1b7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4908,14 +4908,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** {@inheritDoc} */
     @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
         if (!isDone()) {
-            ClusterNode crd;
-            Set<UUID> remaining;
+            final ClusterNode crd;
+            final Set<UUID> remaining;
+            final InitNewCoordinatorFuture newCrdFut;
 
             synchronized (mux) {
                 crd = this.crd;
                 remaining = new HashSet<>(this.remaining);
+                newCrdFut = this.newCrdFut;
             }
 
+            if(newCrdFut != null)
+                newCrdFut.addDiagnosticRequest(diagCtx);
+
             if (crd != null) {
                 if (!crd.isLocal()) {
                     diagCtx.exchangeInfo(crd.id(), initialVersion(), "Exchange future waiting for coordinator " +
@@ -4938,7 +4943,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         return "GridDhtPartitionsExchangeFuture [topVer=" + initialVersion() +
             ", evt=" + (firstDiscoEvt != null ? IgniteUtils.gridEventName(firstDiscoEvt.type()) : -1) +
             ", evtNode=" + (firstDiscoEvt != null ? firstDiscoEvt.eventNode() : null) +
-            ", done=" + isDone() + ']';
+            ", done=" + isDone() +
+            ", newCrdFut=" + this.newCrdFut + ']';
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 5909a05..06e33ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -29,6 +29,8 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
@@ -47,7 +49,10 @@ import static org.apache.ignite.internal.processors.cache.GridCachePartitionExch
 /**
  *
  */
-public class InitNewCoordinatorFuture extends GridCompoundFuture {
+public class InitNewCoordinatorFuture extends GridCompoundFuture implements IgniteDiagnosticAware {
+    /** */
+    private final ClusterNode locNode;
+
     /** */
     private GridDhtPartitionsFullMessage fullMsg;
 
@@ -80,6 +85,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
      */
     InitNewCoordinatorFuture(GridCacheSharedContext cctx) {
         this.log = cctx.logger(getClass());
+        this.locNode = cctx.localNode();
     }
 
     /**
@@ -346,4 +352,23 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         if (done)
             restoreStateFut.onDone();
     }
+
+    /** {@inheritDoc} */
+    @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
+        if (isDone())
+            return;
+        synchronized (this) {
+            diagCtx.exchangeInfo(locNode.id(), initTopVer,
+                "InitNewCoordinatorFuture waiting for GridDhtPartitionsSingleMessages from nodes=" + awaited);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        StringBuilder sb = new StringBuilder("InitNewCoordinatorFuture [");
+
+        sb.append("initTopVer=").append(initTopVer).append(", awaited=").append(awaited);
+
+        return sb.append(", joinedNodes=").append(joinedNodes).append(']').toString();
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLongRunningInitNewCrdFutureDiagnosticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLongRunningInitNewCrdFutureDiagnosticsTest.java
new file mode 100644
index 0000000..a08f529
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLongRunningInitNewCrdFutureDiagnosticsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that a long running {@code InitNewCoordinatorFuture} is reported in the diagnostic log output.
+ */
+public class GridLongRunningInitNewCrdFutureDiagnosticsTest extends GridCommonAbstractTest  {
+    /** Index of the node with diagnostic logger. */
+    private static final int NODE_WITH_DIAGNOSTIC_LOG = 2;
+
+    /** Listening test logger that is set as the grid logger in every node configuration. */
+    private final ListeningTestLogger log = new ListeningTestLogger(false, GridAbstractTest.log);
+
+    /** Recording communication SPI; reassigned on every {@link #getConfiguration(String)} call. */
+    private TestRecordingCommunicationSpi testRecordingCommSpi;
+
+    /** Prefix of the log message reported for a pending new coordinator future. */
+    private static final String DIAGNOSTIC_MESSAGE = "InitNewCoordinatorFuture waiting for GridDhtPartitionsSingleMessages from nodes=";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        testRecordingCommSpi = new TestRecordingCommunicationSpi();
+
+        return super.getConfiguration(igniteInstanceName)
+            .setGridLogger(log)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(testRecordingCommSpi);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        log.clearListeners();
+
+        stopAllGrids();
+    }
+
+    /** Expects a log line starting with {@link #DIAGNOSTIC_MESSAGE} after nodes 0 and 1 are stopped. */
+    @Test
+    public void receiveDiagnosticLogForLongRunningFuture() throws Exception {
+        LogListener lsnr = expectLogEvent(DIAGNOSTIC_MESSAGE, 1);
+
+        startGrid(0);
+        startGrid(1);
+        startGrid(NODE_WITH_DIAGNOSTIC_LOG);
+        startGrid(3);
+
+        testRecordingCommSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(1));
+        testRecordingCommSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(NODE_WITH_DIAGNOSTIC_LOG));
+        testRecordingCommSpi.blockMessages(GridDhtPartitionsSingleMessage.class, getTestIgniteInstanceName(3));
+
+        stopGrid(0); // Produces a PME future on node 1.
+        stopGrid(1); // Produces a new coordinator init future inside the PME future on node 2.
+
+        assertTrue(lsnr.check(getTestTimeout()));
+    }
+
+    /**
+     * @param evtMsg Prefix that a log message must start with to be counted.
+     * @param cnt Minimum number of matching messages expected.
+     */
+    private LogListener expectLogEvent(String evtMsg, int cnt) {
+        LogListener lsnr = LogListener.matches(s -> s.startsWith(evtMsg)).atLeast(cnt).build();
+
+        log.registerListener(lsnr);
+
+        return lsnr;
+    }
+
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
index 0aca805..45845c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/LogListener.java
@@ -70,6 +70,24 @@ public abstract class LogListener implements Consumer<String> {
     public abstract boolean check();
 
     /**
+     * Checks that all conditions are met, polling once a second until the timeout elapses.
+     * @param millis Maximum time to wait, in milliseconds.
+     * @return {@code True} if all conditions are met.
+     */
+    public boolean check(long millis) throws InterruptedException {
+        long startTime = System.currentTimeMillis();
+
+        while (System.currentTimeMillis() - startTime <= millis) { // Elapsed-time form cannot overflow.
+            if (check())
+                return true;
+
+            Thread.sleep(1000);
+        }
+
+        return check();
+    }
+
+    /**
      * Reset listener state.
      */
     public abstract void reset();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java
index 0be0a00..d7b2d89 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/test/ListeningTestLoggerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testframework.test;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
@@ -171,7 +172,7 @@ public class ListeningTestLoggerTest extends GridCommonAbstractTest {
         log.info("Ignored message.");
         log.info("Target message.");
 
-        assertThrowsWithCause(lsnr::check, AssertionError.class);
+        assertThrowsWithCause((Callable<Object>)lsnr::check, AssertionError.class);
 
         // Check custom exception.
         LogListener lsnr2 = LogListener.matches(msg -> {
@@ -183,7 +184,7 @@ public class ListeningTestLoggerTest extends GridCommonAbstractTest {
         log.info("1");
         log.info("2");
 
-        assertThrowsWithCause(lsnr2::check, IllegalStateException.class);
+        assertThrowsWithCause((Callable<Object>)lsnr2::check, IllegalStateException.class);
     }
 
     /**