You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/03/14 17:44:20 UTC

svn commit: r1456523 - in /incubator/ambari/trunk: ./ ambari-server/src/main/java/org/apache/ambari/server/ ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ ambari-server/src/main/java/org/apache/ambari/server/controller/ ambari-serv...

Author: swagle
Date: Thu Mar 14 16:44:19 2013
New Revision: 1456523

URL: http://svn.apache.org/r1456523
Log:
AMBARI-1601. Server level action support. (Sumit Mohanty via swagle)

Added:
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java
Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/Role.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Thu Mar 14 16:44:19 2013
@@ -12,6 +12,8 @@ Trunk (unreleased changes):
 
  NEW FEATURES
 
+ AMBARI-1601. Server level action support. (Sumit Mohanty via swagle)
+
  AMBARI-1620. Add heatmaps for Host and Hbase section. (jaimin)
  
  AMBARI-1634. Integrate Frontend Security work to enable security on

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/Role.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/Role.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/Role.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/Role.java Thu Mar 14 16:44:19 2013
@@ -67,5 +67,6 @@ public enum Role {
   GMETAD_SERVICE_CHECK,
   MONITOR_WEBSERVER,
   DECOMMISSION_DATANODE,
-  HUE_SERVER
+  HUE_SERVER,
+  AMBARI_SERVER_ACTION
 }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java Thu Mar 14 16:44:19 2013
