You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2019/04/24 09:56:28 UTC
[ignite] branch master updated: IGNITE-11767 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 478277e IGNITE-11767 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.
478277e is described below
commit 478277e5e3fe1a535ea905f8beab42926453825a
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Wed Apr 24 12:50:13 2019 +0300
IGNITE-11767 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.
Since decompression of partsSizes is no longer parallelized, PME might be slightly slower, but it consumes far less heap.
Fixes #6476.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
.../cache/GridCachePartitionExchangeManager.java | 21 ++++-
.../GridDhtPartitionsAbstractMessage.java | 4 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 14 ++-
.../preloader/GridDhtPartitionsFullMessage.java | 99 ++++++++++------------
.../preloader/GridDhtPartitionsSingleMessage.java | 15 +---
.../cache/CacheGroupsMetricsRebalanceTest.java | 4 +-
6 files changed, 81 insertions(+), 76 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 774c74a..199f725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1343,10 +1343,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
final GridDhtPartitionsFullMessage m =
new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload);
- m.compress(compress);
+ m.compressed(compress);
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
+ Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>();
+
for (CacheGroupContext grp : grps) {
if (!grp.isLocal()) {
if (exchId != null) {
@@ -1363,7 +1365,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (locMap != null)
addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey());
- m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes());
+ Map<Integer, Long> partSizesMap = grp.topology().globalPartSizes();
+
+ if (!partSizesMap.isEmpty())
+ partsSizes.put(grp.groupId(), partSizesMap);
if (exchId != null) {
CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters();
@@ -1393,10 +1398,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else
m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap));
- m.addPartitionSizes(top.groupId(), top.globalPartSizes());
+ Map<Integer, Long> partSizesMap = top.globalPartSizes();
+
+ if (!partSizesMap.isEmpty())
+ partsSizes.put(top.groupId(), partSizesMap);
}
}
+ if (!partsSizes.isEmpty())
+ m.partitionSizes(cctx, partsSizes);
+
return m;
}
@@ -1742,6 +1753,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean updated = false;
+ Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
@@ -1759,7 +1772,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
entry.getValue(),
null,
msg.partsToReload(cctx.localNodeId(), grpId),
- msg.partitionSizes(grpId),
+ partsSizes.getOrDefault(grpId, Collections.emptyMap()),
msg.topologyVersion());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index e2884e1..26fcb8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -118,14 +118,14 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
/**
* @return {@code True} if message data is compressed.
*/
- protected final boolean compressed() {
+ public final boolean compressed() {
return (flags & COMPRESSED_FLAG_MASK) != 0;
}
/**
* @param compressed {@code True} if message data is compressed.
*/
- protected final void compressed(boolean compressed) {
+ public final void compressed(boolean compressed) {
flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d1b1f22..33a5a8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2466,6 +2466,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
newCrdFut = null;
exchangeLocE = null;
exchangeGlobalExceptions.clear();
+ if (finishState != null)
+ finishState.cleanUp();
}
/**
@@ -4201,6 +4203,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
try {
+ Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
doInParallel(
parallelismLvl,
cctx.kernalContext().getSystemExecutorService(),
@@ -4215,7 +4219,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
msg.partitions().get(grpId),
cntrMap,
msg.partsToReload(cctx.localNodeId(), grpId),
- msg.partitionSizes(grpId),
+ partsSizes.getOrDefault(grpId, Collections.emptyMap()),
null);
}
else {
@@ -5030,6 +5034,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
this.resTopVer = resTopVer;
this.msg = msg;
}
+
+ /**
+ * Cleans up resources to avoid excessive memory usage.
+ */
+ public void cleanUp() {
+ if (msg != null)
+ msg.cleanUp();
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8290e84..1640e8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
@@ -100,11 +101,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** Serialized partitions that must be cleared and re-loaded. */
private byte[] partsToReloadBytes;
- /** Partitions sizes. */
- @GridToStringInclude
- @GridDirectTransient
- private Map<Integer, Map<Integer, Long>> partsSizes;
-
/** Serialized partitions sizes. */
private byte[] partsSizesBytes;
@@ -120,10 +116,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
private byte[] errsBytes;
/** */
- @GridDirectTransient
- private transient boolean compress;
-
- /** */
private AffinityTopologyVersion resTopVer;
/** */
@@ -179,12 +171,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
cp.partHistSuppliersBytes = partHistSuppliersBytes;
cp.partsToReload = partsToReload;
cp.partsToReloadBytes = partsToReloadBytes;
- cp.partsSizes = partsSizes;
cp.partsSizesBytes = partsSizesBytes;
cp.topVer = topVer;
cp.errs = errs;
cp.errsBytes = errsBytes;
- cp.compress = compress;
cp.resTopVer = resTopVer;
cp.joinedNodeAff = joinedNodeAff;
cp.idealAffDiff = idealAffDiff;
@@ -251,13 +241,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * @param compress {@code True} if it is possible to use compression for message.
- */
- public void compress(boolean compress) {
- this.compress = compress;
- }
-
- /**
* @return Local partitions.
*/
public Map<Integer, GridDhtPartitionFullMap> partitions() {
@@ -290,7 +273,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
parts.put(grpId, fullMap);
if (dupDataCache != null) {
- assert compress;
+ assert compressed();
assert parts.containsKey(dupDataCache);
if (dupPartsData == null)
@@ -361,32 +344,43 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * Adds partition sizes map for specified {@code grpId} to the current message.
+ * Supplies partition sizes map for all cache groups.
*
- * @param grpId Group id.
- * @param partSizesMap Partition sizes map.
+ * @param ctx Cache context.
+ * @param partsSizes Partitions sizes map.
*/
- public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
- if (partSizesMap.isEmpty())
- return;
+ public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, Map<Integer, Long>> partsSizes) {
+ try {
+ byte[] marshalled = U.marshal(ctx, partsSizes);
- if (partsSizes == null)
- partsSizes = new HashMap<>();
+ if (compressed())
+ marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
- partsSizes.put(grpId, partSizesMap);
+ partsSizesBytes = marshalled;
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
}
/**
- * Returns partition sizes map for specified {@code grpId}.
+ * Returns partition sizes map for all cache groups.
*
- * @param grpId Group id.
- * @return Partition sizes map (partId, partSize).
+ * @param ctx Cache context.
+ * @return Partition sizes map (grpId, (partId, partSize)).
*/
- public Map<Integer, Long> partitionSizes(int grpId) {
- if (partsSizes == null)
+ public Map<Integer, Map<Integer, Long>> partitionSizes(GridCacheSharedContext ctx) {
+ if (partsSizesBytes == null)
return Collections.emptyMap();
- return partsSizes.getOrDefault(grpId, Collections.emptyMap());
+ try {
+ return compressed()
+ ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader())
+ : U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader());
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
}
/**
@@ -435,9 +429,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReload != null && partsToReloadBytes == null)
objectsToMarshall.add(partsToReload);
- if (partsSizes != null && partsSizesBytes == null)
- objectsToMarshall.add(partsSizes);
-
if (!F.isEmpty(errs) && errsBytes == null)
objectsToMarshall.add(errs);
@@ -449,7 +440,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public byte[] apply(Object payload) throws IgniteCheckedException {
byte[] marshalled = U.marshal(ctx, payload);
- if(compress)
+ if(compressed())
marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
return marshalled;
@@ -473,17 +464,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReload != null && partsToReloadBytes == null)
partsToReloadBytes = iterator.next();
- if (partsSizes != null && partsSizesBytes == null)
- partsSizesBytes = iterator.next();
-
if (!F.isEmpty(errs) && errsBytes == null)
errsBytes = iterator.next();
-
- if (compress) {
- assert !compressed() : "Unexpected compressed state";
-
- compressed(true);
- }
}
}
@@ -527,9 +509,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReloadBytes != null && partsToReload == null)
objectsToUnmarshall.add(partsToReloadBytes);
- if (partsSizesBytes != null && partsSizes == null)
- objectsToUnmarshall.add(partsSizesBytes);
-
if (errsBytes != null && errs == null)
objectsToUnmarshall.add(errsBytes);
@@ -591,9 +570,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReloadBytes != null && partsToReload == null)
partsToReload = (IgniteDhtPartitionsToReloadMap)iterator.next();
- if (partsSizesBytes != null && partsSizes == null)
- partsSizes = (Map<Integer, Map<Integer, Long>>)iterator.next();
-
if (errsBytes != null && errs == null)
errs = (Map<UUID, Exception>)iterator.next();
@@ -612,9 +588,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if(partsToReload == null)
partsToReload = new IgniteDhtPartitionsToReloadMap();
- if(partsSizes == null)
- partsSizes = new HashMap<>();
-
if (errs == null)
errs = new HashMap<>();
}
@@ -869,4 +842,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
}
}
+
+ /**
+ * Cleans up resources to avoid excessive memory usage.
+ */
+ public void cleanUp() {
+ partsBytes = null;
+ partCntrs2 = null;
+ partCntrsBytes = null;
+ partCntrsBytes2 = null;
+ partHistSuppliersBytes = null;
+ partsToReloadBytes = null;
+ partsSizesBytes = null;
+ errsBytes = null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index bbd2480..d0d98c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -95,10 +95,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
private boolean client;
/** */
- @GridDirectTransient
- private transient boolean compress;
-
- /** */
@GridDirectCollection(Integer.class)
private Collection<Integer> grpsAffRequest;
@@ -131,8 +127,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
) {
super(exchId, lastVer);
+ compressed(compress);
+
this.client = client;
- this.compress = compress;
}
/**
@@ -187,7 +184,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
parts.put(cacheId, locMap);
if (dupDataCache != null) {
- assert compress;
+ assert compressed();
assert F.isEmpty(locMap.map());
assert parts.containsKey(dupDataCache);
@@ -366,9 +363,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (err != null && errBytes == null)
errBytes0 = U.marshal(ctx, err);
- if (compress) {
- assert !compressed();
-
+ if (compressed()) {
try {
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
@@ -381,8 +376,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partHistCntrsBytes0 = partHistCntrsBytesZip;
partsSizesBytes0 = partsSizesBytesZip;
errBytes0 = exBytesZip;
-
- compressed(true);
}
catch (IgniteCheckedException e) {
U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index 02841a4..bc1c9e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -347,7 +347,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
@Override public boolean apply() {
return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0;
}
- }, timeLeft + 10_000L);
+ }, timeLeft + 12_000L);
log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft +
", Time to rebalance=" + (finishTime - startTime) +
@@ -362,7 +362,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
long diff = finishTime - currTime;
- assertTrue("Expected less than 10000, but actual: " + diff, Math.abs(diff) < 10_000L);
+ assertTrue("Expected less than 12000, but actual: " + diff, Math.abs(diff) < 12_000L);
}
/**