You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/03/06 20:06:14 UTC

svn commit: r1453482 - in /incubator/ambari/trunk: ./ ambari-agent/src/main/python/ambari_agent/ ambari-agent/src/test/python/ ambari-agent/src/test/python/dummy_files/ ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ ambari-server/s...

Author: swagle
Date: Wed Mar  6 19:06:13 2013
New Revision: 1453482

URL: http://svn.apache.org/r1453482
Log:
AMBARI-1560. Upgrade action/task support in server. (Sumit Mohanty via swagle)

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
    incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py
    incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py
    incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Wed Mar  6 19:06:13 2013
@@ -12,6 +12,8 @@ Trunk (unreleased changes):
 
  NEW FEATURES
 
+ AMBARI-1560. Upgrade action/task support in server. (Sumit Mohanty via swagle)
+
  AMBARI-1553. List cluster-level configurations with host-level, if any (ncole)
 
  AMBARI-1557. Adding Hue service to the HDP stack definition along with the 

Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py Wed Mar  6 19:06:13 2013
@@ -110,7 +110,8 @@ class ActionQueue(threading.Thread):
           component = command['componentName']
           globalConfig = command['configurations']['global']
           try:
-            livestatus = LiveStatus(cluster, service, component, globalConfig)
+            livestatus = LiveStatus(cluster, service, component,
+              globalConfig, self.config)
             result = livestatus.build()
             logger.info("Got live status for component " + component + " of service " + str(service) +\
                         " of cluster " + str(cluster) + "\n" + pprint.pformat(result))

Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py Wed Mar  6 19:06:13 2013
@@ -26,6 +26,7 @@ import socket
 import time
 import traceback
 from pprint import pprint, pformat
+from StackVersionsFileHandler import StackVersionsFileHandler
 
 logger = logging.getLogger()
 
@@ -91,11 +92,13 @@ class LiveStatus:
   LIVE_STATUS = "STARTED"
   DEAD_STATUS = "INSTALLED"
 
-  def __init__(self, cluster, service, component, globalConfig):
+  def __init__(self, cluster, service, component, globalConfig, config):
     self.cluster = cluster
     self.service = service
     self.component = component
     self.globalConfig = globalConfig
+    versionsFileDir = config.get('agent', 'prefix')
+    self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
 
 
   def belongsToService(self, component):
@@ -117,7 +120,9 @@ class LiveStatus:
                        "msg" : "",
                        "status" : status,
                        "clusterName" : self.cluster,
-                       "serviceName" : self.service
+                       "serviceName" : self.service,
+                       "stackVersion": self.versionsHandler.
+                                    read_stack_version(component["componentName"])
         }
         break
     logger.info("The live status for component " + str(self.component) + " of service " + \

Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py Wed Mar  6 19:06:13 2013
@@ -66,7 +66,7 @@ class UpgradeExecutor:
     srcStackTuple = self.split_stack_version(srcStack)
     tgtStackTuple = self.split_stack_version(srcStack)
 
-    if srcStackTuple == None or tgtStackTuple == None:
+    if srcStackTuple is None or tgtStackTuple is None:
       errorstr = "Source (%s) or target (%s) version does not match pattern \
       <Name>-<Version>" % (srcStack, tgtStack)
       logger.info(errorstr)

Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py Wed Mar  6 19:06:13 2013
@@ -20,15 +20,21 @@ limitations under the License.
 
 from unittest import TestCase
 from ambari_agent.LiveStatus import LiveStatus
-from ambari_agent import AmbariConfig
+from ambari_agent.AmbariConfig import AmbariConfig
 import socket
 import os
 
 class TestLiveStatus(TestCase):
   def test_build(self):
     for component in LiveStatus.COMPONENTS:
-      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {})
+      config = AmbariConfig().getConfig()
+      config.set('agent', 'prefix', "dummy_files")
+      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config)
+      livestatus.versionsHandler.versionsFilePath = os.path.join("dummy_files","dummy_current_stack")
       result = livestatus.build()
       print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result))
       self.assertEquals(len(result) > 0, True, 'Livestatus should not be empty')