@@ -107,6 +107,11 @@ public class ActionDBInMemoryImpl implem
   public synchronized void updateHostRoleState(String hostname, long requestId,
       long stageId, String role, CommandReport report) {
     LOG.info("DEBUG stages to iterate: "+stageList.size());
+    if(null == report.getStatus()
+        || null == report.getStdOut()
+        || null == report.getStdErr()) {
+      throw new RuntimeException("Badly formed command report.");
+    }
     for (Stage s : stageList) {
       if (s.getRequestId() == requestId && s.getStageId() == stageId) {
         s.setHostRoleStatus(hostname, role,

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java Thu Mar 14 16:44:19 2013
@@ -23,6 +23,7 @@ import com.google.inject.name.Named;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
@@ -49,12 +50,13 @@ public class ActionManager {
   @Inject
   public ActionManager(@Named("schedulerSleeptime") long schedulerSleepTime,
       @Named("actionTimeout") long actionTimeout,
-      ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap) {
+      ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
+      ServerActionManager serverActionManager) {
     this.actionQueue = aq;
     this.db = db;
     this.hostsMap = hostsMap;
     scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
-        actionQueue, fsm, 2, hostsMap);
+        actionQueue, fsm, 2, hostsMap, serverActionManager);
     requestCounter = new AtomicLong(
         db.getLastPersistedRequestIdWhenInitialized());
   }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Thu Mar 14 16:44:19 2013
@@ -21,8 +21,11 @@ import org.apache.ambari.server.AmbariEx
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.serveraction.ServerAction;
+import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.*;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
@@ -50,6 +53,7 @@ class ActionScheduler implements Runnabl
   private boolean taskTimeoutAdjustment = true;
   private final HostsMap hostsMap;
   private final Object wakeupSyncObject = new Object();
+  private final ServerActionManager serverActionManager;
 
   /**
    * true if scheduler should run ASAP.
@@ -60,7 +64,7 @@ class ActionScheduler implements Runnabl
 
   public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
       ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
-      int maxAttempts, HostsMap hostsMap) {
+      int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager) {
     this.sleepTime = sleepTimeMilliSec;
     this.hostsMap = hostsMap;
     this.actionTimeout = actionTimeoutMilliSec;
@@ -68,6 +72,7 @@ class ActionScheduler implements Runnabl
     this.actionQueue = actionQueue;
     this.fsmObject = fsmObject;
     this.maxAttempts = (short) maxAttempts;
+    this.serverActionManager = serverActionManager;
   }
 
   public void start() {
@@ -135,7 +140,7 @@ class ActionScheduler implements Runnabl
       for (String role : roleStats.keySet()) {
         RoleStats stats = roleStats.get(role);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Stats for role:"+role+", stats="+stats);
+          LOG.debug("Stats for role:" + role + ", stats=" + stats);
         }
         if (stats.isRoleFailed()) {
           failed = true;
@@ -151,18 +156,32 @@ class ActionScheduler implements Runnabl
 
       //Schedule what we have so far
       for (ExecutionCommand cmd : commandsToSchedule) {
-        try {
-          scheduleHostRole(s, cmd);
-        } catch (InvalidStateTransitionException e) {
-          LOG.warn("Could not schedule host role "+cmd.toString(), e);
-          db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
-              cmd.getRole());
+        if (cmd.getRole() == Role.AMBARI_SERVER_ACTION) {
+          try {
+            long now = System.currentTimeMillis();
+            s.setStartTime(cmd.getHostname(), cmd.getRole().toString(), now);
+            s.setLastAttemptTime(cmd.getHostname(), cmd.getRole().toString(), now);
+            String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
+            this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
+            reportServerActionSuccess(s, cmd);
+          } catch (AmbariException e) {
+            LOG.warn("Could not execute server action " + cmd.toString(), e);
+            reportServerActionFailure(s, cmd, e.getMessage());
+          }
+        } else {
+          try {
+            scheduleHostRole(s, cmd);
+          } catch (InvalidStateTransitionException e) {
+            LOG.warn("Could not schedule host role " + cmd.toString(), e);
+            db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
+                cmd.getRole());
+          }
         }
       }
 
       //Check if ready to go to next stage
       boolean goToNextStage = true;
-      for (String role: roleStats.keySet()) {
+      for (String role : roleStats.keySet()) {
         RoleStats stats = roleStats.get(role);
         if (!stats.isSuccessFactorMet()) {
           goToNextStage = false;
@@ -175,6 +194,26 @@ class ActionScheduler implements Runnabl
     }
   }
 
+  private void reportServerActionSuccess(Stage stage, ExecutionCommand cmd) {
+    CommandReport report = new CommandReport();
+    report.setStatus(HostRoleStatus.COMPLETED.toString());
+    report.setExitCode(0);
+    report.setStdOut("Server action succeeded");
+    report.setStdErr("");
+    db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
+                           cmd.getRole().toString(), report);
+  }
+
+  private void reportServerActionFailure(Stage stage, ExecutionCommand cmd, String message) {
+    CommandReport report = new CommandReport();
+    report.setStatus(HostRoleStatus.FAILED.toString());
+    report.setExitCode(1);
+    report.setStdOut("Server action failed");
+    report.setStdErr(message);
+    db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
+                           cmd.getRole().toString(), report);
+  }
+
   /**
    * @param commandsToSchedule
    * @return Stats for the roles in the stage. It is used to determine whether stage

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java Thu Mar 14 16:44:19 2013
@@ -34,7 +34,9 @@ import org.apache.ambari.server.orm.enti
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -207,6 +209,49 @@ public class Stage {
     execCmdList.add(wrapper);
   }
 
+  public synchronized void addServerActionCommand(
+      String actionName, Role role,  RoleCommand command, String clusterName,
+      ServiceComponentHostUpgradeEvent event, String hostName) {
+    HostRoleCommand hrc = new HostRoleCommand(hostName, role, event, command);
+    ExecutionCommand cmd = new ExecutionCommand();
+    ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd);
+    hrc.setExecutionCommandWrapper(wrapper);
+    cmd.setHostname(hostName);
+    cmd.setClusterName(clusterName);
+    cmd.setServiceName("");
+    cmd.setCommandId(this.getActionId());
+    cmd.setRole(role);
+    cmd.setRoleCommand(command);
+
+    Map<String, String> roleParams = new HashMap<String, String>();
+    roleParams.put(ServerAction.ACTION_NAME, actionName);
+    cmd.setRoleParams(roleParams);
+    Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName);
+    if (hrcMap == null) {
+      hrcMap = new TreeMap<String, HostRoleCommand>();
+      this.hostRoleCommands.put(hostName, hrcMap);
+    }
+    if (hrcMap.get(role.toString()) != null) {
+      throw new RuntimeException(
+          "Setting the server action the second time for same stage: stage="
+              + this.getActionId() + ", action=" + actionName);
+    }
+    hrcMap.put(role.toString(), hrc);
+    List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName);
+    if (execCmdList == null) {
+      execCmdList = new ArrayList<ExecutionCommandWrapper>();
+      this.commandsToSend.put(hostName, execCmdList);
+    }
+
+    if (execCmdList.contains(wrapper)) {
+      //todo: proper exception
+      throw new RuntimeException(
+          "Setting the execution command second time for same stage: stage="
+              + this.getActionId() + ", action=" + actionName);
+    }
+    execCmdList.add(wrapper);
+  }
+
   /**
    *
    * @return list of hosts

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java Thu Mar 14 16:44:19 2013
@@ -55,6 +55,7 @@ import org.apache.ambari.server.metadata
 import org.apache.ambari.server.metadata.RoleCommandOrder;
 import org.apache.ambari.server.security.authorization.User;
 import org.apache.ambari.server.security.authorization.Users;
+import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.stageplanner.RoleGraph;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
@@ -1508,6 +1509,8 @@ public class AmbariManagementControllerI
        */
       cluster.refresh();
       if (requestedVersion.equals(cluster.getCurrentStackVersion())) {
+        LOG.info("Update cluster request version matches the current"
+                  + ", version=" + request);
         return null;
       }
 
@@ -1536,12 +1539,47 @@ public class AmbariManagementControllerI
       requestParameters.put(Configuration.UPGRADE_TO_STACK, gson.toJson(requestedVersion));
       requestParameters.put(Configuration.UPGRADE_FROM_STACK, gson.toJson(currentVersion));
 
-      return doStageCreation(cluster, changedServices, changedComps, changedScHosts, requestParameters);
+      List<Stage> stages = doStageCreation(cluster, changedServices,
+          changedComps, changedScHosts, requestParameters);
+
+      if (stages == null || stages.isEmpty()) {
+        return null;
+      }
+
+      addFinalizeUpgradeAction(cluster, stages);
+      persistStages(stages);
+      updateServiceStates(changedServices, changedComps, changedScHosts);
+      return getRequestStatusResponse(stages.get(0).getRequestId());
     }
 
     return null;
   }
 
+  private void addFinalizeUpgradeAction(Cluster cluster, List<Stage> stages) throws AmbariException {
+    // Add server side action as the last Stage
+    Stage lastStage = stages.get(stages.size() - 1);
+    Stage newStage = createNewStage(cluster, lastStage.getRequestId());
+    newStage.setStageId(lastStage.getStageId() + 1);
+
+    // Add an arbitrary host name as server actions are executed on the server
+    String hostName = lastStage.getOrderedHostRoleCommands().get(0).getHostName();
+
+    Map<String, String> payload = new HashMap<String, String>();
+    payload.put(ServerAction.PayloadName.CLUSTER_NAME, cluster.getClusterName());
+    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, cluster.getDesiredStackVersion().getStackId());
+
+    ServiceComponentHostUpgradeEvent event = new ServiceComponentHostUpgradeEvent(
+        Role.AMBARI_SERVER_ACTION.toString(), hostName,
+        System.currentTimeMillis(), cluster.getDesiredStackVersion().getStackId());
+    newStage.addServerActionCommand(ServerAction.Command.FINALIZE_UPGRADE, Role.AMBARI_SERVER_ACTION,
+        RoleCommand.EXECUTE, cluster.getClusterName(), event, hostName);
+    ExecutionCommand execCmd = newStage.getExecutionCommandWrapper(hostName,
+        Role.AMBARI_SERVER_ACTION.toString()).getExecutionCommand();
+
+    execCmd.setCommandParams(payload);
+    stages.add(newStage);
+  }
+
   private boolean isUpgradeAllowed(StackInfo requestedStackInfo, StackId currentStackId) {
     String minUpgradeVersion = requestedStackInfo.getMinUpgradeVersion();
     if (minUpgradeVersion != null && !minUpgradeVersion.isEmpty()) {
@@ -1679,7 +1717,7 @@ public class AmbariManagementControllerI
     return null;
   }
 
-  private RequestStatusResponse doStageCreation(Cluster cluster,
+  private List<Stage> doStageCreation(Cluster cluster,
       Map<State, List<Service>> changedServices,
       Map<State, List<ServiceComponent>> changedComps,
       Map<String, Map<State, List<ServiceComponentHost>>> changedScHosts,
@@ -2006,17 +2044,28 @@ public class AmbariManagementControllerI
 
       RoleGraph rg = new RoleGraph(rco);
       rg.build(stage);
-      stages = rg.getStages();
+      return rg.getStages();
+    }
 
+    return null;
+  }
+
+  private void persistStages(List<Stage> stages) {
+    if(stages != null && stages.size() > 0) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Triggering Action Manager"
-            + ", clusterName=" + cluster.getClusterName()
-            + ", requestId=" + requestId.longValue()
+            + ", clusterName=" + stages.get(0).getClusterName()
+            + ", requestId=" + stages.get(0).getRequestId()
             + ", stagesCount=" + stages.size());
       }
       actionManager.sendActions(stages);
     }
