You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/05/07 15:21:29 UTC

[ignite] 21/41: GG-17331 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit c35f196fb4cde684e1de2ce240a362a849dde922
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Fri Apr 26 14:35:02 2019 +0300

    GG-17331 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field.
    
    Since decompression of partsSizes is no longer parallelized, PME might be slightly slower while consuming far less heap.
    
    Cherry-picked from 478277e5e3fe1a535ea905f8beab42926453825a
---
 .../cache/GridCachePartitionExchangeManager.java   | 21 ++++-
 .../GridDhtPartitionsAbstractMessage.java          |  4 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java | 14 ++-
 .../preloader/GridDhtPartitionsFullMessage.java    | 99 ++++++++++------------
 .../preloader/GridDhtPartitionsSingleMessage.java  | 15 +---
 .../cache/CacheGroupsMetricsRebalanceTest.java     |  4 +-
 6 files changed, 81 insertions(+), 76 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index bcb1dc5..357fa6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1377,10 +1377,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             partsToReload
             );
 
-        m.compress(compress);
+        m.compressed(compress);
 
         final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
 
+        Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>();
+
         for (CacheGroupContext grp : grps) {
             if (!grp.isLocal()) {
                 if (exchId != null) {
@@ -1397,7 +1399,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (locMap != null)
                     addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey());
 
-                m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes());
+                Map<Integer, Long> partSizesMap = grp.topology().globalPartSizes();
+
+                if (!partSizesMap.isEmpty())
+                    partsSizes.put(grp.groupId(), partSizesMap);
 
                 if (exchId != null) {
                     CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters();
@@ -1427,12 +1432,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 else
                     m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap));
 
-                m.addPartitionSizes(top.groupId(), top.globalPartSizes());
+                Map<Integer, Long> partSizesMap = top.globalPartSizes();
+
+                if (!partSizesMap.isEmpty())
+                    partsSizes.put(top.groupId(), partSizesMap);
             }
         }
 
         cctx.kernalContext().txDr().onPartitionsFullMessagePrepared(exchId, m);
 
+        if (!partsSizes.isEmpty())
+            m.partitionSizes(cctx, partsSizes);
+
         return m;
     }
 
@@ -1778,6 +1789,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                 boolean updated = false;
 
+                Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
                 for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
                     Integer grpId = entry.getKey();
 
@@ -1795,7 +1808,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             entry.getValue(),
                             null,
                             msg.partsToReload(cctx.localNodeId(), grpId),
-                            msg.partitionSizes(grpId),
+                            partsSizes.getOrDefault(grpId, Collections.emptyMap()),
                             msg.topologyVersion());
                     }
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index a82b4ba..e5ae699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -117,14 +117,14 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     /**
      * @return {@code True} if message data is compressed.
      */