+      if component['componentName'] == 'GANGLIA_SERVER':
+        self.assertEquals(result['stackVersion'],'HDP-1.2.2',
+                      'Livestatus should contain component stack version')
   

Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py Wed Mar  6 19:06:13 2013
@@ -41,9 +41,9 @@ class TestStackVersionsFileHandler(TestC
   @patch.object(stackVersionsFileHandler, 'touch_file')
   def test_read_stack_version(self, touch_method):
     stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
-    result = stackVersionsFileHandler.read_stack_version("NAGIOS")
+    result = stackVersionsFileHandler.read_stack_version("NAGIOS_SERVER")
     self.assertEquals(result, "HDP-1.2.1")
-    result = stackVersionsFileHandler.read_stack_version("GANGLIA")
+    result = stackVersionsFileHandler.read_stack_version("GANGLIA_SERVER")
     self.assertEquals(result, "HDP-1.2.2")
     result = stackVersionsFileHandler.read_stack_version("NOTEXISTING")
     self.assertEquals(result, stackVersionsFileHandler.DEFAULT_VER)
@@ -55,18 +55,18 @@ class TestStackVersionsFileHandler(TestC
     stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
     result = stackVersionsFileHandler.read_all_stack_versions()
     self.assertEquals(len(result.keys()), 4)
-    self.assertEquals(result["NAGIOS"], "HDP-1.2.1")
-    self.assertEquals(result["HBASE"], "HDP-1.3.0")
+    self.assertEquals(result["NAGIOS_SERVER"], "HDP-1.2.1")
+    self.assertEquals(result["HCATALOG"], "HDP-1.3.0")
     self.assertTrue(touch_method.called)
 
 
   def test_extract(self):
-    s = "   NAGIOS	\t  HDP-1.3.0  "
+    s = "   NAGIOS_SERVER	\t  HDP-1.3.0  "
     comp, ver = stackVersionsFileHandler.extract(s)
-    self.assertEqual(comp, "NAGIOS")
+    self.assertEqual(comp, "NAGIOS_SERVER")
     self.assertEqual(ver, "HDP-1.3.0")
     # testing wrong value
-    s = "   NAGIOS	"
+    s = "   NAGIOS_SERVER	"
     comp, ver = stackVersionsFileHandler.extract(s)
     self.assertEqual(comp, stackVersionsFileHandler.DEFAULT_VER)
     self.assertEqual(ver, stackVersionsFileHandler.DEFAULT_VER)

Modified: incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack Wed Mar  6 19:06:13 2013
@@ -1,4 +1,4 @@
-HDFS    HDP-1.2.0
-NAGIOS    HDP-1.2.1
-HBASE    HDP-1.3.0
-GANGLIA  HDP-1.2.2
+DATANODE    HDP-1.2.0
+NAGIOS_SERVER    HDP-1.2.1
+HCATALOG    HDP-1.3.0
+GANGLIA_SERVER  HDP-1.2.2

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java Wed Mar  6 19:06:13 2013
@@ -89,6 +89,9 @@ public class ActionManager {
     return db.getAction(StageUtils.getActionId(requestId, stageId));
   }
 
+  /**
+   * Persists command reports into the db
+   */
   public void processTaskResponse(String hostname, List<CommandReport> reports) {
     if (reports == null) {
       return;

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java Wed Mar  6 19:06:13 2013
@@ -94,11 +94,7 @@ public class HeartBeatHandler {
     if (currentResponseId == null) {
       //Server restarted, or unknown host.
       LOG.error("CurrentResponseId unknown - send register command");
-      response = new HeartBeatResponse();
-      RegistrationCommand regCmd = new RegistrationCommand();
-      response.setResponseId(0);
-      response.setRegistrationCommand(regCmd);
-      return response;
+      return createRegisterCommand();
     }
 
     if (LOG.isDebugEnabled()) {
@@ -113,10 +109,7 @@ public class HeartBeatHandler {
       return hostResponses.get(hostname);
     }else if (heartbeat.getResponseId() != currentResponseId) {
       LOG.error("Error in responseId sequence - sending agent restart command");
-      response = new HeartBeatResponse();
-      response.setRestartAgent(true);
-      response.setResponseId(currentResponseId);
-      return response;
+      return createRestartCommand(currentResponseId);
     }
 
     response = new HeartBeatResponse();
@@ -127,11 +120,7 @@ public class HeartBeatHandler {
     if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
       // After loosing heartbeat agent should reregister
       LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
-      response = new HeartBeatResponse();
-      RegistrationCommand regCmd = new RegistrationCommand();
-      response.setResponseId(0);
-      response.setRegistrationCommand(regCmd);
-      return response;
+      return createRegisterCommand();
     }
 
     hostResponseIds.put(hostname, currentResponseId);
@@ -161,18 +150,28 @@ public class HeartBeatHandler {
     } catch (InvalidStateTransitionException ex) {
       LOG.warn("Asking agent to reregister due to " + ex.getMessage(),  ex);
       hostObject.setState(HostState.INIT);
-      RegistrationCommand regCmd = new RegistrationCommand();
-      response.setRegistrationCommand(regCmd);
-      return response;
+      return createRegisterCommand();
     }
 
     //Examine heartbeat for command reports
+    processCommandReports(heartbeat, hostname, clusterFsm, now);
+
+    // Examine heartbeart for component live status reports
+    processStatusReports(heartbeat, hostname, clusterFsm);
+
+    // Send commands if node is active
+    if (hostObject.getState().equals(HostState.HEALTHY)) {
+      sendCommands(hostname, response);
+    }
+    return response;
+  }
+
+  protected void processCommandReports(HeartBeat heartbeat,
+                                       String hostname,
+                                       Clusters clusterFsm, long now) throws
+          AmbariException {
     List<CommandReport> reports = heartbeat.getReports();
     for (CommandReport report : reports) {
-      String clusterName = report.getClusterName();
-      if ((clusterName == null) || "".equals(clusterName)) {
-        clusterName = "cluster1";
-      }
       Cluster cl = clusterFsm.getCluster(report.getClusterName());
       String service = report.getServiceName();
       if (service == null || "".equals(service)) {
@@ -185,18 +184,23 @@ public class HeartBeatHandler {
           Service svc = cl.getService(service);
           ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
           ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
+          String schName = scHost.getServiceComponentName();
           if (report.getStatus().equals("COMPLETED")) {
-            scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(scHost
-                .getServiceComponentName(), hostname, now));
+            // Updating stack version, if needed
+            if (scHost.getState().equals(State.UPGRADING)) {
+              scHost.setStackVersion(scHost.getDesiredStackVersion());
+            }
+            scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
+                    hostname, now));
           } else if (report.getStatus().equals("FAILED")) {
-            scHost.handleEvent(new ServiceComponentHostOpFailedEvent(scHost
-                .getServiceComponentName(), hostname, now));
+            scHost.handleEvent(new ServiceComponentHostOpFailedEvent(schName,
+                    hostname, now));
           } else if (report.getStatus().equals("IN_PROGRESS")) {
-            scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(scHost
-                .getServiceComponentName(), hostname, now));
+            scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
+                    hostname, now));
           }
         } catch (ServiceComponentNotFoundException scnex) {
-          LOG.info("Service component not found ", scnex);
+          LOG.warn("Service component not found ", scnex);
         } catch (InvalidStateTransitionException ex) {
           LOG.warn("State machine exception", ex);
         }
@@ -204,8 +208,12 @@ public class HeartBeatHandler {
     }
     //Update state machines from reports
     actionManager.processTaskResponse(hostname, reports);
+  }
 
-    // Examine heartbeart for component live status reports
+  protected void processStatusReports(HeartBeat heartbeat,
+                                      String hostname,
+                                      Clusters clusterFsm) throws
+                                                            AmbariException {
     Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
     for (Cluster cl : clusters) {
       for (ComponentStatus status : heartbeat.componentStatus) {
@@ -221,16 +229,16 @@ public class HeartBeatHandler {
               State prevState = scHost.getState();
               State liveState = State.valueOf(State.class, status.getStatus());
               if (prevState.equals(State.INSTALLED)
-                  || prevState.equals(State.START_FAILED)
-                  || prevState.equals(State.STARTED)
-                  || prevState.equals(State.STOP_FAILED)) {
+                      || prevState.equals(State.START_FAILED)
+                      || prevState.equals(State.STARTED)
+                      || prevState.equals(State.STOP_FAILED)) {
                 scHost.setState(liveState);
                 if (!prevState.equals(liveState)) {
                   LOG.info("State of service component " + componentName
-                      + " of service " + status.getServiceName()
-                      + " of cluster " + status.getClusterName()
-                      + " has changed from " + prevState + " to " + liveState
-                      + " at host " + hostname);
+                          + " of service " + status.getServiceName()
+                          + " of cluster " + status.getClusterName()
+                          + " has changed from " + prevState + " to " + liveState
+                          + " at host " + hostname);
                 }
               }
 
@@ -246,73 +254,76 @@ public class HeartBeatHandler {
           }
           catch (ServiceNotFoundException e) {
             LOG.warn("Received a live status update for a non-initialized"
-                + " service"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName());
+                    + " service"
+                    + ", clusterName=" + status.getClusterName()
+                    + ", serviceName=" + status.getServiceName());
             // FIXME ignore invalid live update and continue for now?
             continue;
           }
           catch (ServiceComponentNotFoundException e) {
             LOG.warn("Received a live status update for a non-initialized"
-                + " servicecomponent"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName()
-                + ", componentName=" + status.getComponentName());
+                    + " servicecomponent"
+                    + ", clusterName=" + status.getClusterName()
+                    + ", serviceName=" + status.getServiceName()
+                    + ", componentName=" + status.getComponentName());
             // FIXME ignore invalid live update and continue for now?
             continue;
           }
           catch (ServiceComponentHostNotFoundException e) {
             LOG.warn("Received a live status update for a non-initialized"
-                + " service"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName()
-                + ", componentName=" + status.getComponentName()
-                + ", hostname=" + hostname);
+                    + " service"
+                    + ", clusterName=" + status.getClusterName()
+                    + ", serviceName=" + status.getServiceName()
+                    + ", componentName=" + status.getComponentName()
+                    + ", hostname=" + hostname);
             // FIXME ignore invalid live update and continue for now?
             continue;
           }
           catch (RuntimeException e) {
             LOG.warn("Received a live status with invalid payload"
-                + " service"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName()
-                + ", componentName=" + status.getComponentName()
-                + ", hostname=" + hostname
-                + ", error=" + e.getMessage());
+                    + " service"
+                    + ", clusterName=" + status.getClusterName()
+                    + ", serviceName=" + status.getServiceName()
+                    + ", componentName=" + status.getComponentName()
+                    + ", hostname=" + hostname
+                    + ", error=" + e.getMessage());
             continue;
           }
         }
       }
     }
