You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/04/18 18:04:13 UTC

ignite git commit: IGNITE-8243 Fixed possible memory leak. Added latch manager to diagnostic messages. - Fixes #3850.

Repository: ignite
Updated Branches:
  refs/heads/master 5c7185436 -> 6c6f0949b


IGNITE-8243 Fixed possible memory leak. Added latch manager to diagnostic messages. - Fixes #3850.

Signed-off-by: dpavlov <dp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c6f0949
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c6f0949
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c6f0949

Branch: refs/heads/master
Commit: 6c6f0949b7e79bc3d9ddd5416a990109405b1fb8
Parents: 5c71854
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Apr 18 21:03:32 2018 +0300
Committer: dpavlov <dp...@apache.org>
Committed: Wed Apr 18 21:03:32 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  2 ++
 .../preloader/latch/ExchangeLatchManager.java   | 27 ++++++++++++++------
 2 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6c6f0949/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20a3ccb..7ea161a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1633,6 +1633,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
 
+        U.warn(diagnosticLog, "Latch manager state: " + latchMgr);
+
         dumpPendingObjects(exchTopVer, diagCtx);
 
         for (CacheGroupContext grp : cctx.cache().cacheGroups())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c6f0949/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 404f88f..b9c7dee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -64,21 +64,26 @@ public class ExchangeLatchManager {
     private final GridKernalContext ctx;
 
     /** Discovery manager. */
+    @GridToStringExclude
     private final GridDiscoveryManager discovery;
 
     /** IO manager. */
+    @GridToStringExclude
     private final GridIoManager io;
 
     /** Current coordinator. */
-    private volatile ClusterNode coordinator;
+    @GridToStringExclude
+    private volatile ClusterNode crd;
 
     /** Pending acks collection. */
     private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
 
     /** Server latches collection. */
+    @GridToStringInclude
     private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
 
     /** Client latches collection. */
+    @GridToStringInclude
     private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
 
     /** Lock. */
@@ -97,15 +102,14 @@ public class ExchangeLatchManager {
 
         if (!ctx.clientNode()) {
             ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
-                if (msg instanceof LatchAckMessage) {
+                if (msg instanceof LatchAckMessage)
                     processAck(nodeId, (LatchAckMessage) msg);
-                }
             });
 
             // First coordinator initialization.
             ctx.discovery().localJoinFuture().listen(f -> {
                 if (f.error() == null)
-                    this.coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+                    this.crd = getLatchCoordinator(AffinityTopologyVersion.NONE);
             });
 
             ctx.event().addDiscoveryEventListener((e, cache) -> {
@@ -288,6 +292,8 @@ public class ExchangeLatchManager {
                     pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
                     pendingAcks.get(latchId).add(from);
                 }
+                else if (coordinator.isLocal())
+                    serverLatches.remove(latchId);
             } else {
                 if (log.isDebugEnabled())
                     log.debug("Process ack [latch=" + latchId + ", from=" + from + "]");
@@ -319,7 +325,7 @@ public class ExchangeLatchManager {
      */
     private void becomeNewCoordinator() {
         if (log.isInfoEnabled())
-            log.info("Become new coordinator " + coordinator.id());
+            log.info("Become new coordinator " + crd.id());
 
         List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>();
         latchesToRestore.addAll(pendingAcks.keySet());
@@ -347,7 +353,7 @@ public class ExchangeLatchManager {
      * @param left Left node.
      */
     private void processNodeLeft(ClusterNode left) {
-        assert this.coordinator != null : "Coordinator is not initialized";
+        assert this.crd != null : "Coordinator is not initialized";
 
         lock.lock();
 
@@ -402,8 +408,8 @@ public class ExchangeLatchManager {
             }
 
             // Coordinator is changed.
-            if (coordinator.isLocal() && this.coordinator.id() != coordinator.id()) {
-                this.coordinator = coordinator;
+            if (coordinator.isLocal() && this.crd.id() != coordinator.id()) {
+                this.crd = coordinator;
 
                 becomeNewCoordinator();
             }
@@ -693,4 +699,9 @@ public class ExchangeLatchManager {
             return S.toString(CompletableLatch.class, this);
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ExchangeLatchManager.class, this);
+    }
 }