You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2019/08/16 12:04:42 UTC
[ignite] branch ignite-2.7.6 updated: IGNITE-11767 Clean up
GridDhtPartitionsFullMessage when not needed,
do not hold decompressed partsSizes in field.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch ignite-2.7.6
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.7.6 by this push:
new 2a52689 IGNITE-11767 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.
2a52689 is described below
commit 2a5268940b5690af9a653d3ac03b3ddcad201e29
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Thu Aug 15 16:57:15 2019 +0300
IGNITE-11767 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.
Since decompression of partsSizes is no longer parallelized, PME (partition map exchange) might be slightly slower while consuming far less heap.
Cherry-picked from 478277e5e3fe1a535ea905f8beab42926453825a
---
.../cache/GridCachePartitionExchangeManager.java | 21 ++++-
.../GridDhtPartitionsAbstractMessage.java | 4 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 14 +++-
.../preloader/GridDhtPartitionsFullMessage.java | 97 ++++++++++------------
.../preloader/GridDhtPartitionsSingleMessage.java | 15 +---
.../cache/CacheGroupsMetricsRebalanceTest.java | 4 +-
6 files changed, 81 insertions(+), 74 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f18b404..20e3c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1158,10 +1158,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
partsToReload
);
- m.compress(compress);
+ m.compressed(compress);
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
+ Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>();
+
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal()) {
if (exchId != null) {
@@ -1184,7 +1186,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
affCache.similarAffinityKey());
}
- m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes());
+ Map<Integer, Long> partSizesMap = grp.topology().globalPartSizes();
+
+ if (!partSizesMap.isEmpty())
+ partsSizes.put(grp.groupId(), partSizesMap);
if (exchId != null) {
CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters();
@@ -1220,10 +1225,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else
m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap));
- m.addPartitionSizes(top.groupId(), top.globalPartSizes());
+ Map<Integer, Long> partSizesMap = top.globalPartSizes();
+
+ if (!partSizesMap.isEmpty())
+ partsSizes.put(top.groupId(), partSizesMap);
}
}
+ if (!partsSizes.isEmpty())
+ m.partitionSizes(cctx, partsSizes);
+
return m;
}
@@ -1535,6 +1546,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean updated = false;
+ Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
@@ -1552,7 +1565,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
entry.getValue(),
null,
msg.partsToReload(cctx.localNodeId(), grpId),
- msg.partitionSizes(grpId),
+ partsSizes.getOrDefault(grpId, Collections.emptyMap()),
msg.topologyVersion());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 84cc792..a5eb237 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -118,14 +118,14 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
/**
* @return {@code True} if message data is compressed.
*/
- protected final boolean compressed() {
+ public final boolean compressed() {
return (flags & COMPRESSED_FLAG_MASK) != 0;
}
/**
* @param compressed {@code True} if message data is compressed.
*/
- protected final void compressed(boolean compressed) {
+ public final void compressed(boolean compressed) {
flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e84b7a0..d9e780a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2331,6 +2331,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
newCrdFut = null;
exchangeLocE = null;
exchangeGlobalExceptions.clear();
+ if (finishState != null)
+ finishState.cleanUp();
}
/**
@@ -3981,6 +3983,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
long time = System.currentTimeMillis();
+ Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
@@ -3994,7 +3998,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
entry.getValue(),
cntrMap,
msg.partsToReload(cctx.localNodeId(), grpId),
- msg.partitionSizes(grpId),
+ partsSizes.getOrDefault(grpId, Collections.emptyMap()),
null);
}
else {
@@ -4776,6 +4780,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
this.resTopVer = resTopVer;
this.msg = msg;
}
+
+ /**
+ * Cleans up resources to avoid excessive memory usage.
+ */
+ public void cleanUp() {
+ if (msg != null)
+ msg.cleanUp();
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a63ab70..b9ffd9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
@@ -95,11 +96,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** Serialized partitions that must be cleared and re-loaded. */
private byte[] partsToReloadBytes;
- /** Partitions sizes. */
- @GridToStringInclude
- @GridDirectTransient
- private Map<Integer, Map<Integer, Long>> partsSizes;
-
/** Serialized partitions sizes. */
private byte[] partsSizesBytes;
@@ -115,10 +111,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
private byte[] errsBytes;
/** */
- @GridDirectTransient
- private transient boolean compress;
-
- /** */
private AffinityTopologyVersion resTopVer;
/** */
@@ -174,12 +166,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
cp.partHistSuppliersBytes = partHistSuppliersBytes;
cp.partsToReload = partsToReload;
cp.partsToReloadBytes = partsToReloadBytes;
- cp.partsSizes = partsSizes;
cp.partsSizesBytes = partsSizesBytes;
cp.topVer = topVer;
cp.errs = errs;
cp.errsBytes = errsBytes;
- cp.compress = compress;
cp.resTopVer = resTopVer;
cp.joinedNodeAff = joinedNodeAff;
cp.idealAffDiff = idealAffDiff;
@@ -246,13 +236,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * @param compress {@code True} if it is possible to use compression for message.
- */
- public void compress(boolean compress) {
- this.compress = compress;
- }
-
- /**
* @return Local partitions.
*/
public Map<Integer, GridDhtPartitionFullMap> partitions() {
@@ -285,7 +268,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
parts.put(grpId, fullMap);
if (dupDataCache != null) {
- assert compress;
+ assert compressed();
assert parts.containsKey(dupDataCache);
if (dupPartsData == null)
@@ -356,32 +339,43 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * Adds partition sizes map for specified {@code grpId} to the current message.
+ * Supplies partition sizes map for all cache groups.
*
- * @param grpId Group id.
- * @param partSizesMap Partition sizes map.
+ * @param ctx Cache context.
+ * @param partsSizes Partitions sizes map.
*/
- public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
- if (partSizesMap.isEmpty())
- return;
+ public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, Map<Integer, Long>> partsSizes) {
+ try {
+ byte[] marshalled = U.marshal(ctx, partsSizes);
- if (partsSizes == null)
- partsSizes = new HashMap<>();
+ if (compressed())
+ marshalled = U.zip(marshalled);
- partsSizes.put(grpId, partSizesMap);
+ partsSizesBytes = marshalled;
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
}
/**
- * Returns partition sizes map for specified {@code grpId}.
+ * Returns partition sizes map for all cache groups.
*
- * @param grpId Group id.
- * @return Partition sizes map (partId, partSize).
+ * @param ctx Cache context.
+ * @return Partition sizes map (grpId, (partId, partSize)).
*/
- public Map<Integer, Long> partitionSizes(int grpId) {
- if (partsSizes == null)
+ public Map<Integer, Map<Integer, Long>> partitionSizes(GridCacheSharedContext ctx) {
+ if (partsSizesBytes == null)
return Collections.emptyMap();
- return partsSizes.getOrDefault(grpId, Collections.emptyMap());
+ try {
+ return compressed()
+ ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader())
+ : U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader());
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
}
/**
@@ -415,7 +409,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
byte[] partCntrsBytes20 = null;
byte[] partHistSuppliersBytes0 = null;
byte[] partsToReloadBytes0 = null;
- byte[] partsSizesBytes0 = null;
byte[] errsBytes0 = null;
if (!F.isEmpty(parts) && partsBytes == null)
@@ -433,32 +426,23 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReload != null && partsToReloadBytes == null)
partsToReloadBytes0 = U.marshal(ctx, partsToReload);
- if (partsSizes != null && partsSizesBytes == null)
- partsSizesBytes0 = U.marshal(ctx, partsSizes);
-
if (!F.isEmpty(errs) && errsBytes == null)
errsBytes0 = U.marshal(ctx, errs);
- if (compress) {
- assert !compressed();
-
+ if (compressed()) {
try {
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20);
byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0);
byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
- byte[] partsSizesBytesZip = U.zip(partsSizesBytes0);
byte[] exsBytesZip = U.zip(errsBytes0);
-
partsBytes0 = partsBytesZip;
partCntrsBytes0 = partCntrsBytesZip;
partCntrsBytes20 = partCntrsBytes2Zip;
partHistSuppliersBytes0 = partHistSuppliersBytesZip;
partsToReloadBytes0 = partsToReloadBytesZip;
- partsSizesBytes0 = partsSizesBytesZip;
errsBytes0 = exsBytesZip;
-
compressed(true);
}
catch (IgniteCheckedException e) {
@@ -471,7 +455,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
partCntrsBytes2 = partCntrsBytes20;
partHistSuppliersBytes = partHistSuppliersBytes0;
partsToReloadBytes = partsToReloadBytes0;
- partsSizesBytes = partsSizesBytes0;
errsBytes = errsBytes0;
}
}
@@ -559,13 +542,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
partsToReload = U.unmarshal(ctx, partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
- if (partsSizesBytes != null && partsSizes == null) {
- if (compressed())
- partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
- else
- partsSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
if (partCntrs == null)
partCntrs = new IgniteDhtPartitionCountersMap();
@@ -575,7 +551,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
else
errs = U.unmarshal(ctx, errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
-
if (errs == null)
errs = new HashMap<>();
}
@@ -830,4 +805,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
}
}
+
+ /**
+ * Cleans up resources to avoid excessive memory usage.
+ */
+ public void cleanUp() {
+ partsBytes = null;
+ partCntrs2 = null;
+ partCntrsBytes = null;
+ partCntrsBytes2 = null;
+ partHistSuppliersBytes = null;
+ partsToReloadBytes = null;
+ partsSizesBytes = null;
+ errsBytes = null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 088fb31..db5d800 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -96,10 +96,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
private boolean client;
/** */
- @GridDirectTransient
- private transient boolean compress;
-
- /** */
@GridDirectCollection(Integer.class)
private Collection<Integer> grpsAffRequest;
@@ -131,8 +127,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
boolean compress) {
super(exchId, lastVer);
+ compressed(compress);
+
this.client = client;
- this.compress = compress;
}
/**
@@ -201,7 +198,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
parts.put(cacheId, locMap);
if (dupDataCache != null) {
- assert compress;
+ assert compressed();
assert F.isEmpty(locMap.map());
assert parts.containsKey(dupDataCache);
@@ -367,9 +364,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (err != null && errBytes == null)
errBytes0 = U.marshal(ctx, err);
- if (compress) {
- assert !compressed();
-
+ if (compressed()) {
try {
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
@@ -382,8 +377,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partHistCntrsBytes0 = partHistCntrsBytesZip;
partsSizesBytes0 = partsSizesBytesZip;
errBytes0 = exBytesZip;
-
- compressed(true);
}
catch (IgniteCheckedException e) {
U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index af2dc63..3aa07a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -277,7 +277,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
@Override public boolean apply() {
return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0;
}
- }, timeLeft + 10_000L);
+ }, timeLeft + 12_000L);
log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft +
", Time to rebalance=" + (finishTime - startTime) +
@@ -292,7 +292,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
long diff = finishTime - currTime;
- assertTrue("Expected less than 10000, but actual: " + diff, Math.abs(diff) < 10_000L);
+ assertTrue("Expected less than 12000, but actual: " + diff, Math.abs(diff) < 12_000L);
}
/**