+  }
 
-    // Send commands if node is active
-    if (hostObject.getState().equals(HostState.HEALTHY)) {
-      List<AgentCommand> cmds = actionQueue.dequeueAll(heartbeat.getHostname());
-      if (cmds != null && !cmds.isEmpty()) {
-        for (AgentCommand ac : cmds) {
-          try {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
-            }
-          } catch (Exception e) {
-            throw new AmbariException("Could not get jaxb string for command", e);
+  /**
+   * Adds commands from action queue to a heartbeat responce
+   */
+  protected void sendCommands(String hostname, HeartBeatResponse response)
+                                                        throws AmbariException {
+    List<AgentCommand> cmds = actionQueue.dequeueAll(hostname);
+    if (cmds != null && !cmds.isEmpty()) {
+      for (AgentCommand ac : cmds) {
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
           }
-          switch (ac.getCommandType()) {
-            case EXECUTION_COMMAND: {
-              response.addExecutionCommand((ExecutionCommand) ac);
-              break;
-            }
-            case STATUS_COMMAND: {
-              response.addStatusCommand((StatusCommand) ac);
-              break;
-            }
-              default:
-                  LOG.error("There is no action for agent command ="+ ac.getCommandType().name() );
+        } catch (Exception e) {
+          throw new AmbariException("Could not get jaxb string for command", e);
+        }
+        switch (ac.getCommandType()) {
+          case EXECUTION_COMMAND: {
+            response.addExecutionCommand((ExecutionCommand) ac);
+            break;
+          }
+          case STATUS_COMMAND: {
+            response.addStatusCommand((StatusCommand) ac);
+            break;
           }
+          default:
+            LOG.error("There is no action for agent command ="+
+                    ac.getCommandType().name() );
         }
       }
     }
-    return response;
   }
 
   public String getOsType(String os, String osRelease) {
@@ -329,6 +340,21 @@ public class HeartBeatHandler {
     return osType.toLowerCase();
   }
 
+  protected HeartBeatResponse createRegisterCommand() {
+    HeartBeatResponse response = new HeartBeatResponse();
+    RegistrationCommand regCmd = new RegistrationCommand();
+    response.setResponseId(0);
+    response.setRegistrationCommand(regCmd);
+    return response;
+  }
+
+  protected HeartBeatResponse createRestartCommand(Long currentResponseId) {
+    HeartBeatResponse response = new HeartBeatResponse();
+    response.setRestartAgent(true);
+    response.setResponseId(currentResponseId);
+    return response;
+  }
+
   public RegistrationResponse handleRegistration(Register register)
     throws InvalidStateTransitionException, AmbariException {
     String hostname = register.getHostname();

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java Wed Mar  6 19:06:13 2013
@@ -173,6 +173,10 @@ public class ServiceComponentHostImpl im
          State.STOPPING,
          ServiceComponentHostEventType.HOST_SVCCOMP_STOP,
          new ServiceComponentHostOpStartedTransition())
+    .addTransition(State.INSTALLED,
+         State.UPGRADING,
+         ServiceComponentHostEventType.HOST_SVCCOMP_UPGRADE,
+         new ServiceComponentHostOpStartedTransition())
 
      .addTransition(State.STARTING,
          State.STARTING,
@@ -333,10 +337,7 @@ public class ServiceComponentHostImpl im
          ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
          new ServiceComponentHostOpCompletedTransition())
     
-     .addTransition(State.INSTALLED,
-         State.INSTALLED,
-         ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
-         new ServiceComponentHostOpCompletedTransition())
+
     
      .addTransition(State.INSTALLING,
          State.INSTALLING,
@@ -361,6 +362,10 @@ public class ServiceComponentHostImpl im
          ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
          new ServiceComponentHostOpStartedTransition())
 
+    .addTransition(State.INSTALLED,
+         State.INSTALLED,
+         ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
+         new ServiceComponentHostOpCompletedTransition())
      .addTransition(State.INSTALLED,
          State.UNINSTALLING,
          ServiceComponentHostEventType.HOST_SVCCOMP_UNINSTALL,
@@ -369,6 +374,10 @@ public class ServiceComponentHostImpl im
          State.INSTALLING,
          ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
          new ServiceComponentHostOpStartedTransition())
+    .addTransition(State.INSTALLED,
+         State.UPGRADING,
+         ServiceComponentHostEventType.HOST_SVCCOMP_UPGRADE,
+         new ServiceComponentHostOpStartedTransition())
 
      .addTransition(State.UPGRADING,
          State.UPGRADING,

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java Wed Mar  6 19:06:13 2013
@@ -760,6 +760,248 @@ public class TestHeartbeatHandler {
         stack130, serviceComponentHost3.getStackVersion());
   }
 
+  @Test
+  public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
+    ActionManager am = new ActionManager(0, 0, null, null,
+            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    Cluster cluster = getDummyCluster();
+
+    @SuppressWarnings("serial")
+    Set<String> hostNames = new HashSet<String>(){{
+      add(DummyHostname1);
+    }};
+
+    clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(HDFS_CLIENT).persist();
+    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+    StackId stack130 = new StackId("HDP-1.3.0");
+    StackId stack122 = new StackId("HDP-1.2.2");
+
+    serviceComponentHost1.setState(State.UPGRADING);
+    serviceComponentHost2.setState(State.INSTALLING);
+
+    serviceComponentHost1.setStackVersion(stack122);
+    serviceComponentHost1.setDesiredStackVersion(stack130);
+    serviceComponentHost2.setStackVersion(stack122);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    CommandReport cr1 = new CommandReport();
+    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr1.setTaskId(1);
+    cr1.setClusterName(DummyCluster);
+    cr1.setServiceName(HDFS);
+    cr1.setRole(DATANODE);
+    cr1.setStatus(HostRoleStatus.COMPLETED.toString());
+    cr1.setStdErr("none");
+    cr1.setStdOut("dummy output");
+    cr1.setExitCode(0);
+
+    CommandReport cr2 = new CommandReport();
+    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr2.setTaskId(2);
+    cr2.setClusterName(DummyCluster);
+    cr2.setServiceName(HDFS);
+    cr2.setRole(NAMENODE);
+    cr2.setStatus(HostRoleStatus.COMPLETED.toString());
+    cr2.setStdErr("none");
+    cr2.setStdOut("dummy output");
+    cr2.setExitCode(0);
+    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+    reports.add(cr1);
+    reports.add(cr2);
+    hb.setReports(reports);
+
+    ActionQueue aq = new ActionQueue();
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    handler.handleHeartBeat(hb);
+    assertEquals("Stack version for SCH should be updated to " +
+            serviceComponentHost1.getDesiredStackVersion(),
+            stack130, serviceComponentHost1.getStackVersion());
+    assertEquals("Stack version for SCH should not change ",
+            stack122, serviceComponentHost2.getStackVersion());
+  }
+
+  @Test
+  public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
+    ActionManager am = new ActionManager(0, 0, null, null,
+            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    Cluster cluster = getDummyCluster();
+
+    @SuppressWarnings("serial")
+    Set<String> hostNames = new HashSet<String>(){{
+      add(DummyHostname1);
+    }};
+
+    clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(HDFS_CLIENT).persist();
+    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+    StackId stack130 = new StackId("HDP-1.3.0");
+    StackId stack122 = new StackId("HDP-1.2.2");
+
+    serviceComponentHost1.setState(State.UPGRADING);
+    serviceComponentHost2.setState(State.INSTALLING);
+
+    serviceComponentHost1.setStackVersion(stack122);
+    serviceComponentHost1.setDesiredStackVersion(stack130);
+    serviceComponentHost2.setStackVersion(stack122);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    CommandReport cr1 = new CommandReport();
+    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr1.setTaskId(1);
+    cr1.setClusterName(DummyCluster);
+    cr1.setServiceName(HDFS);
+    cr1.setRole(DATANODE);
+    cr1.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr1.setStdErr("none");
+    cr1.setStdOut("dummy output");
+    cr1.setExitCode(777);
+
+    CommandReport cr2 = new CommandReport();
+    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr2.setTaskId(2);
+    cr2.setClusterName(DummyCluster);
+    cr2.setServiceName(HDFS);
+    cr2.setRole(NAMENODE);
+    cr2.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    cr2.setStdErr("none");
+    cr2.setStdOut("dummy output");
+    cr2.setExitCode(777);
+    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+    reports.add(cr1);
+    reports.add(cr2);
+    hb.setReports(reports);
+
+    ActionQueue aq = new ActionQueue();
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    handler.handleHeartBeat(hb);
+    assertEquals("State of SCH not change while operation is in progress",
+            State.UPGRADING, serviceComponentHost1.getState());
+    assertEquals("Stack version of SCH should not change after in progress report",
+            stack130, serviceComponentHost1.getDesiredStackVersion());
+    assertEquals("State of SCH not change while operation is  in progress",
+            State.INSTALLING, serviceComponentHost2.getState());
+  }
+
+
+  @Test
+  public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
+    ActionManager am = new ActionManager(0, 0, null, null,
+            new ActionDBInMemoryImpl(), new HostsMap((String) null));
+    Cluster cluster = getDummyCluster();
+
+    @SuppressWarnings("serial")
+    Set<String> hostNames = new HashSet<String>(){{
+      add(DummyHostname1);
+    }};
+
+    clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(HDFS_CLIENT).persist();
+    hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+            getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+            getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+    StackId stack130 = new StackId("HDP-1.3.0");
+    StackId stack122 = new StackId("HDP-1.2.2");
+
+    serviceComponentHost1.setState(State.UPGRADING);
+    serviceComponentHost2.setState(State.INSTALLING);
+
+    serviceComponentHost1.setStackVersion(stack122);
+    serviceComponentHost1.setDesiredStackVersion(stack130);
+    serviceComponentHost2.setStackVersion(stack122);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    CommandReport cr1 = new CommandReport();
+    cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr1.setTaskId(1);
+    cr1.setClusterName(DummyCluster);
+    cr1.setServiceName(HDFS);
+    cr1.setRole(DATANODE);
+    cr1.setStatus(HostRoleStatus.FAILED.toString());
+    cr1.setStdErr("none");
+    cr1.setStdOut("dummy output");
+    cr1.setExitCode(0);
+
+    CommandReport cr2 = new CommandReport();
+    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr2.setTaskId(2);
+    cr2.setClusterName(DummyCluster);
+    cr2.setServiceName(HDFS);
+    cr2.setRole(NAMENODE);
+    cr2.setStatus(HostRoleStatus.FAILED.toString());
+    cr2.setStdErr("none");
+    cr2.setStdOut("dummy output");
+    cr2.setExitCode(0);
+    ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+    reports.add(cr1);
+    reports.add(cr2);
+    hb.setReports(reports);
+
+    ActionQueue aq = new ActionQueue();
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+    handler.handleHeartBeat(hb);
+    assertEquals("State of SCH should change after fail report",
+            State.UPGRADE_FAILED, serviceComponentHost1.getState());
+    assertEquals("State of SCH should change after fail report",
+            State.INSTALL_FAILED, serviceComponentHost2.getState());
+    assertEquals("Stack version of SCH should not change after fail report",
+            stack122, serviceComponentHost1.getStackVersion());
+    assertEquals("Stack version of SCH should not change after fail report",
+            stack130, serviceComponentHost1.getDesiredStackVersion());
+    assertEquals("Stack version of SCH should not change after fail report",
+            State.INSTALL_FAILED, serviceComponentHost2.getState());
+  }
+
+
   private ComponentStatus createComponentStatus(String clusterName, String serviceName, String message,
                                                 State state, String componentName, String stackVersion) {
     ComponentStatus componentStatus1 = new ComponentStatus();