You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/09/14 21:56:35 UTC

[brooklyn-server] 16/27: better cleanup on switch from RO/hot

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 751b7ff77919a46728faa98f8e203314cb9d870b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Tue Sep 14 13:52:22 2021 +0100

    better cleanup on switch from RO/hot
---
 .../brooklyn/core/mgmt/rebind/RebindIteration.java | 28 ++-------------
 .../core/mgmt/rebind/RebindManagerImpl.java        | 42 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index cfbd400..69dd588 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -264,32 +264,8 @@ public abstract class RebindIteration {
 
     protected void doRun() throws Exception {
         if (readOnlyRebindCount.get() > 1) {
-            // wait for tasks
-            Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
-                    .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList());
-            List<Task<?>> openTasksIncludingCancelled;
-            CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15));
-            do {
-                openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList());
-                List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
-                if (openTasksIncludingCancelled.isEmpty()) break;
-                if (time.isExpired() && !openTasksCancellable.isEmpty()) {
-                    LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable);
-                    openTasksCancellable.forEach(t -> t.cancel(true));
-                }
-                if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) {
-                    LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled);
-                }
-                LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " +
-                        openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList()));
-                Time.sleep(Duration.millis(200));
-            } while (true);
-
-            entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
-
-            List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
-                    .stream().filter(t -> t.isDone(true)).collect(Collectors.toList());
-            otherDoneTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
+            // prevent leaking
+            rebindManager.stopEntityAndDoneTasksBeforeRebinding();
         }
 
         loadManifestFiles();
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 57395c7..2db7063 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import java.util.stream.Collectors;
 import org.apache.brooklyn.api.entity.Application;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
@@ -48,7 +50,9 @@ import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
 import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
 import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectStore;
 import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
@@ -61,11 +65,14 @@ import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.QuorumCheck;
 import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
 import org.apache.brooklyn.util.core.task.BasicExecutionContext;
+import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.ScheduledTask;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.time.CountdownTimer;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,7 +155,7 @@ public class RebindManagerImpl implements RebindManager {
     private PersistenceActivityMetrics persistMetrics = new PersistenceActivityMetrics();
 
     Integer firstRebindAppCount, firstRebindEntityCount, firstRebindItemCount;
-    
+
     /**
      * For tracking if rebinding, for {@link AbstractEnricher#isRebinding()} etc.
      *  
@@ -391,6 +398,39 @@ public class RebindManagerImpl implements RebindManager {
             readOnlyTask = null;
             LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId());
         }
+        stopEntityAndDoneTasksBeforeRebinding();
+    }
+
+    public void stopEntityAndDoneTasksBeforeRebinding() { // drains entity-context tasks, deletes finished tasks, then runs task GC
+        // snapshot live tasks that have a context entity, then poll every 200ms until all of them report isDone(true); tasks still !isDone() after 15s are cancelled
+        Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
+                .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList());
+        List<Task<?>> openTasksIncludingCancelled; // tasks from the snapshot for which isDone(true) is still false
+        CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15)); // grace period before cancelling
+        do {
+            openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList());
+            List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList()); // subset also reporting !isDone(); these are what gets cancelled on timeout
+            if (openTasksIncludingCancelled.isEmpty()) break; // nothing left to wait for; this is the only exit from the loop
+            if (time.isExpired() && !openTasksCancellable.isEmpty()) { // grace period over: warn and cancel (repeated on every later pass while any remain)
+                LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable);
+                openTasksCancellable.forEach(t -> t.cancel(true));
+            }
+            if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) { // in effect only the first pass, since each pass sleeps 200ms, so INFO is logged once
+                LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled);
+            }
+            LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " +
+                    openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList()));
+            Time.sleep(Duration.millis(200)); // poll interval
+        } while (true); // NOTE(review): no upper bound -- keeps looping until every snapshotted task reports isDone(true), even after cancel; confirm this cannot hang
+
+        entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask); // remove the drained entity tasks from the execution manager
+
+        List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive() // re-query: any remaining live task (entity-related or not) that reports isDone(true)
+                .stream().filter(t -> t.isDone(true)).collect(Collectors.toList());
+        otherDoneTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
+
+        // also collect tasks, so that unmanaged entities are cleared before next run
+        ((LocalManagementContext)managementContext).getGarbageCollector().gcTasks(); // NOTE(review): unchecked cast -- assumes managementContext is always a LocalManagementContext; confirm
     }
 
     @Override