You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/29 21:32:19 UTC

[24/31] git commit: unmanage and stop rebind on failure, before publishing the new state, and cancel tasks on demotion

unmanage and stop rebind on failure, before publishing the new state, and cancel tasks on demotion


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/13299c62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/13299c62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/13299c62

Branch: refs/heads/master
Commit: 13299c62f8c55fec90630ff5618dd4857dc8f2f9
Parents: 08a91f6
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Jul 23 16:56:24 2014 -0700
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Jul 29 14:40:46 2014 -0400

----------------------------------------------------------------------
 .../brooklyn/entity/rebind/RebindManager.java   |  1 +
 .../entity/basic/AbstractApplication.java       |  2 +-
 .../ha/HighAvailabilityManagerImpl.java         | 59 ++++++++++++++++----
 3 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/13299c62/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/rebind/RebindManager.java b/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
index 854aa78..978fa3f 100644
--- a/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
+++ b/api/src/main/java/brooklyn/entity/rebind/RebindManager.java
@@ -74,6 +74,7 @@ public interface RebindManager {
      */
     public void start();
 
+    /** Stops persisting. Waits for any current persistence to complete. */
     public void stop();
 
     /** @deprecated since 0.7.0; use {@link #waitForPendingComplete(Duration)} */

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/13299c62/core/src/main/java/brooklyn/entity/basic/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/AbstractApplication.java b/core/src/main/java/brooklyn/entity/basic/AbstractApplication.java
index 3ded835..b7edf59 100644
--- a/core/src/main/java/brooklyn/entity/basic/AbstractApplication.java
+++ b/core/src/main/java/brooklyn/entity/basic/AbstractApplication.java
@@ -224,7 +224,7 @@ public abstract class AbstractApplication extends AbstractEntity implements Star
             throw e;
         } catch (RuntimeException e) {
             if (getManagementContext().isRunning()) {
-                log.warn("Problem storing application event "+state+" for "+this, e);
+                log.warn("Problem recording application event '"+state+"' for "+this, e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/13299c62/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
index f42abad..5a13d31 100644
--- a/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
+++ b/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import brooklyn.BrooklynVersion;
 import brooklyn.entity.Application;
+import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.rebind.RebindManager;
 import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
@@ -43,6 +45,7 @@ import brooklyn.management.ha.BasicMasterChooser.AlphabeticMasterChooser;
 import brooklyn.management.ha.ManagementPlaneSyncRecordPersister.Delta;
 import brooklyn.management.internal.ManagementContextInternal;
 import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.task.BasicTask;
 import brooklyn.util.task.ScheduledTask;
@@ -321,14 +324,6 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento);
     }
     
-    /**
-     * Publishes (via {@link #persister}) the state of this management node with itself set to master.
-     */
-    protected synchronized void publishDemotionFromMasterOnFailure() {
-        checkState(getNodeState() == ManagementNodeState.FAILED, "node status must be failed on publish, but is %s", getNodeState());
-        publishDemotionFromMaster(true);
-    }
-    
     protected synchronized void publishDemotionFromMaster(boolean clearMaster) {
         checkState(getNodeState() != ManagementNodeState.MASTER, "node status must not be master when demoting", getNodeState());
         
@@ -403,6 +398,12 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
     protected void checkMaster(boolean initializing) {
         ManagementPlaneSyncRecord memento = loadManagementPlaneSyncRecord(false);
         
+        if (getNodeState() == ManagementNodeState.FAILED) {
+            // if this node has failed there is no point in checking who is master
+            // (if clearFailure() is somehow subsequently called on this node, checking will resume)
+            return;
+        }
+        
         String currMasterNodeId = memento.getMasterNodeId();
         ManagementNodeSyncRecord currMasterNodeRecord = memento.getManagementNodes().get(currMasterNodeId);
         ManagementNodeSyncRecord ownNodeRecord = memento.getManagementNodes().get(ownNodeId);
@@ -502,12 +503,17 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
             managementContext.getRebindManager().rebind(managementContext.getCatalog().getRootClassLoader());
         } catch (Exception e) {
             LOG.error("Management node enountered problem during rebind when promoting self to master; demoting to FAILED and rethrowing: "+e);
-            nodeState = ManagementNodeState.FAILED;
-            publishDemotionFromMasterOnFailure();
+            demoteToFailed();
             throw Exceptions.propagate(e);
         }
         managementContext.getRebindManager().start();
     }
+    
+    protected void demoteToFailed() {
+        nodeState = ManagementNodeState.FAILED;
+        onDemotion();
+        publishDemotionFromMaster(true);
+    }
 
     protected void demoteToStandby() {
         if (!running) {
@@ -516,10 +522,41 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         }
 
         nodeState = ManagementNodeState.STANDBY;
+        onDemotion();
+        publishDemotionFromMaster(false);
+    }
+    
+    protected void onDemotion() {
         managementContext.getRebindManager().stop();
         for (Application app: managementContext.getApplications())
             Entities.unmanage(app);
-        publishDemotionFromMaster(false);
+        // let's try forcibly interrupting tasks on managed entities
+        Collection<Exception> exceptions = MutableSet.of();
+        int tasks = 0;
+        LOG.debug("Cancelling tasks on demotion");
+        try {
+            for (Entity entity: managementContext.getEntityManager().getEntities()) {
+                for (Task<?> t: managementContext.getExecutionContext(entity).getTasks()) {
+                    if (!t.isDone()) {
+                        tasks++;
+                        try {
+                            LOG.debug("Cancelling "+t+" on "+entity);
+                            t.cancel(true);
+                        } catch (Exception e) {
+                            Exceptions.propagateIfFatal(e);
+                            exceptions.add(e);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            LOG.warn("Error inspecting tasks to cancel on demotion: "+e, e);
+        }
+        if (!exceptions.isEmpty())
+            LOG.warn("Error when cancelling tasks on demotion: "+Exceptions.create(exceptions));
+        if (tasks>0)
+            LOG.info("Cancelled "+tasks+" tasks on demotion");
     }
 
     /**