You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the link was not preserved in this copy).
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/09/14 21:56:36 UTC
[brooklyn-server] 17/27: stop tasks when ending RO mode also, more cleanly restart on rebind/promotion
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 109876b10579641964aea9627e67259e04dd13e7
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Tue Sep 14 14:08:40 2021 +0100
stop tasks when ending RO mode also, more cleanly restart on rebind/promotion
---
.../core/mgmt/ha/HighAvailabilityManagerImpl.java | 5 ++
.../brooklyn/core/mgmt/rebind/RebindIteration.java | 3 +-
.../core/mgmt/rebind/RebindManagerImpl.java | 54 +++++++++++++++++-----
.../core/mgmt/rebind/ManagementPlaneIdTest.java | 6 +--
4 files changed, 53 insertions(+), 15 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
index 00f511a..d3ad012 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
@@ -990,6 +990,9 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
* (e.g. during the periodic rebind as hot_stanby we will not repeatedly clear the brooklyn-managed-bundles).
*/
protected void clearManagedItems(ManagementTransitionMode mode) {
+ // note, tasks are cancelled prior to this, when coming from RO mode, via
+ // RebindManagerImpl.stopEntityAndDoneTasksBeforeRebinding
+
// log this because it may be surprising, it is just HA transitions,
// not symmetric with usual single-node start
LOG.info("Clearing all managed items on transition to "+mode);
@@ -1018,6 +1021,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
((BasicBrooklynCatalog)managementContext.getCatalog()).reset(CatalogDto.newEmptyInstance("<reset-by-ha-status-change>"));
((BasicBrooklynTypeRegistry)managementContext.getTypeRegistry()).clear();
managementContext.getCatalogInitialization().clearBrooklynManagedBundles();
+
+ ((LocalManagementContext)managementContext).getGarbageCollector().gcTasks();
}
/** Starts hot standby or hot backup, in foreground
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index 69dd588..bf28d65 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -265,7 +265,7 @@ public abstract class RebindIteration {
protected void doRun() throws Exception {
if (readOnlyRebindCount.get() > 1) {
// prevent leaking
- rebindManager.stopEntityAndDoneTasksBeforeRebinding();
+ rebindManager.stopEntityAndDoneTasksBeforeRebinding("before next read-only rebind", Duration.seconds(10), Duration.seconds(20));
}
loadManifestFiles();
@@ -560,6 +560,7 @@ public abstract class RebindIteration {
try {
Feed feed = instantiator.newFeed(feedMemento);
rebindContext.registerFeed(feedMemento.getId(), feed);
+ // started during associateAdjunctsWithEntities by RebindAdjuncts
} catch (Exception e) {
exceptionHandler.onCreateFailed(BrooklynObjectType.FEED, feedMemento.getId(), feedMemento.getType(), e);
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 2db7063..781a8c1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -47,6 +47,7 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.TreeNode;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
+import org.apache.brooklyn.core.BrooklynVersion;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Entities;
@@ -398,29 +399,60 @@ public class RebindManagerImpl implements RebindManager {
readOnlyTask = null;
LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId());
}
- stopEntityAndDoneTasksBeforeRebinding();
+ // short waits when promoting
+ stopEntityAndDoneTasksBeforeRebinding("when stopping hot proxy read-only mode",
+ Duration.seconds(2),
+ Duration.seconds(5));
+ // note, items are subsequently unmanaged via:
+ // HighAvailabilityManagerImpl.clearManagedItems
}
- public void stopEntityAndDoneTasksBeforeRebinding() {
+ public void stopEntityAndDoneTasksBeforeRebinding(String reason, Duration delayBeforeCancelling, Duration delayBeforeAbandoning) {
+ // TODO inputs should be configurable
+
+ if (!managementContext.isRunning() || managementContext.getExecutionManager().isShutdown()) {
+ return;
+ }
+
// wait for tasks
Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
.stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList());
List<Task<?>> openTasksIncludingCancelled;
- CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15));
+ CountdownTimer timeBeforeCancelling = CountdownTimer.newInstanceStarted(delayBeforeCancelling);
+ CountdownTimer timeBeforeAbandoning = CountdownTimer.newInstanceStarted(delayBeforeAbandoning);
+ Duration backoff = Duration.millis(10);
do {
openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList());
- List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
if (openTasksIncludingCancelled.isEmpty()) break;
- if (time.isExpired() && !openTasksCancellable.isEmpty()) {
- LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable);
+
+ List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
+ List<Task<?>> openTasksScheduled = openTasksCancellable.stream().filter(t -> t instanceof ScheduledTask).collect(Collectors.toList());
+
+ if (!openTasksScheduled.isEmpty()) {
+ // stop scheduled tasks immediately
+ openTasksScheduled.forEach(t -> t.cancel(false));
+ continue;
+ }
+
+ if (timeBeforeCancelling!=null && timeBeforeCancelling.isExpired() && !openTasksCancellable.isEmpty()) {
+ LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) "+reason+": " + openTasksCancellable);
openTasksCancellable.forEach(t -> t.cancel(true));
+ timeBeforeCancelling = null;
}
- if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) {
- LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled);
+
+ if (timeBeforeAbandoning.isExpired()) break;
+
+ if (timeBeforeAbandoning.getDurationElapsed().isShorterThan(backoff)) {
+ LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) "+reason+": " + openTasksIncludingCancelled);
}
- LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " +
- openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList()));
- Time.sleep(Duration.millis(200));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) " + reason + ", details: " +
+ openTasksIncludingCancelled.stream().map(t -> "" + t + "(" + BrooklynTaskTags.getContextEntity(t) + ")").collect(Collectors.toList()));
+ }
+
+ Time.sleep(Duration.min(timeBeforeAbandoning.getDurationRemaining(), backoff));
+ backoff = Duration.min(backoff.multiply(2), Duration.millis(200));
+
} while (true);
entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
index ea3ee5e..c24edcd 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
@@ -85,7 +85,7 @@ public class ManagementPlaneIdTest {
checkPlaneIdPersisted(mgmt);
}
- @Test
+ @Test(groups="Integration") // because slow
public void testPlaneIdRolledBack() throws Exception {
final LocalManagementContext mgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.AUTO);
@@ -101,7 +101,7 @@ public class ManagementPlaneIdTest {
});
}
- @Test
+ @Test(groups="Integration") // because slow
public void testColdRebindInitialisesPlaneId() throws Exception {
final LocalManagementContext origMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
checkPlaneIdPersisted(origMgmt);
@@ -136,7 +136,7 @@ public class ManagementPlaneIdTest {
});
}
- @Test
+ @Test(groups="Integration") // because slow
public void testHaFailoverKeepsPlaneId() throws Exception {
final LocalManagementContext origMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.MASTER);
final LocalManagementContext rebindMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.STANDBY);