-    protected final boolean compressed() {
+    public final boolean compressed() {
         return (flags & COMPRESSED_FLAG_MASK) != 0;
     }
 
     /**
      * @param compressed {@code True} if message data is compressed.
      */
-    protected final void compressed(boolean compressed) {
+    public final void compressed(boolean compressed) {
         flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4ba4d5b..6330d4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2493,6 +2493,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         newCrdFut = null;
         exchangeLocE = null;
         exchangeGlobalExceptions.clear();
+        if (finishState != null)
+            finishState.cleanUp();
     }
 
     /**
@@ -4250,6 +4252,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
 
         try {
+            Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
             doInParallel(
                 parallelismLvl,
                 cctx.kernalContext().getSystemExecutorService(),
@@ -4264,7 +4268,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             msg.partitions().get(grpId),
                             cntrMap,
                             msg.partsToReload(cctx.localNodeId(), grpId),
-                            msg.partitionSizes(grpId),
+                            partsSizes.getOrDefault(grpId, Collections.emptyMap()),
                             null);
                     }
                     else {
@@ -5079,6 +5083,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             this.resTopVer = resTopVer;
             this.msg = msg;
         }
+
+        /**
+         * Cleans up resources to avoid excessive memory usage.
+         */
+        public void cleanUp() {
+            if (msg != null)
+                msg.cleanUp();
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index b010ff5..b8f57ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -99,11 +100,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** Serialized partitions that must be cleared and re-loaded. */
     private byte[] partsToReloadBytes;
 
-    /** Partitions sizes. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Map<Integer, Map<Integer, Long>> partsSizes;
-
     /** Serialized partitions sizes. */
     private byte[] partsSizesBytes;
 
@@ -119,10 +115,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     private byte[] errsBytes;
 
     /** */
-    @GridDirectTransient
-    private transient boolean compress;
-
-    /** */
     private AffinityTopologyVersion resTopVer;
 
     /** */
@@ -178,12 +170,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         cp.partHistSuppliersBytes = partHistSuppliersBytes;
         cp.partsToReload = partsToReload;
         cp.partsToReloadBytes = partsToReloadBytes;
-        cp.partsSizes = partsSizes;
         cp.partsSizesBytes = partsSizesBytes;
         cp.topVer = topVer;
         cp.errs = errs;
         cp.errsBytes = errsBytes;
-        cp.compress = compress;
         cp.resTopVer = resTopVer;
         cp.joinedNodeAff = joinedNodeAff;
         cp.idealAffDiff = idealAffDiff;
@@ -250,13 +240,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * @param compress {@code True} if it is possible to use compression for message.
-     */
-    public void compress(boolean compress) {
-        this.compress = compress;
-    }
-
-    /**
      * @return Local partitions.
      */
     public Map<Integer, GridDhtPartitionFullMap> partitions() {
@@ -289,7 +272,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             parts.put(grpId, fullMap);
 
             if (dupDataCache != null) {
-                assert compress;
+                assert compressed();
                 assert parts.containsKey(dupDataCache);
 
                 if (dupPartsData == null)
@@ -360,32 +343,43 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * Adds partition sizes map for specified {@code grpId} to the current message.
+     * Supplies partition sizes map for all cache groups.
      *
-     * @param grpId Group id.
-     * @param partSizesMap Partition sizes map.
+     * @param ctx Cache context.
+     * @param partsSizes Partitions sizes map.
      */
-    public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
-        if (partSizesMap.isEmpty())
-            return;
+    public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, Map<Integer, Long>> partsSizes) {
+        try {
+            byte[] marshalled = U.marshal(ctx, partsSizes);
 
-        if (partsSizes == null)
-            partsSizes = new HashMap<>();
+            if (compressed())
+                marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
 
-        partsSizes.put(grpId, partSizesMap);
+            partsSizesBytes = marshalled;
+        }
+        catch (IgniteCheckedException ex) {
+            throw new IgniteException(ex);
+        }
     }
 
     /**
-     * Returns partition sizes map for specified {@code grpId}.
+     * Returns partition sizes map for all cache groups.
      *
-     * @param grpId Group id.
-     * @return Partition sizes map (partId, partSize).
+     * @param ctx Cache context.
+     * @return Partition sizes map (grpId, (partId, partSize)).
      */
-    public Map<Integer, Long> partitionSizes(int grpId) {
-        if (partsSizes == null)
+    public Map<Integer, Map<Integer, Long>> partitionSizes(GridCacheSharedContext ctx) {
+        if (partsSizesBytes == null)
             return Collections.emptyMap();
 
-        return partsSizes.getOrDefault(grpId, Collections.emptyMap());
+        try {
+            return compressed()
+                ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader())
+                : U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader());
+        }
+        catch (IgniteCheckedException ex) {
+            throw new IgniteException(ex);
+        }
     }
 
     /**
@@ -434,9 +428,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             if (partsToReload != null && partsToReloadBytes == null)
                 objectsToMarshall.add(partsToReload);
 
-            if (partsSizes != null && partsSizesBytes == null)
-                objectsToMarshall.add(partsSizes);
-
             if (!F.isEmpty(errs) && errsBytes == null)
                 objectsToMarshall.add(errs);
 
@@ -448,7 +439,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                     @Override public byte[] apply(Object payload) throws IgniteCheckedException {
                         byte[] marshalled = U.marshal(ctx, payload);
 
-                        if(compress)
+                        if(compressed())
                             marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
 
                         return marshalled;
@@ -472,17 +463,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             if (partsToReload != null && partsToReloadBytes == null)
                 partsToReloadBytes = iterator.next();
 
-            if (partsSizes != null && partsSizesBytes == null)
-                partsSizesBytes = iterator.next();
-
             if (!F.isEmpty(errs) && errsBytes == null)
                 errsBytes = iterator.next();
-
-            if (compress) {
-                assert !compressed() : "Unexpected compressed state";
-
-                compressed(true);
-            }
         }
     }
 
@@ -526,9 +508,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         if (partsToReloadBytes != null && partsToReload == null)
             objectsToUnmarshall.add(partsToReloadBytes);
 
-        if (partsSizesBytes != null && partsSizes == null)
-            objectsToUnmarshall.add(partsSizesBytes);
-
         if (errsBytes != null && errs == null)
             objectsToUnmarshall.add(errsBytes);
 
@@ -590,9 +569,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         if (partsToReloadBytes != null && partsToReload == null)
             partsToReload = (IgniteDhtPartitionsToReloadMap)iterator.next();
 
-        if (partsSizesBytes != null && partsSizes == null)
-            partsSizes = (Map<Integer, Map<Integer, Long>>)iterator.next();
-
         if (errsBytes != null && errs == null)
             errs = (Map<UUID, Exception>)iterator.next();
 
@@ -611,9 +587,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         if(partsToReload == null)
             partsToReload = new IgniteDhtPartitionsToReloadMap();
 
-        if(partsSizes == null)
-            partsSizes = new HashMap<>();
-
         if (errs == null)
             errs = new HashMap<>();
     }
@@ -868,4 +841,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             }
         }
     }
+
+    /**
+     * Cleans up resources to avoid excessive memory usage.
+     */
+    public void cleanUp() {
+        partsBytes = null;
+        partCntrs2 = null;
+        partCntrsBytes = null;
+        partCntrsBytes2 = null;
+        partHistSuppliersBytes = null;
+        partsToReloadBytes = null;
+        partsSizesBytes = null;
+        errsBytes = null;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 398649f..793224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -94,10 +94,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     private boolean client;
 
     /** */
-    @GridDirectTransient
-    private transient boolean compress;
-
-    /** */
     @GridDirectCollection(Integer.class)
     private Collection<Integer> grpsAffRequest;
 
@@ -130,8 +126,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     ) {
         super(exchId, lastVer);
 
+        compressed(compress);
+
         this.client = client;
-        this.compress = compress;
     }
 
     /**
@@ -186,7 +183,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         parts.put(cacheId, locMap);
 
         if (dupDataCache != null) {
-            assert compress;
+            assert compressed();
             assert F.isEmpty(locMap.map());
             assert parts.containsKey(dupDataCache);
 
@@ -365,9 +362,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             if (err != null && errBytes == null)
                 errBytes0 = U.marshal(ctx, err);
 
-            if (compress) {
-                assert !compressed();
-
+            if (compressed()) {
                 try {
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
@@ -380,8 +375,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                     partHistCntrsBytes0 = partHistCntrsBytesZip;
                     partsSizesBytes0 = partsSizesBytesZip;
                     errBytes0 = exBytesZip;
-
-                    compressed(true);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index f615410..1eb1115 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -346,7 +346,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
             @Override public boolean apply() {
                 return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0;
             }
-        }, timeLeft + 10_000L);
+        }, timeLeft + 12_000L);
 
         log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft +
                 ", Time to rebalance=" + (finishTime - startTime) +
@@ -361,7 +361,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
 
         long diff = finishTime - currTime;
 
-        assertTrue("Expected less than 10000, but actual: " + diff, Math.abs(diff) < 10_000L);
+        assertTrue("Expected less than 12000, but actual: " + diff, Math.abs(diff) < 12_000L);
     }
 
     /**