+  }
 
+  private void updateServiceStates(
+      Map<State, List<Service>> changedServices,
+      Map<State, List<ServiceComponent>> changedComps,
+      Map<String, Map<State, List<ServiceComponentHost>>> changedScHosts) {
     if (changedServices != null) {
       for (Entry<State, List<Service>> entry : changedServices.entrySet()) {
         State newState = entry.getKey();
@@ -2050,12 +2099,6 @@ public class AmbariManagementControllerI
         }
       }
     }
-
-    if (stages == null || stages.isEmpty()
-        || requestId == null) {
-      return null;
-    }
-    return getRequestStatusResponse(requestId.longValue());
   }
 
   private boolean isValidStateTransition(State oldState,
@@ -2429,8 +2472,15 @@ public class AmbariManagementControllerI
 
     Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
 
-    return doStageCreation(cluster, changedServices,
+    List<Stage> stages = doStageCreation(cluster, changedServices,
         changedComps, changedScHosts, null);
+    persistStages(stages);
+    updateServiceStates(changedServices, changedComps, changedScHosts);
+    if (stages == null || stages.isEmpty()) {
+      return null;
+    }
+
+    return getRequestStatusResponse(stages.get(0).getRequestId());
   }
 
   @Override
@@ -2688,10 +2738,18 @@ public class AmbariManagementControllerI
 
     Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
 
-    return doStageCreation(cluster, null,
+    List<Stage> stages = doStageCreation(cluster, null,
         changedComps, changedScHosts, null);
+    persistStages(stages);
+    updateServiceStates(null, changedComps, changedScHosts);
+    if (stages == null || stages.isEmpty()) {
+      return null;
+    }
+
+    return getRequestStatusResponse(stages.get(0).getRequestId());
   }
 
+
   @Override
   public synchronized void updateHosts(Set<HostRequest> requests)
       throws AmbariException {
@@ -2969,7 +3027,6 @@ public class AmbariManagementControllerI
           + " state changes for a set of service components at the same time");
     }
 
