You are viewing a plain-text version of this content. The canonical link to the original message is not shown in this plain-text copy.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/09 10:13:07 UTC

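svn commit: r1212291 - in /incubator/ambari/trunk: ./ agent/src/main/python/ambari_agent/ agent/src/test/python/ client/src/main/java/org/apache/ambari/common/rest/agent/ controller/src/main/java/org/apache/ambari/controller/ controller/src/main/java/org/apache/ambari/resource/statemachine/ controller/src/test/java/org/apache/ambari/controller/ controller/src/test/java/org/apache/ambari/resource/statemachine/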

Author: ddas
Date: Fri Dec  9 09:13:06 2011
New Revision: 1212291

URL: http://svn.apache.org/viewvc?rev=1212291&view=rev
Log:
AMBARI-141. Update the heartbeat on controller/agent.

Added:
    incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
    incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java
    incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java
      - copied, changed from r1212286, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java
    incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java
      - copied, changed from r1212288, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java
Removed:
    incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java
    incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java
Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
    incubator/ambari/trunk/agent/src/test/python/unitTests.py
    incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
    incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
    incubator/ambari/trunk/pom.xml

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri Dec  9 09:13:06 2011
@@ -2,6 +2,8 @@ Ambari Change log
 
 Release 0.1.0 - unreleased
 
+  AMBARI-141. Update the heartbeat on controller/agent (ddas)
+
   AMBARI-147. Create a stack flattener and introduce Guice. (omalley)
 
   AMBARI-145. FSMs are created for only those components that have 

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Fri Dec  9 09:13:06 2011
@@ -22,6 +22,7 @@ import logging
 import logging.handlers
 import Queue
 import threading
+from shell import shellRunner
 from FileUtil import writeFile, createStructure, deleteStructure
 from shell import shellRunner
 import json
@@ -29,6 +30,7 @@ import os
 import time
 
 logger = logging.getLogger()
+installScriptHash = -1
 
 class ActionQueue(threading.Thread):
   global q, r, clusterId, clusterDefinitionRevision
@@ -54,6 +56,26 @@ class ActionQueue(threading.Thread):
   def put(self, response):
     if 'actions' in response:
       actions = response['actions']
+      # for the servers, take a diff of what's running, and what the controller
+      # asked the agent to start. Kill all those servers that the controller
+      # didn't ask us to start
+      sh = shellRunner()
+      runningServers = sh.getServerTracker()
+
+      # get the list of servers the controller wants running
+      serversToRun = {}
+      for action in actions:
+        if action['kind'] == 'START_ACTION':
+          processKey = sh.getServerKey(action['clusterId'],action['clusterDefinitionRevision'],
+            action['component'], action['role'])
+          serversToRun[processKey] = 1
+
+      # create stop actions for the servers that the controller wants stopped
+      for server in runningServers.keys():
+        if server not in serversToRun:
+          sh.stopProcess(server)
+      # now put all the actions in the queue. The ordering is important (we stopped
+      # all unneeded servers first)
       for action in actions:
         q.put(action)
 
@@ -63,12 +85,12 @@ class ActionQueue(threading.Thread):
       while not q.empty():
         action = q.get()
         switches = {
-                     'START_ACTION'            : self.startAction,
-                     'STOP_ACTION'             : self.stopAction,
-                     'RUN_ACTION'              : self.runAction,
-                     'CREATE_STRUCTURE_ACTION' : self.createStructureAction,
-                     'DELETE_STRUCTURE_ACTION' : self.deleteStructureAction,
-                     'WRITE_FILE_ACTION'       : self.writeFileAction
+                     'START_ACTION'              : self.startAction,
+                     'RUN_ACTION'                : self.runAction,
+                     'CREATE_STRUCTURE_ACTION'   : self.createStructureAction,
+                     'DELETE_STRUCTURE_ACTION'   : self.deleteStructureAction,
+                     'WRITE_FILE_ACTION'         : self.writeFileAction,
+                     'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction
                    }
         try:
           result = switches.get(action['kind'], self.unknownAction)(action)
@@ -112,21 +134,31 @@ class ActionQueue(threading.Thread):
       action['command'], 
       action['user'], result)
 
-  # Run stop action, stop a server process.
-  def stopAction(self, action):
-    result = self.genResult(action)
-    return self.sh.stopProcess(action['workDirComponent'],
-      action['clusterId'], 
-      action['clusterDefinitionRevision'],
-      action['component'],
-      action['role'], 
-      action['signal'], result)
-
   # Write file action
   def writeFileAction(self, action):
     result = self.genResult(action)
     return writeFile(action, result)
 
+  # Install and configure action
+  def installAndConfigAction(self, action):
+    w = self.writeFileAction(action)
+    commandResult = {}
+    if w['exitCode']!=0:
+      commandResult['output'] = out
+      commandResult['error'] = err
+      commandResult['exitCode'] = exitCode
+      r['commandResult'] = commandResult
+      return r
+    r = self.sh.run(action['command'])
+    if r['exitCode'] != 0:
+      commandResult['output'] = out
+      commandResult['error'] = err
+    else:
+      installScriptHash = action['id'] 
+    commandResult['exitCode'] = r['exitCode']
+    r['commandResult'] = commandResult
+    return r
+
   # Run command action
   def runAction(self, action):
     result = self.genResult(action)
@@ -160,3 +192,6 @@ class ActionQueue(threading.Thread):
   def isIdle(self):
     return q.empty()
 
+  # Get the hash of the script currently used for install/config
+  def getInstallScriptHash(self):
+    return installScriptHash

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py Fri Dec  9 09:13:06 2011
@@ -41,7 +41,8 @@ class Heartbeat:
                   'timestamp'           : timestamp,
                   'hostname'            : socket.gethostname(),
                   'hardwareProfile'     : self.hardware.get(),
-                  'idle'                : self.actionQueue.isIdle()
+                  'idle'                : self.actionQueue.isIdle(),
+                  'installScriptHash'   : self.actionQueue.getInstallScriptHash()
                 }
     if len(queueResult)!=0:
       heartbeat['actionResults'] = queueResult

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py Fri Dec  9 09:13:06 2011
@@ -121,7 +121,7 @@ class shellRunner:
       logger.warn("%s %s %s can not switch user for START_ACTION." % (clusterId, component, role))
     code = 0
     commandResult = {}
-    process = clusterId+"/"+clusterDefinitionRevision+"/"+component+"/"+role
+    process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role)
     if not process in serverTracker:
       cmd = sys.executable
       tempfilename = tempfile.mktemp()
@@ -148,27 +148,16 @@ class shellRunner:
     return result
 
   # Stop a process and remove presisted state
-  def stopProcess(self, workdir, clusterId, clusterDefinitionRevision, component, role, sig, result):
+  def stopProcess(self, processKey):
     global serverTracker
-    oldDir = os.getcwd()
-    os.chdir(workdir)
-    process = clusterId+"/"+clusterDefinitionRevision+"/"+component+"/"+role
-    commandResult = {'exitCode': 0}
+    keyFragments = processKey.split('/')
+    process = self.getServerKey(keyFragments[0],keyFragments[1],keyFragments[2],keyFragments[3])
     if process in serverTracker:
-      if sig=='TERM':
-        os.kill(serverTracker[process], signal.SIGTERM)
-        # TODO: gracefully check if process is still alive
-        # before remove from serverTracker
-        del serverTracker[process]
-      else:
-        os.kill(serverTracker[process], signal.SIGKILL)
-        del serverTracker[process]
-    result['commandResult'] = commandResult
-    try:
-      os.chdir(oldDir)
-    except Exception:
-      logger.warn("%s %s %s can not restore environment for STOP_ACTION." % (clusterId, component, role))
-    return result
+      os.kill(serverTracker[process], signal.SIGKILL)
+      del serverTracker[process]
 
   def getServerTracker(self):
     return serverTracker
+
+  def getServerKey(self,clusterId, clusterDefinitionRevision, component, role):
+    return clusterId+"/"+clusterDefinitionRevision+"/"+component+"/"+role

Added: incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py?rev=1212291&view=auto
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py (added)
+++ incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py Fri Dec  9 09:13:06 2011
@@ -0,0 +1,57 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import os, errno
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.AmbariConfig import AmbariConfig
+
+class TestAgentActions(TestCase):
+  def test_installAndConfigAction(self):
+    path = "/tmp/ambari_file_test/_file_write_test_1"
+    configFile = {
+      "data"       : "test",
+      "owner"      : os.getuid(),
+      "group"      : os.getgid() ,
+      "permission" : 0700,
+      "path"       : path,
+      "umask"      : 022
+    }
+
+    #note that the command in the action is just a listing of the path created
+    #we just want to ensure that 'ls' can run on the data file (in the actual world
+    #this 'ls' would be a puppet or a chef command that would work on a data
+    #file
+    action = { 
+      'id' : 'tttt',
+      'kind' : 'INSTALL_AND_CONFIG_ACTION',
+      'clusterId' : 'abc', 
+      'role' : 'namenode', 
+      'component' : 'hdfs', 
+      'workDirComponent' : 'abc-hdfs',
+      'file' : configFile,
+      'clusterDefinitionRevision' : 12,
+      'command' : ['/bin/ls',path]
+    }
+    result = { }
+    actionQueue = ActionQueue(AmbariConfig().getConfig())
+    result = actionQueue.installAndConfigAction(action)
+    self.assertEqual(result['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % result['exitCode'])
+    self.assertEqual(result['output'], path + "\n", "installAndConfigAction test failed Returned %s " % result['output'])

Modified: incubator/ambari/trunk/agent/src/test/python/unitTests.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/unitTests.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/unitTests.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/unitTests.py Fri Dec  9 09:13:06 2011
@@ -34,7 +34,8 @@ def all_tests_suite():
     'TestServerStatus',
     'TestFileUtil',
     'TestActionQueue',
-    'TestAmbariComponent'
+    'TestAmbariComponent',
+    'TestAgentActions'
   ])
   return TestAgent([suite])
 

Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java Fri Dec  9 09:13:06 2011
@@ -161,7 +161,8 @@ public class Action {
   
   public static enum Kind {
     RUN_ACTION, START_ACTION, STOP_ACTION, STATUS_ACTION, 
-    CREATE_STRUCTURE_ACTION, DELETE_STRUCTURE_ACTION, WRITE_FILE_ACTION;
+    CREATE_STRUCTURE_ACTION, DELETE_STRUCTURE_ACTION, WRITE_FILE_ACTION,
+    INSTALL_AND_CONFIG_ACTION;
     public static class KindAdaptor extends XmlAdapter<String, Kind> {
       @Override
       public String marshal(Kind obj) throws Exception {

Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java Fri Dec  9 09:13:06 2011
@@ -36,7 +36,7 @@ import javax.xml.bind.annotation.XmlType
 @XmlAccessorType(XmlAccessType.FIELD)
 @XmlType(name = "", propOrder = {"responseId","timestamp", 
     "hostname", "hardwareProfile", "installedRoleStates",
-    "stateChangeStatus", "actionResults", "idle"})
+    "serverStates", "deployState", "actionResults", "idle"})
 public class HeartBeat {
   @XmlElement
   private short responseId = -1;
@@ -49,7 +49,7 @@ public class HeartBeat {
   @XmlElement
   private List<AgentRoleState> installedRoleStates;
   @XmlElement
-  private boolean stateChangeStatus;
+  private int installScriptHash;
   @XmlElement
   private List<ActionResult> actionResults;
   @XmlElement
@@ -87,8 +87,8 @@ public class HeartBeat {
     return installedRoleStates;
   }
   
-  public boolean getStateChangeStatus() {
-    return stateChangeStatus;
+  public int getInstallScriptHash() {
+    return installScriptHash;
   }
   
   public void setTimestamp(long timestamp) {
@@ -115,7 +115,7 @@ public class HeartBeat {
     this.idle = idle;
   }
   
-  public void setStateChangeStatus(boolean stateChangeStatus) {
-    this.stateChangeStatus = stateChangeStatus;
+  public void setInstallScriptHash(int hash) {
+    this.installScriptHash = hash;
   }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java Fri Dec  9 09:13:06 2011
@@ -132,7 +132,7 @@ public class Cluster {
      * @return the latestRevision
      */
     public int getLatestRevisionNumber() {
-        return this.latestRevisionNumber;
+        return latestRevisionNumber;
     }
     
     /**

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java Fri Dec  9 09:13:06 2011
@@ -24,6 +24,7 @@ import org.apache.ambari.controller.rest
 import org.apache.ambari.controller.rest.resources.StacksResource;
 import org.apache.ambari.datastore.PersistentDataStore;
 import org.apache.ambari.datastore.impl.ZookeeperDS;
+import org.apache.ambari.resource.statemachine.StateMachineInvoker;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -37,7 +38,8 @@ public class ControllerModule extends Ab
     requestStaticInjection(ClustersResource.class,
                            NodesResource.class,
                            StacksResource.class,
-                           ControllerResource.class);
+                           ControllerResource.class,
+                           StateMachineInvoker.class);
     install(new FactoryModuleBuilder()
               .implement(Cluster.class,Cluster.class)
               .build(ClusterFactory.class));

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Fri Dec  9 09:13:06 2011
@@ -29,6 +29,7 @@ import org.apache.ambari.controller.Node
 import org.apache.ambari.common.rest.agent.Action;
 import org.apache.ambari.common.rest.agent.Action.Kind;
 import org.apache.ambari.common.rest.agent.ActionResult;
+import org.apache.ambari.common.rest.agent.AgentRoleState;
 import org.apache.ambari.common.rest.agent.Command;
 import org.apache.ambari.common.rest.agent.ConfigFile;
 import org.apache.ambari.common.rest.agent.ControllerResponse;
@@ -86,21 +87,33 @@ public class HeartbeatHandler {
     List<Action> allActions = new ArrayList<Action>();
 
     //if the command-execution takes longer than one heartbeat interval
-    //the check for idleness will prevent the same node getting the same 
-    //command more than once. In the future this could be improved
+    //the check for idleness will prevent the same node getting more 
+    //commands. In the future this could be improved
     //to reflect the command execution state more accurately.
     if (heartbeat.getIdle()) {
+      
       List<ClusterNameAndRev> clustersNodeBelongsTo = 
           getClustersNodeBelongsTo(hostname);
+      
+      //TODO: have an API in Clusters that can return a script 
+      //pertaining to all clusters
+      String script = 
+          clusters.getInstallAndConfigureScript(
+              clustersNodeBelongsTo.get(0).getClusterName(), 
+              clustersNodeBelongsTo.get(0).getRevision());
+      if (script == null) {
+        return createResponse(responseId,allActions,heartbeat);
+      }
+      //send the deploy script
+      getInstallAndConfigureAction(script, allActions);
 
-      for (ClusterNameAndRev clusterIdAndRev : clustersNodeBelongsTo) {
+      if (!installAndConfigDone(script,heartbeat)) {
+        return createResponse(responseId,allActions,heartbeat);
+      }
 
-        String script = 
-            clusters.getInstallAndConfigureScript(clusterName, 
-                clusterRev);
-        
-        //send the deploy script
-        getInstallAndConfigureAction(script,clusterIdAndRev, allActions);
+      for (ClusterNameAndRev clusterIdAndRev : clustersNodeBelongsTo) {
+        clusterName = clusterIdAndRev.getClusterName();
+        clusterRev = clusterIdAndRev.getRevision();
 
         //get the cluster object corresponding to the clusterId
         Cluster cluster = clusters.getClusterByName(clusterName);
@@ -130,7 +143,8 @@ public class HeartbeatHandler {
                 //check the expected state of the agent and whether the start
                 //was successful
                 if (wasStartRoleSuccessful(clusterIdAndRev, 
-                    role.getRoleName(), response, heartbeat)) {
+                    service.getServiceName(), role.getRoleName(), response, 
+                    heartbeat)) {
                   //raise an event to the state machine for a successful 
                   //role-start
                   StateMachineInvoker.getAMBARIEventHandler()
@@ -145,7 +159,8 @@ public class HeartbeatHandler {
                 //raise an event to the state machine for a successful 
                 //role-stop instance
                 if (wasStopRoleSuccessful(clusterIdAndRev, 
-                    role.getRoleName(), response, heartbeat)) {
+                    service.getServiceName(), role.getRoleName(), response, 
+                    heartbeat)) {
                   StateMachineInvoker.getAMBARIEventHandler()
                   .handle(new RoleEvent(RoleEventType.STOP_SUCCESS, role));
                 }
@@ -159,32 +174,42 @@ public class HeartbeatHandler {
         }
       }
     }
+    return createResponse(responseId,allActions,heartbeat);
+  }
+  
+  private ControllerResponse createResponse(short responseId, 
+      List<Action> allActions, HeartBeat heartbeat) {
     ControllerResponse r = new ControllerResponse();
     r.setResponseId(responseId);
-    //TODO: need to persist this state (if allActions are different from the 
-    //last allActions)
     r.setActions(allActions);
     agentToHeartbeatResponseMap.put(heartbeat.getHostname(), r);
     return r;
   }
   
+  private boolean installAndConfigDone(String script, HeartBeat heartbeat) {
+    if (script == null || heartbeat.getInstallScriptHash() == -1) {
+      return false;
+    }
+    if (script.hashCode() == heartbeat.getInstallScriptHash()) {
+      return true;
+    }
+    return false;
+  }
+    
   private boolean wasStartRoleSuccessful(ClusterNameAndRev clusterIdAndRev, 
-      String roleName, ControllerResponse response, HeartBeat heartbeat) {
-    //Check whether the statechange was successful on the agent, and if
-    //the state information sent to the agent in the previous heartbeat
-    //included the start-action for the role in question.
-    if (!heartbeat.getStateChangeStatus()) {
+      String component, String roleName, ControllerResponse response, 
+      HeartBeat heartbeat) {
+    List<AgentRoleState> serverStates = heartbeat.getInstalledRoleStates();
+    if (serverStates == null) {
       return false;
     }
-    List<Action> actions = response.getActions();
-    for (Action action : actions) { //TBD: no iteration for every invocation of this method
-      if (action.kind != Action.Kind.START_ACTION) {
-        continue;
-      }
-      if (action.getClusterId().equals(clusterIdAndRev.getClusterName()) && 
-          action.getClusterDefinitionRevision() == 
-          clusterIdAndRev.getRevision() &&
-          action.getRole().equals(roleName)) {
+
+    //TBD: create a hashmap (don't iterate for every server state)
+    for (AgentRoleState serverState : serverStates) {
+      if (serverState.getClusterId().equals(clusterIdAndRev.getClusterName()) &&
+          serverState.getClusterDefinitionRevision() == clusterIdAndRev.getRevision() &&
+          serverState.getComponentName().equals(component) &&
+          serverState.getRoleName().equals(roleName)) {
         return true;
       }
     }
@@ -192,57 +217,56 @@ public class HeartbeatHandler {
   }
   
   private void getInstallAndConfigureAction(String script, 
-      ClusterNameAndRev clusterNameRev, List<Action> allActions) {
+      List<Action> allActions) {
     ConfigFile file = new ConfigFile();
     file.setData(script);
     //TODO: this should be written in Ambari's scratch space directory
-    file.setPath("/tmp/" + clusterNameRev.getClusterName() 
-        + "_" + clusterNameRev.getRevision());
+    //this file is the complete install/config script. Note that the
+    //script includes install/config snippets for all clusters the 
+    //node belongs to
+    file.setPath("/tmp/ambari_install_script" + script.hashCode());
     
     Action action = new Action();
     action.setFile(file);
-    action.setClusterId(clusterNameRev.getClusterName());
-    action.setClusterDefinitionRevision(clusterNameRev.getRevision());
-    action.setKind(Kind.WRITE_FILE_ACTION);
-    allActions.add(action);
-    
-    action = new Action();
-    action.setClusterId(clusterNameRev.getClusterName());
-    action.setClusterDefinitionRevision(clusterNameRev.getRevision());
+    action.setKind(Kind.INSTALL_AND_CONFIG_ACTION);
     String deployCmd = Util.getInstallAndConfigureCommand();
     //TODO: assumption is that the file is passed as an argument
     //Should generally hold for many install/config systems like Puppet
     //but is something that needs to be thought about more
     Command command = new Command(null,deployCmd,new String[]{file.getPath()});
     action.setCommand(command);
-    action.setKind(Kind.RUN_ACTION);
+    //in the action ID send the hashCode of the script content so that 
+    //the controller can check how the installation went when a heartbeat
+    //response is sent back
+    action.setId(Integer.toString(script.hashCode()));
     allActions.add(action);
   }
   
   private boolean wasStopRoleSuccessful(ClusterNameAndRev clusterIdAndRev, 
-      String roleName, ControllerResponse response, HeartBeat heartbeat) {
-    //Check whether the statechange was successful on the agent, and if
-    //the state information to the agent included the start-action for the
-    //role in question.If the state information didn't include the start-action
-    //command, the controller wants the role stopped
-    if (!heartbeat.getStateChangeStatus()) {
-      return false;
-    }
-    List<Action> actions = response.getActions();
-    for (Action action : actions) {
-      if (action.getClusterId() == clusterIdAndRev.getClusterName() && 
-          action.getClusterDefinitionRevision() == 
-          clusterIdAndRev.getRevision() &&
-          action.getRole().equals(roleName) &&
-          action.kind == Action.Kind.START_ACTION) {
-        return false;
+      String component, String roleName, ControllerResponse response, 
+      HeartBeat heartbeat) {
+    List<AgentRoleState> serverStates = heartbeat.getInstalledRoleStates();
+    if (serverStates == null) {
+      return true;
+    }
+    boolean stopped = true;
+    //TBD: create a hashmap (don't iterate for every server state)
+    for (AgentRoleState serverState : serverStates) {
+      if (serverState.getClusterId().equals(clusterIdAndRev.getClusterName()) &&
+          serverState.getClusterDefinitionRevision() == clusterIdAndRev.getRevision() &&
+          serverState.getComponentName().equals(component) &&
+          serverState.getRoleName().equals(roleName)) {
+        stopped = false;
       }
     }
-    return true;
+    return stopped;
   }
   
   private ActionResult getActionResult(HeartBeat heartbeat, String id) {
     List<ActionResult> actionResults = heartbeat.getActionResults();
+    if (actionResults == null) {
+      return null;
+    }
     for (ActionResult result : actionResults) {
       if (result.getId().equals(id)) {
         return result;
@@ -265,23 +289,24 @@ public class HeartbeatHandler {
     return new ArrayList<ClusterNameAndRev>(); //empty
   }  
   
-  private enum SpecialServiceIDs {
+  enum SpecialServiceIDs {
       SERVICE_AVAILABILITY_CHECK_ID, SERVICE_PRESTART_CHECK_ID,
       CREATE_STRUCTURE_ACTION_ID
-  }  
+  }
+  
   
-  private static class ClusterNameAndRev implements 
+  static class ClusterNameAndRev implements 
   Comparable<ClusterNameAndRev> {
     String clusterName;
-    long revision;
-    ClusterNameAndRev(String clusterName, long revision) {
+    int revision;
+    ClusterNameAndRev(String clusterName, int revision) {
       this.clusterName = clusterName;
       this.revision = revision;
     }
     String getClusterName() {
       return clusterName;
     }
-    long getRevision() {
+    int getRevision() {
       return revision;
     }
     @Override
@@ -311,7 +336,7 @@ public class HeartbeatHandler {
     }
   }
 
-  private static String getSpecialActionID(ClusterNameAndRev clusterNameAndRev, 
+  static String getSpecialActionID(ClusterNameAndRev clusterNameAndRev, 
       String component, String role, SpecialServiceIDs serviceId) {
     String id = clusterNameAndRev.getClusterName() +"-"+
       clusterNameAndRev.getRevision() +"-"+ component + "-";
@@ -325,34 +350,33 @@ public class HeartbeatHandler {
   private void checkAndCreateActions(Cluster cluster,
       ClusterFSM clusterFsm, ClusterNameAndRev clusterIdAndRev, 
       ServiceFSM service, HeartBeat heartbeat, 
-      List<Action> allActions) 
-          throws Exception {
+      List<Action> allActions) throws Exception {
+    ComponentPlugin plugin = 
+        cluster.getComponentDefinition(service.getServiceName());
     //see whether the service is in the STARTED state, and if so,
     //check whether there is any action-result that indicates success
     //of the availability check (safemode, etc.)
     if (service.getServiceState() == ServiceState.STARTED) {
-      String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(), 
-          null, SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
-      ActionResult result = getActionResult(heartbeat, id);
-      if (result != null) {
-        //this action ran
-        //TODO: this needs to be generalized so that it handles the case
-        //where the service is not available for a couple of checkservice
-        //invocations
-        if (result.getCommandResult().getExitCode() == 0) {
-          StateMachineInvoker.getAMBARIEventHandler().handle(
-              new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_SUCCESS,
-                  service));
+      String role = plugin.runCheckRole();  
+      if (nodePlayingRole(heartbeat.getHostname(), role)) {
+        String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(), 
+            role, SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
+        ActionResult result = getActionResult(heartbeat, id);
+        if (result != null) {
+          //this action ran
+          //TODO: this needs to be generalized so that it handles the case
+          //where the service is not available for a couple of checkservice
+          //invocations
+          if (result.getCommandResult().getExitCode() == 0) {
+            StateMachineInvoker.getAMBARIEventHandler().handle(
+                new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_SUCCESS,
+                    service));
+          } else {
+            StateMachineInvoker.getAMBARIEventHandler().handle(
+                new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_FAILURE,
+                    service));
+          }
         } else {
-          StateMachineInvoker.getAMBARIEventHandler().handle(
-              new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_FAILURE,
-                  service));
-        }
-      } else {
-        ComponentPlugin plugin = 
-            cluster.getComponentDefinition(service.getServiceName());
-        String role = plugin.runCheckRole();
-        if (nodePlayingRole(heartbeat.getHostname(), role)) {
           Action action = plugin.checkService(cluster.getName(), role);
           fillActionDetails(action, clusterIdAndRev.getClusterName(),
               clusterIdAndRev.getRevision(),service.getServiceName(), role);
@@ -364,25 +388,23 @@ public class HeartbeatHandler {
     }
     
     if (service.getServiceState() == ServiceState.PRESTART) {
-      String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(), 
-          null, SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
-      ActionResult result = getActionResult(heartbeat, id);
-      if (result != null) {
-        //this action ran
-        if (result.getCommandResult().getExitCode() == 0) {
-          StateMachineInvoker.getAMBARIEventHandler().handle(
-              new ServiceEvent(ServiceEventType.PRESTART_SUCCESS,
-                  service));
+      String role = plugin.runPreStartRole();
+      if (nodePlayingRole(heartbeat.getHostname(), role)) {
+        String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(), 
+            role, SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
+        ActionResult result = getActionResult(heartbeat, id);
+        if (result != null) {
+          //this action ran
+          if (result.getCommandResult().getExitCode() == 0) {
+            StateMachineInvoker.getAMBARIEventHandler().handle(
+                new ServiceEvent(ServiceEventType.PRESTART_SUCCESS,
+                    service));
+          } else {
+            StateMachineInvoker.getAMBARIEventHandler().handle(
+                new ServiceEvent(ServiceEventType.PRESTART_FAILURE,
+                    service));
+          }
         } else {
-          StateMachineInvoker.getAMBARIEventHandler().handle(
-              new ServiceEvent(ServiceEventType.PRESTART_FAILURE,
-                  service));
-        }
-      } else {
-        ComponentPlugin plugin = 
-            cluster.getComponentDefinition(service.getServiceName());
-        String role = plugin.runPreStartRole();
-        if (nodePlayingRole(heartbeat.getHostname(), role)) {
           Action action = plugin.preStartAction(cluster.getName(), role);
           fillActionDetails(action, clusterIdAndRev.getClusterName(),
               clusterIdAndRev.getRevision(),service.getServiceName(), role);
@@ -397,8 +419,7 @@ public class HeartbeatHandler {
   private boolean nodePlayingRole(String host, String role) 
       throws Exception {
     //TODO: iteration on every call seems avoidable ..
-    List<String> nodeRoles = nodes.getNode(host).getNodeState().
-        getNodeRoleNames();
+    List<String> nodeRoles = nodes.getNodeRoles(host);
     return nodeRoles.contains(role);
   }
   

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java Fri Dec  9 09:13:06 2011
@@ -150,6 +150,14 @@ public class Nodes {
     }
     
     /*
+     * Get the node's roles
+     */
+    public synchronized List<String> getNodeRoles(String host) 
+        throws Exception {
+      // No defensive copy is made here: callers receive whatever
+      // NodeState.getNodeRoleNames() returns. Whether that list is shared
+      // with the node state depends on NodeState.
+      // NOTE(review): the behaviour for an unknown host depends on getNode().
+      // If it returns null rather than throwing, this line will NPE; confirm.
+      return getNode(host).getNodeState().getNodeRoleNames();
+    }
+    
+    /*
      * Get time difference
      */
     public static long getTimeDiffInMillis (XMLGregorianCalendar t2, 

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java Fri Dec  9 09:13:06 2011
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.ambari.controller;
 
 import java.util.Date;

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java Fri Dec  9 09:13:06 2011
@@ -106,14 +106,14 @@ public class ClusterImpl implements Clus
   private ClusterState clusterState;
   private static Log LOG = LogFactory.getLog(ClusterImpl.class);
     
-  public ClusterImpl(Cluster cluster, int revision, 
+  public ClusterImpl(Cluster cluster, int revision,
       ClusterState clusterState) throws IOException {
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
     this.stateMachine = stateMachineFactory.make(this);
     List<ServiceFSM> serviceImpls = new ArrayList<ServiceFSM>();
-    for (String service : 
+    for (String service :
       cluster.getClusterDefinition(revision).getEnabledServices()) {
       if(hasActiveRoles(cluster, service)){
         ServiceImpl serviceImpl = new ServiceImpl(cluster, this, service);
@@ -123,14 +123,14 @@ public class ClusterImpl implements Clus
     this.services = serviceImpls;
     this.clusterState = clusterState;
   }
-
-  private boolean hasActiveRoles(Cluster cluster, String serviceName)
+  
+  private static boolean hasActiveRoles(Cluster cluster, String serviceName)
       throws IOException {
     ComponentPlugin plugin = cluster.getComponentDefinition(serviceName);
     String[] roles = plugin.getActiveRoles();
     return roles.length > 0;
   }
-
+  
   public ClusterStateFSM getState() {
     return stateMachine.getCurrentState();
   }
@@ -191,6 +191,7 @@ public class ClusterImpl implements Clus
     @Override
     public void transition(ClusterImpl operand, ClusterEvent event) {
       operand.getClusterState().setState(operand.getState().name());
+      //TODO: do it in the reverse order of startup
       ServiceFSM service = operand.getFirstService();
       if (service != null) {
         StateMachineInvoker.getAMBARIEventHandler().handle(

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java Fri Dec  9 09:13:06 2011
@@ -52,6 +52,9 @@ public class RoleImpl implements RoleFSM
              RoleState.ACTIVE,
              RoleEventType.START_SUCCESS, new SuccessfulStartTransition())
          
+         .addTransition(RoleState.STARTING, RoleState.STOPPING, 
+             RoleEventType.STOP)
+             
          .addTransition(RoleState.ACTIVE, RoleState.ACTIVE,
              RoleEventType.START_SUCCESS)
              
@@ -150,21 +153,25 @@ public class RoleImpl implements RoleFSM
 
   @Override
   public void activate() {
-    //load the plugin and get the commands for starting the role
+    StateMachineInvoker.getAMBARIEventHandler()
+       .handle(new RoleEvent(RoleEventType.START, this));
   }
 
   @Override
   public void deactivate() {
-    
+    StateMachineInvoker.getAMBARIEventHandler()
+       .handle(new RoleEvent(RoleEventType.STOP, this));  
   }
 
   @Override
   public boolean shouldStop() {
-    return myState == RoleState.STOPPING || myState == RoleState.STOPPED;
+    return getRoleState() == RoleState.STOPPING 
+        || getRoleState() == RoleState.STOPPED;
   }
 
   @Override
   public boolean shouldStart() {
-    return myState == RoleState.STARTING || myState == RoleState.ACTIVE;
+    return getRoleState() == RoleState.STARTING 
+        || getRoleState() == RoleState.ACTIVE;
   }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java Fri Dec  9 09:13:06 2011
@@ -29,22 +29,25 @@ import org.apache.ambari.event.EventHand
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.inject.Inject;
+
 public class StateMachineInvoker {
   
-  private static Dispatcher dispatcher;
-  
-  static {
-    dispatcher = new AsyncDispatcher();
+  @Inject
+  public static void init(AsyncDispatcher d, 
+      ConcurrentHashMap<String, ClusterFSM> c) {
+    clusters = c;
+    dispatcher = d;
     dispatcher.register(ClusterEventType.class, new ClusterEventDispatcher());
     dispatcher.register(ServiceEventType.class, new ServiceEventDispatcher());
     dispatcher.register(RoleEventType.class, new RoleEventDispatcher());
     dispatcher.start();
   }
+  
+  private static Dispatcher dispatcher;
+  
   private static Log LOG = LogFactory.getLog(StateMachineInvoker.class);
-  public Dispatcher getAMBARIDispatcher() {
-    return dispatcher;
-  }
-
+  
   public static EventHandler getAMBARIEventHandler() {
     return dispatcher.getEventHandler();
   }
@@ -73,8 +76,7 @@ public class StateMachineInvoker {
     }
   }
   
-  private static ConcurrentMap<String, ClusterFSM> clusters = 
-      new ConcurrentHashMap<String, ClusterFSM>();
+  private static ConcurrentMap<String, ClusterFSM> clusters;
   
   public static ClusterFSM createCluster(Cluster cluster, int revision, 
       ClusterState state) throws IOException {
@@ -104,6 +106,11 @@ public class StateMachineInvoker {
     return clusters.get(clusterId);
   }
   
+  /**
+   * Registers the FSM for the given cluster id in the shared cluster map,
+   * replacing any FSM previously stored under that id.
+   * The map is only assigned by init(), so calling this before init()
+   * has run fails with a NullPointerException.
+   */
+  public static void setStateMachineClusterInstance(String clusterId, 
+      ClusterFSM clusterFsm) {
+    final ConcurrentMap<String, ClusterFSM> registry = clusters;
+    registry.put(clusterId, clusterFsm);
+  }
+  
   public static ClusterState getClusterState(String clusterId,
       long clusterDefinitionRev) {
     return clusters.get(clusterId).getClusterState();

Added: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java?rev=1212291&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java (added)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java Fri Dec  9 09:13:06 2011
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.controller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.common.rest.agent.Action;
+import org.apache.ambari.common.rest.agent.Action.Kind;
+import org.apache.ambari.common.rest.agent.ActionResult;
+import org.apache.ambari.common.rest.agent.AgentRoleState;
+import org.apache.ambari.common.rest.agent.CommandResult;
+import org.apache.ambari.common.rest.agent.ControllerResponse;
+import org.apache.ambari.common.rest.agent.HeartBeat;
+import org.apache.ambari.common.rest.entities.ClusterDefinition;
+import org.apache.ambari.common.rest.entities.ClusterState;
+import org.apache.ambari.common.rest.entities.Node;
+import org.apache.ambari.common.rest.entities.NodeState;
+import org.apache.ambari.components.ComponentPlugin;
+import org.apache.ambari.controller.HeartbeatHandler.ClusterNameAndRev;
+import org.apache.ambari.controller.HeartbeatHandler.SpecialServiceIDs;
+import org.apache.ambari.event.AsyncDispatcher;
+import org.apache.ambari.event.EventHandler;
+import org.apache.ambari.resource.statemachine.ClusterFSM;
+import org.apache.ambari.resource.statemachine.RoleEvent;
+import org.apache.ambari.resource.statemachine.RoleEventType;
+import org.apache.ambari.resource.statemachine.RoleFSM;
+import org.apache.ambari.resource.statemachine.RoleState;
+import org.apache.ambari.resource.statemachine.ServiceEvent;
+import org.apache.ambari.resource.statemachine.ServiceEventType;
+import org.apache.ambari.resource.statemachine.ServiceFSM;
+import org.apache.ambari.resource.statemachine.ServiceState;
+import org.apache.ambari.resource.statemachine.StateMachineInvoker;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link HeartbeatHandler}.
+ *
+ * Each test method runs against fresh mocks built in {@link #setup()}.
+ * Tests that need cluster state put a stub {@link ClusterFSM}
+ * ({@link TestClusterImpl}) into the map handed to
+ * {@link StateMachineInvoker#init}. They then feed a {@link HeartBeat} to
+ * the handler and inspect either the returned actions or the resulting
+ * stub FSM states.
+ */
+public class TestHeartbeat {
+  
+  ComponentPlugin plugin;
+  String[] roles = {"abc"};
+  String[] services = {"comp1"};
+  ClusterDefinition cdef;
+  Cluster cluster;
+  Nodes nodes;
+  Clusters clusters;
+  HeartBeat heartbeat;
+  Node node;
+  final String script = "script-content";
+  final int scriptHash = script.hashCode();
+  
+  // Cluster-FSM map passed to StateMachineInvoker.init(); a new one is
+  // created for every test method in setup().
+  private static ConcurrentHashMap<String, ClusterFSM> c;
+  
+  /**
+   * Builds fresh mocks for every test method:
+   * - a cluster "cluster1" with one enabled service "comp1";
+   * - a plugin reporting the single active role "abc", which returns canned
+   *   start, pre-start and check-service actions;
+   * - a node "localhost" that plays role "abc".
+   * It also re-initializes StateMachineInvoker with a new dispatcher and an
+   * empty cluster map.
+   */
+  @BeforeMethod
+  public void setup() throws Exception {
+    plugin = mock(ComponentPlugin.class);
+    when(plugin.getActiveRoles()).thenReturn(roles);
+    cdef = mock(ClusterDefinition.class);
+    when(cdef.getEnabledServices()).thenReturn(Arrays.asList("comp1"));
+    cluster = mock(Cluster.class);
+    when(cluster.getClusterDefinition(anyInt())).thenReturn(cdef);
+    when(cluster.getName()).thenReturn("cluster1");
+    when(cluster.getComponentDefinition("comp1")).thenReturn(plugin);
+    when(cluster.getLatestRevisionNumber()).thenReturn(-1);
+    Action startAction = new Action();
+    startAction.setKind(Kind.START_ACTION);
+    when(plugin.startServer("cluster1", "abc")).thenReturn(startAction);
+    when(plugin.runCheckRole()).thenReturn("abc");
+    when(plugin.runPreStartRole()).thenReturn("abc");
+    Action preStartAction = new Action();
+    preStartAction.setKind(Kind.RUN_ACTION);
+    when(plugin.preStartAction("cluster1", "abc")).thenReturn(preStartAction);
+    Action checkServiceAction = new Action();
+    // The availability check must be a RUN_ACTION: checkSpecialAction()
+    // only matches actions of that kind.
+    checkServiceAction.setKind(Kind.RUN_ACTION);
+    when(plugin.checkService("cluster1", "abc")).thenReturn(checkServiceAction);
+    nodes = mock(Nodes.class);
+    clusters = mock(Clusters.class);
+    node = new Node();
+    node.setName("localhost");
+    NodeState nodeState = new NodeState();
+    nodeState.setClusterName("cluster1");
+    node.setNodeState(nodeState);
+    when(nodes.getNode("localhost")).thenReturn(node);
+    when(nodes.getNodeRoles("localhost"))
+         .thenReturn(Arrays.asList(roles));
+    when(clusters.getClusterByName("cluster1")).thenReturn(cluster);
+    when(clusters.getInstallAndConfigureScript(anyString(), anyInt()))
+        .thenReturn(script);
+    heartbeat = new HeartBeat();
+    heartbeat.setIdle(true);
+    heartbeat.setInstallScriptHash(-1);
+    heartbeat.setHostname("localhost");
+    heartbeat.setInstalledRoleStates(new ArrayList<AgentRoleState>());
+    StateMachineInvoker.init(new AsyncDispatcher(), 
+        (c=new ConcurrentHashMap<String, ClusterFSM>()));
+  }
+  
+  @AfterTest
+  public void teardown() {
+    // c stays null if setup() failed before assigning it; guard so that
+    // teardown does not hide the original failure behind an NPE.
+    if (c != null) {
+      c.clear();
+    }
+  }
+
+  @Test
+  public void testInstall() throws Exception {
+    //send a heartbeat and get a response with install/config action
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    List<Action> actions = response.getActions();
+    assert(actions.size() == 1);
+    assert(actions.get(0).getKind() == Action.Kind.INSTALL_AND_CONFIG_ACTION);
+  }
+  
+  
+  @Test
+  public void testStartServer() throws Exception {
+    //send a heartbeat when some server needs to be started, 
+    //and the heartbeat response should have the start action
+    TestClusterImpl clusterImpl = new TestClusterImpl(services,roles);
+    ((TestRoleImpl)clusterImpl.getServices()
+        .get(0).getRoles().get(0)).setShouldStart(true);
+    c.put("cluster1", clusterImpl);
+    processHeartbeatAndGetResponse(true);
+  }
+  
+  @Test
+  public void testStopServer() throws Exception {
+    //send a heartbeat when some server needs to be stopped, 
+    //and the heartbeat response shouldn't have a start action
+    //for the server
+    TestClusterImpl clusterImpl = new TestClusterImpl(services,roles);
+    ((TestRoleImpl)clusterImpl.getServices()
+        .get(0).getRoles().get(0)).setShouldStart(false);
+    c.put("cluster1", clusterImpl);
+    processHeartbeatAndGetResponse(false);
+  }
+  
+  @Test
+  public void testIsRoleActive() throws Exception {
+    //send a heartbeat with some role server start success, 
+    //and then the role should be considered active
+    TestClusterImpl clusterImpl = new TestClusterImpl(services,roles);
+    c.put("cluster1", clusterImpl);
+    RoleFSM roleFsm = clusterImpl.getServices()
+        .get(0).getRoles().get(0);
+    heartbeat.setInstallScriptHash(scriptHash);
+    List<AgentRoleState> installedRoleStates = new ArrayList<AgentRoleState>();
+    AgentRoleState roleState = new AgentRoleState();
+    roleState.setRoleName(roles[0]);
+    roleState.setClusterDefinitionRevision(-1);
+    roleState.setClusterId("cluster1");
+    roleState.setComponentName("comp1");
+    installedRoleStates.add(roleState);
+    heartbeat.setInstalledRoleStates(installedRoleStates);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    checkActions(response, true);
+    // events are delivered asynchronously; poll for up to ~10 seconds
+    int i = 0;
+    while (i++ < 10) {
+      if (roleFsm.getRoleState() == RoleState.ACTIVE) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assert(roleFsm.getRoleState() == RoleState.ACTIVE);
+  }
+  
+  @Test
+  public void testCreationOfPreStartAction() throws Exception {
+    TestClusterImpl clusterImpl = new TestClusterImpl(services,roles);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestRoleImpl)clusterImpl.getServices().get(0).getRoles().get(0)).setShouldStart(false);
+    ((TestServiceImpl)serviceImpl).setServiceState(ServiceState.PRESTART);
+    c.put("cluster1", clusterImpl);
+    checkSpecialAction(ServiceState.PRESTART, ServiceEventType.START, 
+        SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
+  }
+  @Test
+  public void testCreationOfCheckRoleAction() throws Exception {
+    
+    TestClusterImpl clusterImpl = new TestClusterImpl(services,roles);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestServiceImpl)serviceImpl).setServiceState(ServiceState.STARTED);
+    c.put("cluster1", clusterImpl);
+    checkSpecialAction(ServiceState.STARTED, ServiceEventType.ROLE_STARTED, 
+        SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
+  }
+  
+  @Test
+  public void testServiceAvailableEvent() throws Exception {
+    TestClusterImpl clusterImpl = new TestClusterImpl(services,roles);
+    c.put("cluster1", clusterImpl);
+    heartbeat.setInstallScriptHash(scriptHash);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestServiceImpl)serviceImpl).setServiceState(ServiceState.STARTED);
+    ActionResult actionResult = new ActionResult();
+    actionResult.setKind(Kind.RUN_ACTION);
+    ClusterNameAndRev clusterNameAndRev = new ClusterNameAndRev("cluster1", -1);
+    String checkActionId = HeartbeatHandler.getSpecialActionID(
+        clusterNameAndRev, "comp1", "abc", 
+        SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
+    actionResult.setId(checkActionId);
+    actionResult.setClusterId("cluster1");
+    actionResult.setClusterDefinitionRevision(-1);
+    CommandResult commandResult = new CommandResult(0,"","");
+    actionResult.setCommandResult(commandResult);
+    List<ActionResult> actionResults = new ArrayList<ActionResult>();
+    actionResults.add(actionResult);
+    heartbeat.setActionResults(actionResults);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    handler.processHeartBeat(heartbeat);
+    // events are delivered asynchronously; poll for up to ~10 seconds
+    int i = 0;
+    while (i++ < 10) {
+      if (serviceImpl.getServiceState() == ServiceState.ACTIVE) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assert(serviceImpl.getServiceState() == ServiceState.ACTIVE);
+  }
+  
+  @Test
+  public void testServiceReadyToStartEvent() throws Exception {
+    TestClusterImpl clusterImpl = new TestClusterImpl(services,roles);
+    c.put("cluster1", clusterImpl);
+    heartbeat.setInstallScriptHash(scriptHash);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestServiceImpl)serviceImpl).setServiceState(ServiceState.PRESTART);
+    ActionResult actionResult = new ActionResult();
+    actionResult.setKind(Kind.RUN_ACTION);
+    ClusterNameAndRev clusterNameAndRev = new ClusterNameAndRev("cluster1", -1);
+    String checkActionId = HeartbeatHandler.getSpecialActionID(
+        clusterNameAndRev, "comp1", "abc", 
+        SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
+    actionResult.setId(checkActionId);
+    actionResult.setClusterId("cluster1");
+    actionResult.setClusterDefinitionRevision(-1);
+    CommandResult commandResult = new CommandResult(0,"","");
+    actionResult.setCommandResult(commandResult);
+    List<ActionResult> actionResults = new ArrayList<ActionResult>();
+    actionResults.add(actionResult);
+    heartbeat.setActionResults(actionResults);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    handler.processHeartBeat(heartbeat);
+    // events are delivered asynchronously; poll for up to ~10 seconds
+    int i = 0;
+    while (i++ < 10) {
+      if (serviceImpl.getServiceState() == ServiceState.STARTING) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assert(serviceImpl.getServiceState() == ServiceState.STARTING);
+  }
+  
+  /**
+   * Sends one heartbeat and asserts that the response contains a RUN_ACTION
+   * whose id is the special action id for (cluster1, -1, comp1, abc,
+   * serviceId). It also asserts that a START_ACTION is present exactly when
+   * serviceState is STARTED.
+   * NOTE: serviceEventType is currently unused.
+   */
+  private void checkSpecialAction(ServiceState serviceState, 
+      ServiceEventType serviceEventType, 
+      SpecialServiceIDs serviceId) throws Exception {
+    heartbeat.setInstallScriptHash(scriptHash);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    checkActions(response, ServiceState.STARTED == serviceState);
+    ClusterNameAndRev clusterNameAndRev = new ClusterNameAndRev("cluster1", -1);
+    boolean found = false;
+    String checkActionId = HeartbeatHandler.getSpecialActionID(
+        clusterNameAndRev, "comp1", "abc", 
+        serviceId);
+    for (Action action : response.getActions()) {
+      if (action.getKind() == Kind.RUN_ACTION && 
+          action.getId().equals(checkActionId)) {
+        found = true;
+        break;
+      }
+    }
+    assert(found != false);
+  }
+  
+  /**
+   * Sends one heartbeat carrying the current install-script hash and checks
+   * the response via checkActions().
+   */
+  private void processHeartbeatAndGetResponse(boolean shouldFindStart)
+      throws Exception {
+    heartbeat.setInstallScriptHash(scriptHash);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    checkActions(response, shouldFindStart);
+  }
+  
+  /**
+   * Asserts that the response contains an INSTALL_AND_CONFIG_ACTION, and that
+   * it contains a START_ACTION if and only if shouldFindStart is true.
+   */
+  private void checkActions(ControllerResponse response, boolean shouldFindStart) {
+    List<Action> actions = response.getActions();
+    boolean foundStart = false;
+    boolean foundInstall = false;
+    for (Action a : actions) {
+      if (a.getKind() == Action.Kind.START_ACTION) {
+        foundStart = true;
+      }
+      if (a.getKind() == Action.Kind.INSTALL_AND_CONFIG_ACTION) {
+        foundInstall = true;
+      }
+    }
+    assert (foundInstall != false && foundStart == shouldFindStart);
+  }
+
+  
+  /**
+   * Minimal ClusterFSM stub exposing one TestServiceImpl per given service
+   * name. The remaining methods are no-ops or return null.
+   */
+  class TestClusterImpl implements ClusterFSM {
+    ClusterState clusterState;
+    List<ServiceFSM> serviceFsms;
+    public void setClusterState(ClusterState state) {
+      this.clusterState = state;
+    }
+    public TestClusterImpl(String[] services, String roles[]) {
+      serviceFsms = new ArrayList<ServiceFSM>();
+      for (String service : services) {
+        ServiceFSM srv = new TestServiceImpl(service,roles);
+        serviceFsms.add(srv);
+      }
+    }
+    @Override
+    public List<ServiceFSM> getServices() {
+      return serviceFsms;
+    }
+
+    @Override
+    public Map<String, String> getServiceStates() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void terminate() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public ClusterState getClusterState() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void activate() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public void deactivate() {
+      // TODO Auto-generated method stub
+      
+    }
+    
+  }
+  
+  /**
+   * ServiceFSM stub whose state is set directly through setServiceState().
+   * handle() moves it to ACTIVE on AVAILABLE_CHECK_SUCCESS and to STARTING
+   * on PRESTART_SUCCESS; all other events are ignored.
+   */
+  class TestServiceImpl implements ServiceFSM, EventHandler<ServiceEvent> {
+
+    ServiceState serviceState;
+    String serviceName;
+    List<RoleFSM> roleFsms;
+    public void setServiceState(ServiceState state) {
+      this.serviceState = state;
+    }
+
+    public TestServiceImpl(String service, String[] roles) {
+      roleFsms = new ArrayList<RoleFSM>();
+      for (String role : roles) {
+        TestRoleImpl r = new TestRoleImpl(role);
+        roleFsms.add(r);
+      }
+      serviceName = service;
+    }
+    
+    @Override
+    public ServiceState getServiceState() {
+      return serviceState;
+    }
+
+    @Override
+    public String getServiceName() {
+      return serviceName;
+    }
+
+    @Override
+    public ClusterFSM getAssociatedCluster() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public boolean isActive() {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public List<RoleFSM> getRoles() {
+      return roleFsms;
+    }
+
+    @Override
+    public void activate() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public void deactivate() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public void handle(ServiceEvent event) {
+      if (event.getType() == ServiceEventType.AVAILABLE_CHECK_SUCCESS) {
+        serviceState = ServiceState.ACTIVE;
+      }
+      if (event.getType() == ServiceEventType.PRESTART_SUCCESS) {
+        serviceState = ServiceState.STARTING;
+      }
+    }
+    
+  }
+  
+  /**
+   * RoleFSM stub: shouldStart() is controlled by setShouldStart() (default
+   * true), and shouldStop() always returns false. handle() moves it to ACTIVE
+   * on START_SUCCESS.
+   */
+  class TestRoleImpl implements RoleFSM, EventHandler<RoleEvent>  {
+ 
+    RoleState roleState;
+    String roleName;
+    boolean shouldStart = true;
+    public void setShouldStart(boolean shouldStart) {
+      this.shouldStart = shouldStart;
+    }
+    public void setRoleState(RoleState roleState) {
+      this.roleState = roleState;
+    }
+    
+    public TestRoleImpl(String role) {
+      this.roleName = role;
+    }
+    @Override
+    public RoleState getRoleState() {
+      return roleState;
+    }
+
+    @Override
+    public String getRoleName() {
+      return roleName;
+    }
+
+    @Override
+    public ServiceFSM getAssociatedService() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public boolean shouldStop() {
+      return false;
+    }
+
+    @Override
+    public boolean shouldStart() {
+      return shouldStart;
+    }
+
+    @Override
+    public void activate() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public void deactivate() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public void handle(RoleEvent event) {
+      if (event.getType() == RoleEventType.START_SUCCESS) {
+        roleState = RoleState.ACTIVE;
+      }
+    }
+  }
+}

Copied: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java (from r1212286, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java)
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java?p2=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java&p1=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java&r1=1212286&r2=1212291&rev=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java (original)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java Fri Dec  9 09:13:06 2011
@@ -39,7 +39,7 @@ import org.testng.annotations.BeforeMeth
 import org.testng.annotations.Test;
 import static org.testng.AssertJUnit.assertEquals;
 
-public class StackFlattenerTest {
+public class TestStackFlattener {
   
   Stacks stacks;
   Stack parentStack;

Copied: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java (from r1212288, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java)
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java?p2=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java&p1=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java&r1=1212288&r2=1212291&rev=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java (original)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java Fri Dec  9 09:13:06 2011
@@ -13,7 +13,7 @@ import org.apache.ambari.components.Comp
 import org.apache.ambari.controller.Cluster;
 import org.testng.annotations.Test;
 
-public class ClusterImplTest {
+public class TestClusterImpl {
 
   /**
    * Create cluster with two components, both having active roles.

Modified: incubator/ambari/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/pom.xml?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/pom.xml (original)
+++ incubator/ambari/trunk/pom.xml Fri Dec  9 09:13:06 2011
@@ -295,7 +295,7 @@
               <phase>test</phase>
               <argLine>-Xmx1024m</argLine>
               <includes>
-                <include>**/*Test.java</include>
+                <include>**/Test*.java</include>
               </includes>
               <excludes>
                 <exclude>**/IntegrationTest*.java</exclude>
@@ -443,7 +443,7 @@
                                     <argLine>-Xmx1024m -Djava.library.path=.
                                     </argLine>
                                     <includes>
-                                        <include>**/*Test.java</include>
+                                        <include>**/Test*.java</include>
                                     </includes>
                                     <excludes>
                                         <exclude>**/IntegrationTest.java</exclude>
@@ -477,7 +477,7 @@
                                         <include>**/IntegrationTest*.java</include>
                                     </includes>
                                     <excludes>
-                                        <exclude>**/*Test.java</exclude>
+                                        <exclude>**/Test*.java</exclude>
                                         <exclude>**/PerformanceTest*.java</exclude>
                                     </excludes>
                                     <skipTests>false</skipTests>