You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/06/07 13:29:56 UTC
[ignite] branch master updated: IGNITE-14474 Expanded log for
errors with GridDhtPartitionSupplyMessage in GridDhtPartitionDemander.
Fixes #9004
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 48f71eb IGNITE-14474 Expanded log for errors with GridDhtPartitionSupplyMessage in GridDhtPartitionDemander. Fixes #9004
48f71eb is described below
commit 48f71eb1e41e911128301aaed847385fb6278b1f
Author: RodionSmolnikovGG <81...@users.noreply.github.com>
AuthorDate: Mon Jun 7 16:29:02 2021 +0300
IGNITE-14474 Expanded log for errors with GridDhtPartitionSupplyMessage in GridDhtPartitionDemander. Fixes #9004
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../dht/preloader/GridDhtPartitionDemander.java | 27 +++++++++++++++-------
...acheRebalancingUnmarshallingFailedSelfTest.java | 22 ++++++++++++++++--
2 files changed, 39 insertions(+), 10 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index c012b5a..aaad997 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -536,22 +537,31 @@ public class GridDhtPartitionDemander {
log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
// Check whether there were error during supplying process.
+ Throwable msgExc = null;
+
+ final GridDhtPartitionTopology top = grp.topology();
+
if (supplyMsg.classError() != null)
- errMsg = supplyMsg.classError().getMessage();
+ msgExc = supplyMsg.classError();
else if (supplyMsg.error() != null)
- errMsg = supplyMsg.error().getMessage();
+ msgExc = supplyMsg.error();
- if (errMsg != null) {
- U.warn(log, "Rebalancing routine has failed [" +
- demandRoutineInfo(nodeId, supplyMsg) + ", err=" + errMsg + ']');
+ if (msgExc != null) {
+ GridDhtPartitionMap partMap = top.localPartitionMap();
+
+ Set<Integer> unstableParts = supplyMsg.infos().keySet().stream()
+ .filter(p -> partMap.get(p) == MOVING)
+ .collect(Collectors.toSet());
+
+ U.error(log, "Rebalancing routine has failed, some partitions could be unavailable for reading" +
+ " [" + demandRoutineInfo(nodeId, supplyMsg) +
+ ", unavailablePartitions=" + S.compact(unstableParts) + ']', msgExc);
fut.error(nodeId);
return;
}
- final GridDhtPartitionTopology top = grp.topology();
-
fut.receivedBytes.addAndGet(supplyMsg.messageSize());
if (grp.sharedGroup()) {
@@ -1046,7 +1056,8 @@ public class GridDhtPartitionDemander {
* @param supplyMsg Supply message.
*/
private String demandRoutineInfo(UUID supplier, GridDhtPartitionSupplyMessage supplyMsg) {
- return "grp=" + grp.cacheOrGroupName() + ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier;
+ return "grp=" + grp.cacheOrGroupName() + ", rebalanceId=" + supplyMsg.rebalanceId() +
+ ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
index 567773e..90a6075 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
@@ -32,6 +33,8 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
import org.junit.Test;
@@ -41,7 +44,7 @@ import org.junit.Test;
*/
public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonAbstractTest {
/** partitioned cache name. */
- protected static String CACHE = "cache";
+ protected static final String CACHE = "cache";
/** Allows to change behavior of readExternal method. */
protected static AtomicInteger readCnt = new AtomicInteger();
@@ -49,6 +52,13 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
/** */
private volatile Marshaller marshaller;
+ /** */
+ private ListeningTestLogger customLog;
+
+ /** */
+ private static final Pattern UNMARSHALING_ERROR_PATTERN = Pattern.compile(".*Rebalancing routine has failed" +
+ ".*unavailablePartitions=\\[.*].*");
+
/** Test key 1. */
private static class TestKey implements Externalizable {
/** Field. */
@@ -115,6 +125,7 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
iCfg.setCacheConfiguration(cfg);
iCfg.setMarshaller(marshaller);
+ iCfg.setGridLogger(customLog);
return iCfg;
}
@@ -129,7 +140,6 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
runTest();
}
-
/**
* @throws Exception e.
*/
@@ -154,6 +164,12 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
* @throws Exception e.
*/
private void runTest() throws Exception {
+ customLog = new ListeningTestLogger(log);
+
+ LogListener unmarshalErrorLogListener = LogListener.matches(UNMARSHALING_ERROR_PATTERN).atLeast(1).build();
+
+ customLog.registerListener(unmarshalErrorLogListener);
+
assert marshaller != null;
readCnt.set(Integer.MAX_VALUE);
@@ -176,6 +192,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
for (int i = 50; i < 100; i++)
assertNull(grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))));
+
+ assertTrue("Unmarshal log error message is not valid.", unmarshalErrorLogListener.check());
}
/** {@inheritDoc} */