You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/06/07 13:29:56 UTC

[ignite] branch master updated: IGNITE-14474 Expanded log for errors with GridDhtPartitionSupplyMessage in GridDhtPartitionDemander. Fixes #9004

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 48f71eb  IGNITE-14474 Expanded log for errors with GridDhtPartitionSupplyMessage in GridDhtPartitionDemander. Fixes #9004
48f71eb is described below

commit 48f71eb1e41e911128301aaed847385fb6278b1f
Author: RodionSmolnikovGG <81...@users.noreply.github.com>
AuthorDate: Mon Jun 7 16:29:02 2021 +0300

    IGNITE-14474 Expanded log for errors with GridDhtPartitionSupplyMessage in GridDhtPartitionDemander. Fixes #9004
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../dht/preloader/GridDhtPartitionDemander.java    | 27 +++++++++++++++-------
 ...acheRebalancingUnmarshallingFailedSelfTest.java | 22 ++++++++++++++++--
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index c012b5a..aaad997 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -536,22 +537,31 @@ public class GridDhtPartitionDemander {
                 log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
 
             // Check whether there were error during supplying process.
+            Throwable msgExc = null;
+
+            final GridDhtPartitionTopology top = grp.topology();
+
             if (supplyMsg.classError() != null)
-                errMsg = supplyMsg.classError().getMessage();
+                msgExc = supplyMsg.classError();
             else if (supplyMsg.error() != null)
-                errMsg = supplyMsg.error().getMessage();
+                msgExc = supplyMsg.error();
 
-            if (errMsg != null) {
-                U.warn(log, "Rebalancing routine has failed [" +
-                    demandRoutineInfo(nodeId, supplyMsg) + ", err=" + errMsg + ']');
+            if (msgExc != null) {
+                GridDhtPartitionMap partMap = top.localPartitionMap();
+
+                Set<Integer> unstableParts = supplyMsg.infos().keySet().stream()
+                    .filter(p -> partMap.get(p) == MOVING)
+                    .collect(Collectors.toSet());
+
+                U.error(log, "Rebalancing routine has failed, some partitions could be unavailable for reading" +
+                    " [" + demandRoutineInfo(nodeId, supplyMsg) +
+                    ", unavailablePartitions=" + S.compact(unstableParts) + ']', msgExc);
 
                 fut.error(nodeId);
 
                 return;
             }
 
-            final GridDhtPartitionTopology top = grp.topology();
-
             fut.receivedBytes.addAndGet(supplyMsg.messageSize());
 
             if (grp.sharedGroup()) {
@@ -1046,7 +1056,8 @@ public class GridDhtPartitionDemander {
      * @param supplyMsg Supply message.
      */
     private String demandRoutineInfo(UUID supplier, GridDhtPartitionSupplyMessage supplyMsg) {
-        return "grp=" + grp.cacheOrGroupName() + ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier;
+        return "grp=" + grp.cacheOrGroupName() + ", rebalanceId=" + supplyMsg.rebalanceId() +
+            ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
index 567773e..90a6075 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
@@ -32,6 +33,8 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteThread;
 import org.junit.Test;
@@ -41,7 +44,7 @@ import org.junit.Test;
  */
 public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonAbstractTest {
     /** partitioned cache name. */
-    protected static String CACHE = "cache";
+    protected static final String CACHE = "cache";
 
     /** Allows to change behavior of readExternal method. */
     protected static AtomicInteger readCnt = new AtomicInteger();
@@ -49,6 +52,13 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
     /** */
     private volatile Marshaller marshaller;
 
+    /** */
+    private ListeningTestLogger customLog;
+
+    /** */
+    private static final Pattern UNMARSHALING_ERROR_PATTERN = Pattern.compile(".*Rebalancing routine has failed" +
+        ".*unavailablePartitions=\\[.*].*");
+
     /** Test key 1. */
     private static class TestKey implements Externalizable {
         /** Field. */
@@ -115,6 +125,7 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
 
         iCfg.setCacheConfiguration(cfg);
         iCfg.setMarshaller(marshaller);
+        iCfg.setGridLogger(customLog);
 
         return iCfg;
     }
@@ -129,7 +140,6 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
         runTest();
     }
 
-
     /**
      * @throws Exception e.
      */
@@ -154,6 +164,12 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
      * @throws Exception e.
      */
     private void runTest() throws Exception {
+        customLog = new ListeningTestLogger(log);
+
+        LogListener unmarshalErrorLogListener = LogListener.matches(UNMARSHALING_ERROR_PATTERN).atLeast(1).build();
+
+        customLog.registerListener(unmarshalErrorLogListener);
+
         assert marshaller != null;
 
         readCnt.set(Integer.MAX_VALUE);
@@ -176,6 +192,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
 
         for (int i = 50; i < 100; i++)
             assertNull(grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))));
+
+        assertTrue("Unmarshal log error message is not valid.", unmarshalErrorLogListener.check());
     }
 
     /** {@inheritDoc} */