You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/05/07 15:21:29 UTC
[ignite] 21/41: GG-17331 Clean up GridDhtPartitionsFullMessage when
not needed, do not hold decompressed partsSizes in field.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit c35f196fb4cde684e1de2ce240a362a849dde922
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Fri Apr 26 14:35:02 2019 +0300
GG-17331 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.
Because decompression of partsSizes is no longer done in parallel, PME (partition map exchange) might be slightly slower, but it consumes far less heap.
Cherry-picked from 478277e5e3fe1a535ea905f8beab42926453825a
---
.../cache/GridCachePartitionExchangeManager.java | 21 ++++-
.../GridDhtPartitionsAbstractMessage.java | 4 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 14 ++-
.../preloader/GridDhtPartitionsFullMessage.java | 99 ++++++++++------------
.../preloader/GridDhtPartitionsSingleMessage.java | 15 +---
.../cache/CacheGroupsMetricsRebalanceTest.java | 4 +-
6 files changed, 81 insertions(+), 76 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index bcb1dc5..357fa6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1377,10 +1377,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
partsToReload
);
- m.compress(compress);
+ m.compressed(compress);
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
+ Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>();
+
for (CacheGroupContext grp : grps) {
if (!grp.isLocal()) {
if (exchId != null) {
@@ -1397,7 +1399,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (locMap != null)
addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey());
- m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes());
+ Map<Integer, Long> partSizesMap = grp.topology().globalPartSizes();
+
+ if (!partSizesMap.isEmpty())
+ partsSizes.put(grp.groupId(), partSizesMap);
if (exchId != null) {
CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters();
@@ -1427,12 +1432,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else
m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap));
- m.addPartitionSizes(top.groupId(), top.globalPartSizes());
+ Map<Integer, Long> partSizesMap = top.globalPartSizes();
+
+ if (!partSizesMap.isEmpty())
+ partsSizes.put(top.groupId(), partSizesMap);
}
}
cctx.kernalContext().txDr().onPartitionsFullMessagePrepared(exchId, m);
+ if (!partsSizes.isEmpty())
+ m.partitionSizes(cctx, partsSizes);
+
return m;
}
@@ -1778,6 +1789,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean updated = false;
+ Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
@@ -1795,7 +1808,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
entry.getValue(),
null,
msg.partsToReload(cctx.localNodeId(), grpId),
- msg.partitionSizes(grpId),
+ partsSizes.getOrDefault(grpId, Collections.emptyMap()),
msg.topologyVersion());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index a82b4ba..e5ae699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -117,14 +117,14 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
/**
* @return {@code True} if message data is compressed.
*/
- protected final boolean compressed() {
+ public final boolean compressed() {
return (flags & COMPRESSED_FLAG_MASK) != 0;
}
/**
* @param compressed {@code True} if message data is compressed.
*/
- protected final void compressed(boolean compressed) {
+ public final void compressed(boolean compressed) {
flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4ba4d5b..6330d4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2493,6 +2493,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
newCrdFut = null;
exchangeLocE = null;
exchangeGlobalExceptions.clear();
+ if (finishState != null)
+ finishState.cleanUp();
}
/**
@@ -4250,6 +4252,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
try {
+ Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
doInParallel(
parallelismLvl,
cctx.kernalContext().getSystemExecutorService(),
@@ -4264,7 +4268,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
msg.partitions().get(grpId),
cntrMap,
msg.partsToReload(cctx.localNodeId(), grpId),
- msg.partitionSizes(grpId),
+ partsSizes.getOrDefault(grpId, Collections.emptyMap()),
null);
}
else {
@@ -5079,6 +5083,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
this.resTopVer = resTopVer;
this.msg = msg;
}
+
+ /**
+ * Cleans up resources to avoid excessive memory usage.
+ */
+ public void cleanUp() {
+ if (msg != null)
+ msg.cleanUp();
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index b010ff5..b8f57ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
@@ -99,11 +100,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** Serialized partitions that must be cleared and re-loaded. */
private byte[] partsToReloadBytes;
- /** Partitions sizes. */
- @GridToStringInclude
- @GridDirectTransient
- private Map<Integer, Map<Integer, Long>> partsSizes;
-
/** Serialized partitions sizes. */
private byte[] partsSizesBytes;
@@ -119,10 +115,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
private byte[] errsBytes;
/** */
- @GridDirectTransient
- private transient boolean compress;
-
- /** */
private AffinityTopologyVersion resTopVer;
/** */
@@ -178,12 +170,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
cp.partHistSuppliersBytes = partHistSuppliersBytes;
cp.partsToReload = partsToReload;
cp.partsToReloadBytes = partsToReloadBytes;
- cp.partsSizes = partsSizes;
cp.partsSizesBytes = partsSizesBytes;
cp.topVer = topVer;
cp.errs = errs;
cp.errsBytes = errsBytes;
- cp.compress = compress;
cp.resTopVer = resTopVer;
cp.joinedNodeAff = joinedNodeAff;
cp.idealAffDiff = idealAffDiff;
@@ -250,13 +240,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * @param compress {@code True} if it is possible to use compression for message.
- */
- public void compress(boolean compress) {
- this.compress = compress;
- }
-
- /**
* @return Local partitions.
*/
public Map<Integer, GridDhtPartitionFullMap> partitions() {
@@ -289,7 +272,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
parts.put(grpId, fullMap);
if (dupDataCache != null) {
- assert compress;
+ assert compressed();
assert parts.containsKey(dupDataCache);
if (dupPartsData == null)
@@ -360,32 +343,43 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * Adds partition sizes map for specified {@code grpId} to the current message.
+ * Supplies partition sizes map for all cache groups.
*
- * @param grpId Group id.
- * @param partSizesMap Partition sizes map.
+ * @param ctx Cache context.
+ * @param partsSizes Partitions sizes map.
*/
- public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
- if (partSizesMap.isEmpty())
- return;
+ public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, Map<Integer, Long>> partsSizes) {
+ try {
+ byte[] marshalled = U.marshal(ctx, partsSizes);
- if (partsSizes == null)
- partsSizes = new HashMap<>();
+ if (compressed())
+ marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
- partsSizes.put(grpId, partSizesMap);
+ partsSizesBytes = marshalled;
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
}
/**
- * Returns partition sizes map for specified {@code grpId}.
+ * Returns partition sizes map for all cache groups.
*
- * @param grpId Group id.
- * @return Partition sizes map (partId, partSize).
+ * @param ctx Cache context.
+ * @return Partition sizes map (grpId, (partId, partSize)).
*/
- public Map<Integer, Long> partitionSizes(int grpId) {
- if (partsSizes == null)
+ public Map<Integer, Map<Integer, Long>> partitionSizes(GridCacheSharedContext ctx) {
+ if (partsSizesBytes == null)
return Collections.emptyMap();
- return partsSizes.getOrDefault(grpId, Collections.emptyMap());
+ try {
+ return compressed()
+ ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader())
+ : U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader());
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
}
/**
@@ -434,9 +428,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReload != null && partsToReloadBytes == null)
objectsToMarshall.add(partsToReload);
- if (partsSizes != null && partsSizesBytes == null)
- objectsToMarshall.add(partsSizes);
-
if (!F.isEmpty(errs) && errsBytes == null)
objectsToMarshall.add(errs);
@@ -448,7 +439,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public byte[] apply(Object payload) throws IgniteCheckedException {
byte[] marshalled = U.marshal(ctx, payload);
- if(compress)
+ if(compressed())
marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
return marshalled;
@@ -472,17 +463,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReload != null && partsToReloadBytes == null)
partsToReloadBytes = iterator.next();
- if (partsSizes != null && partsSizesBytes == null)
- partsSizesBytes = iterator.next();
-
if (!F.isEmpty(errs) && errsBytes == null)
errsBytes = iterator.next();
-
- if (compress) {
- assert !compressed() : "Unexpected compressed state";
-
- compressed(true);
- }
}
}
@@ -526,9 +508,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReloadBytes != null && partsToReload == null)
objectsToUnmarshall.add(partsToReloadBytes);
- if (partsSizesBytes != null && partsSizes == null)
- objectsToUnmarshall.add(partsSizesBytes);
-
if (errsBytes != null && errs == null)
objectsToUnmarshall.add(errsBytes);
@@ -590,9 +569,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReloadBytes != null && partsToReload == null)
partsToReload = (IgniteDhtPartitionsToReloadMap)iterator.next();
- if (partsSizesBytes != null && partsSizes == null)
- partsSizes = (Map<Integer, Map<Integer, Long>>)iterator.next();
-
if (errsBytes != null && errs == null)
errs = (Map<UUID, Exception>)iterator.next();
@@ -611,9 +587,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if(partsToReload == null)
partsToReload = new IgniteDhtPartitionsToReloadMap();
- if(partsSizes == null)
- partsSizes = new HashMap<>();
-
if (errs == null)
errs = new HashMap<>();
}
@@ -868,4 +841,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
}
}
+
+ /**
+ * Cleans up resources to avoid excessive memory usage.
+ */
+ public void cleanUp() {
+ partsBytes = null;
+ partCntrs2 = null;
+ partCntrsBytes = null;
+ partCntrsBytes2 = null;
+ partHistSuppliersBytes = null;
+ partsToReloadBytes = null;
+ partsSizesBytes = null;
+ errsBytes = null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 398649f..793224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -94,10 +94,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
private boolean client;
/** */
- @GridDirectTransient
- private transient boolean compress;
-
- /** */
@GridDirectCollection(Integer.class)
private Collection<Integer> grpsAffRequest;
@@ -130,8 +126,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
) {
super(exchId, lastVer);
+ compressed(compress);
+
this.client = client;
- this.compress = compress;
}
/**
@@ -186,7 +183,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
parts.put(cacheId, locMap);
if (dupDataCache != null) {
- assert compress;
+ assert compressed();
assert F.isEmpty(locMap.map());
assert parts.containsKey(dupDataCache);
@@ -365,9 +362,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (err != null && errBytes == null)
errBytes0 = U.marshal(ctx, err);
- if (compress) {
- assert !compressed();
-
+ if (compressed()) {
try {
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
@@ -380,8 +375,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partHistCntrsBytes0 = partHistCntrsBytesZip;
partsSizesBytes0 = partsSizesBytesZip;
errBytes0 = exBytesZip;
-
- compressed(true);
}
catch (IgniteCheckedException e) {
U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index f615410..1eb1115 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -346,7 +346,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
@Override public boolean apply() {
return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0;
}
- }, timeLeft + 10_000L);
+ }, timeLeft + 12_000L);
log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft +
", Time to rebalance=" + (finishTime - startTime) +
@@ -361,7 +361,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
long diff = finishTime - currTime;
- assertTrue("Expected less than 10000, but actual: " + diff, Math.abs(diff) < 10_000L);
+ assertTrue("Expected less than 12000, but actual: " + diff, Math.abs(diff) < 12_000L);
}
/**