You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/09/14 21:56:35 UTC
[brooklyn-server] 16/27: better cleanup on switch from RO/hot
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 751b7ff77919a46728faa98f8e203314cb9d870b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Tue Sep 14 13:52:22 2021 +0100
better cleanup on switch from RO/hot
---
.../brooklyn/core/mgmt/rebind/RebindIteration.java | 28 ++-------------
.../core/mgmt/rebind/RebindManagerImpl.java | 42 +++++++++++++++++++++-
2 files changed, 43 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index cfbd400..69dd588 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -264,32 +264,8 @@ public abstract class RebindIteration {
protected void doRun() throws Exception {
if (readOnlyRebindCount.get() > 1) {
- // wait for tasks
- Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
- .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList());
- List<Task<?>> openTasksIncludingCancelled;
- CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15));
- do {
- openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList());
- List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
- if (openTasksIncludingCancelled.isEmpty()) break;
- if (time.isExpired() && !openTasksCancellable.isEmpty()) {
- LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable);
- openTasksCancellable.forEach(t -> t.cancel(true));
- }
- if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) {
- LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled);
- }
- LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " +
- openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList()));
- Time.sleep(Duration.millis(200));
- } while (true);
-
- entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
-
- List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
- .stream().filter(t -> t.isDone(true)).collect(Collectors.toList());
- otherDoneTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
+ // prevent leaking
+ rebindManager.stopEntityAndDoneTasksBeforeRebinding();
}
loadManifestFiles();
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 57395c7..2db7063 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.brooklyn.api.entity.Application;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
@@ -48,7 +50,9 @@ import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectStore;
import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
@@ -61,11 +65,14 @@ import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.QuorumCheck;
import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
import org.apache.brooklyn.util.core.task.BasicExecutionContext;
+import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.ScheduledTask;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,7 +155,7 @@ public class RebindManagerImpl implements RebindManager {
private PersistenceActivityMetrics persistMetrics = new PersistenceActivityMetrics();
Integer firstRebindAppCount, firstRebindEntityCount, firstRebindItemCount;
-
+
/**
* For tracking if rebinding, for {@link AbstractEnricher#isRebinding()} etc.
*
@@ -391,6 +398,39 @@ public class RebindManagerImpl implements RebindManager {
readOnlyTask = null;
LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId());
}
+ stopEntityAndDoneTasksBeforeRebinding();
+ }
+
+ public void stopEntityAndDoneTasksBeforeRebinding() {
+ // wait for tasks
+ Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
+ .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList());
+ List<Task<?>> openTasksIncludingCancelled;
+ CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15));
+ do {
+ openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList());
+ List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
+ if (openTasksIncludingCancelled.isEmpty()) break;
+ if (time.isExpired() && !openTasksCancellable.isEmpty()) {
+ LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable);
+ openTasksCancellable.forEach(t -> t.cancel(true));
+ }
+ if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) {
+ LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled);
+ }
+ LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " +
+ openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList()));
+ Time.sleep(Duration.millis(200));
+ } while (true);
+
+ entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
+
+ List<Task<?>> otherDoneTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
+ .stream().filter(t -> t.isDone(true)).collect(Collectors.toList());
+ otherDoneTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
+
+ // also collect tasks, so that unmanaged entities are cleared before next run
+ ((LocalManagementContext)managementContext).getGarbageCollector().gcTasks();
}
@Override