-
     // TODO additional validation?
     for (ServiceComponentHostRequest request : requests) {
       Cluster cluster = clusters.getCluster(request.getClusterName());
@@ -2996,8 +3053,15 @@ public class AmbariManagementControllerI
 
     Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
 
-    return doStageCreation(cluster, null,
+    List<Stage> stages = doStageCreation(cluster, null,
         null, changedScHosts, null);
+    persistStages(stages);
+    updateServiceStates(null, null, changedScHosts);
+    if (stages == null || stages.isEmpty()) {
+      return null;
+    }
+
+    return getRequestStatusResponse(stages.get(0).getRequestId());
   }
 
   @Override

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java Thu Mar 14 16:44:19 2013
@@ -27,6 +27,8 @@ import org.apache.ambari.server.actionma
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.orm.PersistenceType;
+import org.apache.ambari.server.serveraction.ServerActionManager;
+import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.*;
 import org.apache.ambari.server.state.cluster.ClusterFactory;
 import org.apache.ambari.server.state.cluster.ClusterImpl;
@@ -83,7 +85,6 @@ public class ControllerModule extends Ab
 
     install(jpaPersistModule);
 
-
     bind(Gson.class).in(Scopes.SINGLETON);
     bind(Clusters.class).to(ClustersImpl.class);
     bind(ActionDBAccessor.class).to(ActionDBAccessorImpl.class);
@@ -92,7 +93,7 @@ public class ControllerModule extends Ab
     bind(AmbariManagementController.class)
         .to(AmbariManagementControllerImpl.class);
     bind(HBaseMasterPortScanner.class).in(Singleton.class);;
-
+    bind(ServerActionManager.class).to(ServerActionManagerImpl.class);
   }
 
   private void installFactories() {

Added: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java?rev=1456523&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java (added)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java Thu Mar 14 16:44:19 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+public class ServerAction {
+
+  public static final String ACTION_NAME = "ACTION_NAME";
+
+  /**
+   * The commands supported by the server. A command is a named alias to the
+   * action implementation at the server
+   */
+  public static class Command {
+    /**
+     * Finalize the upgrade request
+     */
+    public static final String FINALIZE_UPGRADE = "FINALIZE_UPGRADE";
+  }
+
+  public static class PayloadName {
+    public final static String CURRENT_STACK_VERSION = "current_stack_version";
+    public final static String CLUSTER_NAME = "cluster_name";
+  }
+}

Added: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java?rev=1456523&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java (added)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java Thu Mar 14 16:44:19 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.AmbariException;
+
+import java.util.Map;
+
+/**
+ * Server action manager interface.
+ */
+public interface ServerActionManager {
+
+  public void executeAction(String actionName, Map<String, String> payload)
+      throws AmbariException;
+}

Added: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java?rev=1456523&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java (added)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java Thu Mar 14 16:44:19 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import com.google.inject.Singleton;
+import com.google.inject.Inject;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.StackId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Server action manager implementation.
+ */
+@Singleton
+public class ServerActionManagerImpl implements ServerActionManager {
+
+  private final static Logger LOG =
+      LoggerFactory.getLogger(ServerActionManagerImpl.class);
+
+  private Clusters clusters;
+
+  @Inject
+  public ServerActionManagerImpl(Clusters clusters) {
+    this.clusters = clusters;
+  }
+
+  @Override
+  public void executeAction(String actionName, Map<String, String> payload)
+      throws AmbariException {
+    LOG.info("Executing server action : "
+        + actionName + " with payload "
+        + payload);
+
+    if (actionName.equals(ServerAction.Command.FINALIZE_UPGRADE)) {
+      updateClusterStackVersion(payload);
+    } else {
+      throw new AmbariException("Unsupported action " + actionName);
+    }
+  }
+
+  private void updateClusterStackVersion(Map<String, String> payload) throws AmbariException {
+    if (payload == null
+        || !payload.containsKey(ServerAction.PayloadName.CLUSTER_NAME)
+        || !payload.containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION)) {
+      throw new AmbariException("Invalid payload.");
+    }
+
+    StackId currentStackId = new StackId(payload.get(ServerAction.PayloadName.CURRENT_STACK_VERSION));
+    final Cluster cluster = clusters.getCluster(payload.get(ServerAction.PayloadName.CLUSTER_NAME));
+    cluster.setCurrentStackVersion(currentStackId);
+    cluster.refresh();
+  }
+}

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java Thu Mar 14 16:44:19 2013
@@ -494,6 +494,7 @@ public class ClusterImpl implements Clus
         } else {
           clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
           clusterStateDAO.merge(clusterStateEntity);
+          clusterEntity = clusterDAO.merge(clusterEntity);
         }
     } catch (RollbackException e) {
       LOG.warn("Unable to set version " + stackVersion + " for cluster " + getClusterName());

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java Thu Mar 14 16:44:19 2013
@@ -79,7 +79,7 @@ public class TestActionDBAccessorImpl {
     db = injector.getInstance(ActionDBAccessorImpl.class);
     
     am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null));
