You are viewing a plain-text version of this content. The link to the canonical (original archive) version was a hyperlink that was lost in this plain-text rendering.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/09/14 21:56:36 UTC

[brooklyn-server] 17/27: stop tasks when ending RO mode also, more cleanly restart on rebind/promotion

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 109876b10579641964aea9627e67259e04dd13e7
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Tue Sep 14 14:08:40 2021 +0100

    stop tasks when ending RO mode also, more cleanly restart on rebind/promotion
---
 .../core/mgmt/ha/HighAvailabilityManagerImpl.java  |  5 ++
 .../brooklyn/core/mgmt/rebind/RebindIteration.java |  3 +-
 .../core/mgmt/rebind/RebindManagerImpl.java        | 54 +++++++++++++++++-----
 .../core/mgmt/rebind/ManagementPlaneIdTest.java    |  6 +--
 4 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
index 00f511a..d3ad012 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
@@ -990,6 +990,9 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
      * (e.g. during the periodic rebind as hot_stanby we will not repeatedly clear the brooklyn-managed-bundles).
      */
     protected void clearManagedItems(ManagementTransitionMode mode) {
+        // note, tasks are cancelled prior to this, when coming from RO mode, via
+        // RebindManagerImpl.stopEntityAndDoneTasksBeforeRebinding
+
         // log this because it may be surprising, it is just HA transitions,
         // not symmetric with usual single-node start
         LOG.info("Clearing all managed items on transition to "+mode);
@@ -1018,6 +1021,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         ((BasicBrooklynCatalog)managementContext.getCatalog()).reset(CatalogDto.newEmptyInstance("<reset-by-ha-status-change>"));
         ((BasicBrooklynTypeRegistry)managementContext.getTypeRegistry()).clear();
         managementContext.getCatalogInitialization().clearBrooklynManagedBundles();
+
+        ((LocalManagementContext)managementContext).getGarbageCollector().gcTasks();
     }
     
     /** Starts hot standby or hot backup, in foreground
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index 69dd588..bf28d65 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -265,7 +265,7 @@ public abstract class RebindIteration {
     protected void doRun() throws Exception {
         if (readOnlyRebindCount.get() > 1) {
             // prevent leaking
-            rebindManager.stopEntityAndDoneTasksBeforeRebinding();
+            rebindManager.stopEntityAndDoneTasksBeforeRebinding("before next read-only rebind", Duration.seconds(10), Duration.seconds(20));
         }
 
         loadManifestFiles();
@@ -560,6 +560,7 @@ public abstract class RebindIteration {
                 try {
                     Feed feed = instantiator.newFeed(feedMemento);
                     rebindContext.registerFeed(feedMemento.getId(), feed);
+                    // started during associateAdjunctsWithEntities by RebindAdjuncts
                 } catch (Exception e) {
                     exceptionHandler.onCreateFailed(BrooklynObjectType.FEED, feedMemento.getId(), feedMemento.getType(), e);
                 }
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 2db7063..781a8c1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -47,6 +47,7 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.TreeNode;
 import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
+import org.apache.brooklyn.core.BrooklynVersion;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Entities;
@@ -398,29 +399,60 @@ public class RebindManagerImpl implements RebindManager {
             readOnlyTask = null;
             LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId());
         }
-        stopEntityAndDoneTasksBeforeRebinding();
+        // short waits when promoting
+        stopEntityAndDoneTasksBeforeRebinding("when stopping hot proxy read-only mode",
+                Duration.seconds(2),
+                Duration.seconds(5));
+        // note, items are subsequently unmanaged via:
+        // HighAvailabilityManagerImpl.clearManagedItems
     }
 
-    public void stopEntityAndDoneTasksBeforeRebinding() {
+    public void stopEntityAndDoneTasksBeforeRebinding(String reason, Duration delayBeforeCancelling, Duration delayBeforeAbandoning) {
+        // TODO inputs should be configurable
+
+        if (!managementContext.isRunning() || managementContext.getExecutionManager().isShutdown()) {
+            return;
+        }
+
         // wait for tasks
         Collection<Task<?>> entityTasks = ((BasicExecutionManager) managementContext.getExecutionManager()).allTasksLive()
                 .stream().filter(t -> BrooklynTaskTags.getContextEntity(t) != null).collect(Collectors.toList());
         List<Task<?>> openTasksIncludingCancelled;
-        CountdownTimer time = CountdownTimer.newInstanceStarted(Duration.seconds(15));
+        CountdownTimer timeBeforeCancelling = CountdownTimer.newInstanceStarted(delayBeforeCancelling);
+        CountdownTimer timeBeforeAbandoning = CountdownTimer.newInstanceStarted(delayBeforeAbandoning);
+        Duration backoff = Duration.millis(10);
         do {
             openTasksIncludingCancelled = entityTasks.stream().filter(t -> !t.isDone(true)).collect(Collectors.toList());
-            List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
             if (openTasksIncludingCancelled.isEmpty()) break;
-            if (time.isExpired() && !openTasksCancellable.isEmpty()) {
-                LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) before rebinding again: " + openTasksCancellable);
+
+            List<Task<?>> openTasksCancellable = openTasksIncludingCancelled.stream().filter(t -> !t.isDone()).collect(Collectors.toList());
+            List<Task<?>> openTasksScheduled = openTasksCancellable.stream().filter(t -> t instanceof ScheduledTask).collect(Collectors.toList());
+
+            if (!openTasksScheduled.isEmpty()) {
+                // stop scheduled tasks immediately
+                openTasksScheduled.forEach(t -> t.cancel(false));
+                continue;
+            }
+
+            if (timeBeforeCancelling!=null && timeBeforeCancelling.isExpired() && !openTasksCancellable.isEmpty()) {
+                LOG.warn("Aborting " + openTasksCancellable.size() + " incomplete task(s) "+reason+": " + openTasksCancellable);
                 openTasksCancellable.forEach(t -> t.cancel(true));
+                timeBeforeCancelling = null;
             }
-            if (time.getDurationElapsed().isShorterThan(Duration.millis(200))) {
-                LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again: " + openTasksIncludingCancelled);
+
+            if (timeBeforeAbandoning.isExpired()) break;
+
+            if (timeBeforeAbandoning.getDurationElapsed().isShorterThan(backoff)) {
+                LOG.info("Waiting on " + openTasksIncludingCancelled.size() + " task(s) "+reason+": " + openTasksIncludingCancelled);
             }
-            LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) before rebinding again, details: " +
-                    openTasksIncludingCancelled.stream().map(t -> ""+t+"("+BrooklynTaskTags.getContextEntity(t)+")").collect(Collectors.toList()));
-            Time.sleep(Duration.millis(200));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Waiting on " + openTasksIncludingCancelled.size() + " task(s) " + reason + ", details: " +
+                        openTasksIncludingCancelled.stream().map(t -> "" + t + "(" + BrooklynTaskTags.getContextEntity(t) + ")").collect(Collectors.toList()));
+            }
+
+            Time.sleep(Duration.min(timeBeforeAbandoning.getDurationRemaining(), backoff));
+            backoff = Duration.min(backoff.multiply(2), Duration.millis(200));
+
         } while (true);
 
         entityTasks.forEach(((BasicExecutionManager) managementContext.getExecutionManager())::deleteTask);
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
index ea3ee5e..c24edcd 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/ManagementPlaneIdTest.java
@@ -85,7 +85,7 @@ public class ManagementPlaneIdTest {
         checkPlaneIdPersisted(mgmt);
     }
 
-    @Test
+    @Test(groups="Integration")  // because slow
     public void testPlaneIdRolledBack() throws Exception {
         final LocalManagementContext mgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.AUTO);
 
@@ -101,7 +101,7 @@ public class ManagementPlaneIdTest {
         });
     }
 
-    @Test
+    @Test(groups="Integration")  // because slow
     public void testColdRebindInitialisesPlaneId() throws Exception {
         final LocalManagementContext origMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
         checkPlaneIdPersisted(origMgmt);
@@ -136,7 +136,7 @@ public class ManagementPlaneIdTest {
         });
     }
 
-    @Test
+    @Test(groups="Integration")  // because slow
     public void testHaFailoverKeepsPlaneId() throws Exception {
         final LocalManagementContext origMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.MASTER);
         final LocalManagementContext rebindMgmt = createManagementContext(PersistMode.AUTO, HighAvailabilityMode.STANDBY);