+        new HostsMap((String) null), null);
   }
 
   @After

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java Thu Mar 14 16:44:19 2013
@@ -76,7 +76,7 @@ public class TestActionManager {
   public void testActionResponse() {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null));
+        clusters, db, new HostsMap((String) null), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -112,7 +112,7 @@ public class TestActionManager {
   public void testLargeLogs() {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null));
+        clusters, db, new HostsMap((String) null), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java Thu Mar 14 16:44:19 2013
@@ -25,23 +25,28 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
 import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.AgentCommand;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.serveraction.ServerAction;
+import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
-import org.apache.ambari.server.utils.TestStageUtils;
 import org.junit.Test;
 
 public class TestActionScheduler {
@@ -74,7 +79,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
-        10000, new HostsMap((String) null));
+        10000, new HostsMap((String) null), null);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
     scheduler.start();
@@ -139,7 +144,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, 
-        new HostsMap((String) null));
+        new HostsMap((String) null), null);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
     scheduler.start();
@@ -151,7 +156,82 @@ public class TestActionScheduler {
     assertEquals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"),
         HostRoleStatus.TIMEDOUT);
   }
-  
+
+  /**
+   * Server action test: a full payload must end COMPLETED, one without the cluster name FAILED.
+   */
+  @Test(timeout = 60000)
+  public void testServerAction() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    ActionDBAccessor db = new ActionDBInMemoryImpl();
+    String hostname = "ahost.ambari.apache.org";
+    List<Stage> stages = new ArrayList<Stage>();
+    Map<String, String> payload = new HashMap<String, String>();
+    payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1");
+    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
+    Stage s = getStageWithServerAction(1, 977, hostname, payload);
+    stages.add(s);
+    db.persistActions(stages);
+
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm));
+    scheduler.start();
+    // Poll for completion; the @Test timeout ends the wait if COMPLETED is never reached.
+    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
+        .equals(HostRoleStatus.COMPLETED)) {
+      Thread.sleep(100);
+    }
+    scheduler.stop();
+    assertEquals(HostRoleStatus.COMPLETED,
+        stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"));
+
+    stages = new ArrayList<Stage>();
+    payload.remove(ServerAction.PayloadName.CLUSTER_NAME);
+    s = getStageWithServerAction(1, 23, hostname, payload);
+    stages.add(s);
+    db.persistActions(stages);
+
+    scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), new ServerActionManagerImpl(fsm));
+    scheduler.start();
+    // Poll for failure; the @Test timeout ends the wait if FAILED is never reached.
+    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
+        .equals(HostRoleStatus.FAILED)) {
+      Thread.sleep(100);
+    }
+    scheduler.stop();
+    assertEquals(HostRoleStatus.FAILED,
+        stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"));
+  }
+
+  /** Builds a stage holding one FINALIZE_UPGRADE server action whose command params are payload. */
+  private static Stage getStageWithServerAction(long requestId, long stageId, String hostName,
+                                                Map<String, String> payload) {
+    Stage serverActionStage = new Stage(requestId, "/tmp", "cluster1");
+    serverActionStage.setStageId(stageId);
+    ServiceComponentHostUpgradeEvent upgradeEvent = new ServiceComponentHostUpgradeEvent(
+        "AMBARI_SERVER_ACTION", hostName, System.currentTimeMillis(), "HDP-0.2");
+    serverActionStage.addServerActionCommand(ServerAction.Command.FINALIZE_UPGRADE,
+        Role.AMBARI_SERVER_ACTION, RoleCommand.EXECUTE, "cluster1", upgradeEvent, hostName);
+
+    // Attach the payload to the execution command looked up for this host and role.
+    serverActionStage.getExecutionCommandWrapper(hostName, Role.AMBARI_SERVER_ACTION.toString())
+        .getExecutionCommand().setCommandParams(payload);
+    return serverActionStage;
+  }
+
+
   @Test
   public void testSuccessFactors() {
     Stage s = StageUtils.getATestStage(1, 1);

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java Thu Mar 14 16:44:19 2013
@@ -135,8 +135,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testHeartbeat() throws Exception {
-    ActionManager am = new ActionManager(0, 0, null, null,
-        new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Clusters fsm = clusters;
     fsm.addHost(DummyHostname1);
     Host hostObject = clusters.getHost(DummyHostname1);
@@ -174,8 +173,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testStatusHeartbeat() throws Exception {
-    ActionManager am = new ActionManager(0, 0, null, null,
-            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
 
     Cluster cluster = getDummyCluster();
 
@@ -229,8 +227,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testLiveStatusUpdateAfterStopFailed() throws Exception {
-    ActionManager am = new ActionManager(0, 0, null, null,
-            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -302,7 +299,7 @@ public class TestHeartbeatHandler {
     clusters.addCluster(DummyCluster);
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null));
+        new HostsMap((String) null), null);
     populateActionDB(db, DummyHostname1);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -347,8 +344,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistration() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-        new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
         injector);
@@ -373,8 +369,7 @@ public class TestHeartbeatHandler {
   
   @Test
   public void testRegistrationPublicHostname() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-        new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
         injector);
@@ -405,8 +400,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testInvalidOSRegistration() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-        new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
         injector);
@@ -433,8 +427,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegisterNewNode()
       throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-        new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Clusters fsm = clusters;
     fsm.addHost(DummyHostname1);
     Host hostObject = clusters.getHost(DummyHostname1);
@@ -522,8 +515,7 @@ public class TestHeartbeatHandler {
     HeartbeatMonitor hm = mock(HeartbeatMonitor.class);
     when(hm.generateStatusCommands(anyString())).thenReturn(dummyCmds);
 
-    ActionManager am = new ActionManager(0, 0, null, null,
-            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Clusters fsm = clusters;
     ActionQueue actionQueue = new ActionQueue();
     HeartBeatHandler handler = new HeartBeatHandler(fsm, actionQueue, am,
@@ -548,8 +540,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testTaskInProgressHandling() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -602,8 +593,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-        new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -692,8 +682,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testStatusHeartbeatWithVersion() throws Exception {
-    ActionManager am = new ActionManager(0, 0, null, null,
-        new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -762,8 +751,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -840,8 +828,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -920,8 +907,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = new ActionManager(0, 0, null, null,
-            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    ActionManager am = getMockActionManager();
     Cluster cluster = getDummyCluster();
 
     @SuppressWarnings("serial")
@@ -1001,6 +987,11 @@ public class TestHeartbeatHandler {
             State.INSTALL_FAILED, serviceComponentHost2.getState());
   }
 
+  private ActionManager getMockActionManager() { // NOTE: builds a real ActionManager, not a mock
+    return new ActionManager(0, 0, null, null, // zero numeric settings; null for the queue/clusters args
+              new ActionDBInMemoryImpl(), new HostsMap((String) null), null); // in-memory action DB
+  }
+
 
   private ComponentStatus createComponentStatus(String clusterName, String serviceName, String message,
                                                 State state, String componentName, String stackVersion) {

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java Thu Mar 14 16:44:19 2013
@@ -72,7 +72,6 @@ public class AmbariMetaInfoTest {
   private AmbariMetaInfo metaInfo = null;
   private final static Logger LOG =
       LoggerFactory.getLogger(AmbariMetaInfoTest.class);
-
   
 
   @Rule
@@ -267,8 +266,7 @@ public class AmbariMetaInfoTest {
     Assert.assertFalse(ambariMetaInfo.isSupportedStack(".svn", ""));
     Assert.assertFalse(ambariMetaInfo.isSupportedStack(".svn", ""));
   }
-  
-  
+
   @Test
   public void testGetComponent() throws Exception {
     ComponentInfo component = metaInfo.getComponent(STACK_NAME_HDP,
@@ -331,8 +329,7 @@ public class AmbariMetaInfoTest {
       Assert.assertTrue(e instanceof StackAccessException);
     }
   }
-  
-  
+
   @Test
   public void testGetStackInfo() throws Exception {
     StackInfo stackInfo = metaInfo.getStackInfo(STACK_NAME_HDP, STACK_VERSION_HDP);
@@ -345,8 +342,7 @@ public class AmbariMetaInfoTest {
       Assert.assertTrue(e instanceof StackAccessException);
     }
   }
-  
-  
+
   @Test
   public void testGetProperties() throws Exception {
     Set<PropertyInfo> properties = metaInfo.getProperties(STACK_NAME_HDP, STACK_VERSION_HDP, SERVICE_NAME_HDFS);
@@ -383,7 +379,5 @@ public class AmbariMetaInfoTest {
     } catch (StackAccessException e) {
       Assert.assertTrue(e instanceof StackAccessException);
     }
-    
   }
-  
 }

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java?rev=1456523&r1=1456522&r2=1456523&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java Thu Mar 14 16:44:19 2013
@@ -54,6 +54,9 @@ import org.apache.ambari.server.orm.InMe
 import org.apache.ambari.server.orm.dao.RoleDAO;
 import org.apache.ambari.server.orm.entities.RoleEntity;
 import org.apache.ambari.server.security.authorization.Users;
+import org.apache.ambari.server.serveraction.ServerAction;
+import org.apache.ambari.server.serveraction.ServerActionManager;
+import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
@@ -4165,8 +4168,25 @@ public class AmbariManagementControllerT
     } catch (StackAccessException e) {
       Assert.assertTrue(e instanceof StackAccessException);
     }
+  }
 
+  @Test
+  public void testServerActionForUpgradeFinalization() throws AmbariException {
+    String clusterName = "foo1";
+    StackId currentStackId = new StackId("HDP-0.1");
+    StackId newStackId = new StackId("HDP-0.2");
+    // Verifies FINALIZE_UPGRADE moves the cluster's current stack version to newStackId.
+    createCluster(clusterName);
+    Cluster c = clusters.getCluster(clusterName);
+    c.setDesiredStackVersion(currentStackId);
+    Assert.assertEquals(currentStackId, c.getCurrentStackVersion());
 
+    ServerActionManager serverActionManager = new ServerActionManagerImpl(clusters);
+    Map<String, String> payload = new HashMap<String, String>();
+    payload.put(ServerAction.PayloadName.CLUSTER_NAME, clusterName);
+    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, newStackId.getStackId());
+    serverActionManager.executeAction(ServerAction.Command.FINALIZE_UPGRADE, payload);
+    Assert.assertEquals(newStackId, c.getCurrentStackVersion());
   }
 
   @Test
@@ -4359,7 +4379,8 @@ public class AmbariManagementControllerT
     ExpectedUpgradeTasks expectedTasks = new ExpectedUpgradeTasks(hosts);
     expectedTasks.expectTask(Role.PIG, host1);
     expectedTasks.expectTask(Role.PIG, host2);
-    validateGeneratedStages(stages, 1, expectedTasks);
+    expectedTasks.expectTask(Role.AMBARI_SERVER_ACTION);
+    validateGeneratedStages(stages, 2, expectedTasks);
 
     resetCluster(c, currentStackId);
 
@@ -4382,12 +4403,12 @@ public class AmbariManagementControllerT
     expectedTasks.expectTask(Role.JOBTRACKER, host1);
     expectedTasks.expectTask(Role.TASKTRACKER, host2);
     expectedTasks.expectTask(Role.MAPREDUCE_CLIENT, host2);
-    validateGeneratedStages(stages, 4, expectedTasks);
+    validateGeneratedStages(stages, 5, expectedTasks);
 
     // Upgrade again
     trackAction = controller.updateCluster(r);
     stages = actionDB.getAllStages(trackAction.getRequestId());
-    validateGeneratedStages(stages, 4, expectedTasks);
+    validateGeneratedStages(stages, 5, expectedTasks);
 
     // some host components are upgraded
     c.getService(pigServiceName).getServiceComponent(pigComponentName).getServiceComponentHost(host1)
@@ -4401,7 +4422,7 @@ public class AmbariManagementControllerT
 
     trackAction = controller.updateCluster(r);
     stages = actionDB.getAllStages(trackAction.getRequestId());
-    validateGeneratedStages(stages, 4, expectedTasks);
+    validateGeneratedStages(stages, 5, expectedTasks);
 
     c.getService(mrServiceName).getServiceComponent(mrJobTrackerComp).getServiceComponentHost(host1)
         .setState(State.UPGRADE_FAILED);
@@ -4409,7 +4430,7 @@ public class AmbariManagementControllerT
         .setState(State.UPGRADE_FAILED);
     trackAction = controller.updateCluster(r);
     stages = actionDB.getAllStages(trackAction.getRequestId());
-    validateGeneratedStages(stages, 4, expectedTasks);
+    validateGeneratedStages(stages, 5, expectedTasks);
 
     // Add HDFS and upgrade
     createService(clusterName, hdfsService, State.INIT);
@@ -4439,7 +4460,8 @@ public class AmbariManagementControllerT
     expectedTasks.expectTask(Role.DATANODE, host2);
     expectedTasks.expectTask(Role.NAMENODE, host1);
     expectedTasks.expectTask(Role.HDFS_CLIENT, host2);
-    validateGeneratedStages(stages, 7, expectedTasks);
+    expectedTasks.expectTask(Role.AMBARI_SERVER_ACTION);
+    validateGeneratedStages(stages, 8, expectedTasks);
   }
 
   private void resetServiceState(String hdfsService, StackId currentStackId, Cluster c) throws AmbariException {
@@ -4460,12 +4482,23 @@ public class AmbariManagementControllerT
     for (Stage stage : stages) {
       int currRoleOrder = -1;
       for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
-        Assert.assertTrue(command.toString(), expectedTasks.isTaskExpected(command.getRole(), command.getHostName()));
-        currRoleOrder = expectedTasks.getRoleOrder(command.getRole());
-        ExecutionCommand execCommand = command.getExecutionCommandWrapper().getExecutionCommand();
-        Assert.assertTrue(execCommand.getCommandParams().containsKey("source_stack_version"));
-        Assert.assertTrue(execCommand.getCommandParams().containsKey("target_stack_version"));
-        Assert.assertEquals(RoleCommand.UPGRADE, execCommand.getRoleCommand());
+        if(command.getRole() == Role.AMBARI_SERVER_ACTION) {
+          Assert.assertTrue(command.toString(), expectedTasks.isTaskExpected(command.getRole()));
+          currRoleOrder = expectedTasks.getRoleOrder(command.getRole());
+          ExecutionCommand execCommand = command.getExecutionCommandWrapper().getExecutionCommand();
+          Assert.assertTrue(
+              execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION));
+          Assert.assertTrue(
+              execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CLUSTER_NAME));
+          Assert.assertEquals(RoleCommand.EXECUTE, execCommand.getRoleCommand());
+        } else {
+          Assert.assertTrue(command.toString(), expectedTasks.isTaskExpected(command.getRole(), command.getHostName()));
+          currRoleOrder = expectedTasks.getRoleOrder(command.getRole());
+          ExecutionCommand execCommand = command.getExecutionCommandWrapper().getExecutionCommand();
+          Assert.assertTrue(execCommand.getCommandParams().containsKey("source_stack_version"));
+          Assert.assertTrue(execCommand.getCommandParams().containsKey("target_stack_version"));
+          Assert.assertEquals(RoleCommand.UPGRADE, execCommand.getRoleCommand());
+        }
       }
 
       List<HostRoleCommand> commands = stage.getOrderedHostRoleCommands();
@@ -4495,7 +4528,8 @@ public class AmbariManagementControllerT
   }
 
   class ExpectedUpgradeTasks {
-    private static final int ROLE_COUNT = 24;
+    private static final int ROLE_COUNT = 25;
+    private static final String DEFAULT_HOST = "default_host";
     private ArrayList<Map<String, Boolean>> expectedList;
     private Map<Role, Integer> roleToIndex;
 
@@ -4511,10 +4545,20 @@ public class AmbariManagementControllerT
       expectedList.get(roleToIndex.get(role)).put(host, true);
     }
 
+    public void expectTask(Role role) { // host-less variant: records the task under DEFAULT_HOST
+      Assert.assertEquals(Role.AMBARI_SERVER_ACTION, role); // only accepted for the server-action role
+      expectTask(role, DEFAULT_HOST);
+    }
+
     public boolean isTaskExpected(Role role, String host) {
       return expectedList.get(roleToIndex.get(role)).get(host);
     }
 
+    public boolean isTaskExpected(Role role) { // host-less variant: looks the task up under DEFAULT_HOST
+      Assert.assertEquals(Role.AMBARI_SERVER_ACTION, role); // only accepted for the server-action role
+      return isTaskExpected(role, DEFAULT_HOST);
+    }
+
     public int getRoleOrder(Role role) {
       return roleToIndex.get(role);
     }
@@ -4565,6 +4609,7 @@ public class AmbariManagementControllerT
       this.roleToIndex.put(Role.GANGLIA_SERVER, 21);
       this.roleToIndex.put(Role.GANGLIA_MONITOR, 22);
       this.roleToIndex.put(Role.NAGIOS_SERVER, 23);
+      this.roleToIndex.put(Role.AMBARI_SERVER_ACTION, 24);
     }
   }