You are viewing a plain-text version of this content. The canonical link to the original page (shown as "here" in the HTML version) is not preserved in this plain-text rendering.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/09 10:13:07 UTC
svn commit: r1212291 - in /incubator/ambari/trunk: ./
agent/src/main/python/ambari_agent/ agent/src/test/python/
client/src/main/java/org/apache/ambari/common/rest/agent/
controller/src/main/java/org/apache/ambari/controller/
controller/src/main/java/o...
Author: ddas
Date: Fri Dec 9 09:13:06 2011
New Revision: 1212291
URL: http://svn.apache.org/viewvc?rev=1212291&view=rev
Log:
AMBARI-141. Update the heartbeat on controller/agent.
Added:
incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java
incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java
- copied, changed from r1212286, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java
incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java
- copied, changed from r1212288, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java
Removed:
incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java
incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
incubator/ambari/trunk/agent/src/test/python/unitTests.py
incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
incubator/ambari/trunk/pom.xml
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri Dec 9 09:13:06 2011
@@ -2,6 +2,8 @@ Ambari Change log
Release 0.1.0 - unreleased
+ AMBARI-141. Update the heartbeat on controller/agent (ddas)
+
AMBARI-147. Create a stack flattener and introduce Guice. (omalley)
AMBARI-145. FSMs are created for only those components that have
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Fri Dec 9 09:13:06 2011
@@ -22,6 +22,7 @@ import logging
import logging.handlers
import Queue
import threading
+from shell import shellRunner
from FileUtil import writeFile, createStructure, deleteStructure
from shell import shellRunner
import json
@@ -29,6 +30,7 @@ import os
import time
logger = logging.getLogger()
+installScriptHash = -1
class ActionQueue(threading.Thread):
global q, r, clusterId, clusterDefinitionRevision
@@ -54,6 +56,26 @@ class ActionQueue(threading.Thread):
def put(self, response):
if 'actions' in response:
actions = response['actions']
+ # for the servers, take a diff of what's running, and what the controller
+ # asked the agent to start. Kill all those servers that the controller
+ # didn't ask us to start
+ sh = shellRunner()
+ runningServers = sh.getServerTracker()
+
+ # get the list of servers the controller wants running
+ serversToRun = {}
+ for action in actions:
+ if action['kind'] == 'START_ACTION':
+ processKey = sh.getServerKey(action['clusterId'],action['clusterDefinitionRevision'],
+ action['component'], action['role'])
+ serversToRun[processKey] = 1
+
+ # create stop actions for the servers that the controller wants stopped
+ for server in runningServers.keys():
+ if server not in serversToRun:
+ sh.stopProcess(server)
+ # now put all the actions in the queue. The ordering is important (we stopped
+ # all unneeded servers first)
for action in actions:
q.put(action)
@@ -63,12 +85,12 @@ class ActionQueue(threading.Thread):
while not q.empty():
action = q.get()
switches = {
- 'START_ACTION' : self.startAction,
- 'STOP_ACTION' : self.stopAction,
- 'RUN_ACTION' : self.runAction,
- 'CREATE_STRUCTURE_ACTION' : self.createStructureAction,
- 'DELETE_STRUCTURE_ACTION' : self.deleteStructureAction,
- 'WRITE_FILE_ACTION' : self.writeFileAction
+ 'START_ACTION' : self.startAction,
+ 'RUN_ACTION' : self.runAction,
+ 'CREATE_STRUCTURE_ACTION' : self.createStructureAction,
+ 'DELETE_STRUCTURE_ACTION' : self.deleteStructureAction,
+ 'WRITE_FILE_ACTION' : self.writeFileAction,
+ 'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction
}
try:
result = switches.get(action['kind'], self.unknownAction)(action)
@@ -112,21 +134,31 @@ class ActionQueue(threading.Thread):
action['command'],
action['user'], result)
- # Run stop action, stop a server process.
- def stopAction(self, action):
- result = self.genResult(action)
- return self.sh.stopProcess(action['workDirComponent'],
- action['clusterId'],
- action['clusterDefinitionRevision'],
- action['component'],
- action['role'],
- action['signal'], result)
-
# Write file action
def writeFileAction(self, action):
result = self.genResult(action)
return writeFile(action, result)
+ # Install and configure action
+ def installAndConfigAction(self, action):
+ w = self.writeFileAction(action)
+ commandResult = {}
+ if w['exitCode']!=0:
+ commandResult['output'] = out
+ commandResult['error'] = err
+ commandResult['exitCode'] = exitCode
+ r['commandResult'] = commandResult
+ return r
+ r = self.sh.run(action['command'])
+ if r['exitCode'] != 0:
+ commandResult['output'] = out
+ commandResult['error'] = err
+ else:
+ installScriptHash = action['id']
+ commandResult['exitCode'] = r['exitCode']
+ r['commandResult'] = commandResult
+ return r
+
# Run command action
def runAction(self, action):
result = self.genResult(action)
@@ -160,3 +192,6 @@ class ActionQueue(threading.Thread):
def isIdle(self):
return q.empty()
+ # Get the hash of the script currently used for install/config
+ def getInstallScriptHash(self):
+ return installScriptHash
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py Fri Dec 9 09:13:06 2011
@@ -41,7 +41,8 @@ class Heartbeat:
'timestamp' : timestamp,
'hostname' : socket.gethostname(),
'hardwareProfile' : self.hardware.get(),
- 'idle' : self.actionQueue.isIdle()
+ 'idle' : self.actionQueue.isIdle(),
+ 'installScriptHash' : self.actionQueue.getInstallScriptHash()
}
if len(queueResult)!=0:
heartbeat['actionResults'] = queueResult
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py Fri Dec 9 09:13:06 2011
@@ -121,7 +121,7 @@ class shellRunner:
logger.warn("%s %s %s can not switch user for START_ACTION." % (clusterId, component, role))
code = 0
commandResult = {}
- process = clusterId+"/"+clusterDefinitionRevision+"/"+component+"/"+role
+ process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role)
if not process in serverTracker:
cmd = sys.executable
tempfilename = tempfile.mktemp()
@@ -148,27 +148,16 @@ class shellRunner:
return result
# Stop a process and remove presisted state
- def stopProcess(self, workdir, clusterId, clusterDefinitionRevision, component, role, sig, result):
+ def stopProcess(self, processKey):
global serverTracker
- oldDir = os.getcwd()
- os.chdir(workdir)
- process = clusterId+"/"+clusterDefinitionRevision+"/"+component+"/"+role
- commandResult = {'exitCode': 0}
+ keyFragments = processKey.split('/')
+ process = self.getServerKey(keyFragments[0],keyFragments[1],keyFragments[2],keyFragments[3])
if process in serverTracker:
- if sig=='TERM':
- os.kill(serverTracker[process], signal.SIGTERM)
- # TODO: gracefully check if process is still alive
- # before remove from serverTracker
- del serverTracker[process]
- else:
- os.kill(serverTracker[process], signal.SIGKILL)
- del serverTracker[process]
- result['commandResult'] = commandResult
- try:
- os.chdir(oldDir)
- except Exception:
- logger.warn("%s %s %s can not restore environment for STOP_ACTION." % (clusterId, component, role))
- return result
+ os.kill(serverTracker[process], signal.SIGKILL)
+ del serverTracker[process]
def getServerTracker(self):
return serverTracker
+
+ def getServerKey(self,clusterId, clusterDefinitionRevision, component, role):
+ return clusterId+"/"+clusterDefinitionRevision+"/"+component+"/"+role
Added: incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py?rev=1212291&view=auto
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py (added)
+++ incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py Fri Dec 9 09:13:06 2011
@@ -0,0 +1,57 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import os, errno
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.AmbariConfig import AmbariConfig
+
+class TestAgentActions(TestCase):
+ def test_installAndConfigAction(self):
+ path = "/tmp/ambari_file_test/_file_write_test_1"
+ configFile = {
+ "data" : "test",
+ "owner" : os.getuid(),
+ "group" : os.getgid() ,
+ "permission" : 0700,
+ "path" : path,
+ "umask" : 022
+ }
+
+ #note that the command in the action is just a listing of the path created
+ #we just want to ensure that 'ls' can run on the data file (in the actual world
+ #this 'ls' would be a puppet or a chef command that would work on a data
+ #file
+ action = {
+ 'id' : 'tttt',
+ 'kind' : 'INSTALL_AND_CONFIG_ACTION',
+ 'clusterId' : 'abc',
+ 'role' : 'namenode',
+ 'component' : 'hdfs',
+ 'workDirComponent' : 'abc-hdfs',
+ 'file' : configFile,
+ 'clusterDefinitionRevision' : 12,
+ 'command' : ['/bin/ls',path]
+ }
+ result = { }
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ result = actionQueue.installAndConfigAction(action)
+ self.assertEqual(result['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % result['exitCode'])
+ self.assertEqual(result['output'], path + "\n", "installAndConfigAction test failed Returned %s " % result['output'])
Modified: incubator/ambari/trunk/agent/src/test/python/unitTests.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/unitTests.py?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/unitTests.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/unitTests.py Fri Dec 9 09:13:06 2011
@@ -34,7 +34,8 @@ def all_tests_suite():
'TestServerStatus',
'TestFileUtil',
'TestActionQueue',
- 'TestAmbariComponent'
+ 'TestAmbariComponent',
+ 'TestAgentActions'
])
return TestAgent([suite])
Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java Fri Dec 9 09:13:06 2011
@@ -161,7 +161,8 @@ public class Action {
public static enum Kind {
RUN_ACTION, START_ACTION, STOP_ACTION, STATUS_ACTION,
- CREATE_STRUCTURE_ACTION, DELETE_STRUCTURE_ACTION, WRITE_FILE_ACTION;
+ CREATE_STRUCTURE_ACTION, DELETE_STRUCTURE_ACTION, WRITE_FILE_ACTION,
+ INSTALL_AND_CONFIG_ACTION;
public static class KindAdaptor extends XmlAdapter<String, Kind> {
@Override
public String marshal(Kind obj) throws Exception {
Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/HeartBeat.java Fri Dec 9 09:13:06 2011
@@ -36,7 +36,7 @@ import javax.xml.bind.annotation.XmlType
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "", propOrder = {"responseId","timestamp",
"hostname", "hardwareProfile", "installedRoleStates",
- "stateChangeStatus", "actionResults", "idle"})
+ "serverStates", "deployState", "actionResults", "idle"})
public class HeartBeat {
@XmlElement
private short responseId = -1;
@@ -49,7 +49,7 @@ public class HeartBeat {
@XmlElement
private List<AgentRoleState> installedRoleStates;
@XmlElement
- private boolean stateChangeStatus;
+ private int installScriptHash;
@XmlElement
private List<ActionResult> actionResults;
@XmlElement
@@ -87,8 +87,8 @@ public class HeartBeat {
return installedRoleStates;
}
- public boolean getStateChangeStatus() {
- return stateChangeStatus;
+ public int getInstallScriptHash() {
+ return installScriptHash;
}
public void setTimestamp(long timestamp) {
@@ -115,7 +115,7 @@ public class HeartBeat {
this.idle = idle;
}
- public void setStateChangeStatus(boolean stateChangeStatus) {
- this.stateChangeStatus = stateChangeStatus;
+ public void setInstallScriptHash(int hash) {
+ this.installScriptHash = hash;
}
}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Cluster.java Fri Dec 9 09:13:06 2011
@@ -132,7 +132,7 @@ public class Cluster {
* @return the latestRevision
*/
public int getLatestRevisionNumber() {
- return this.latestRevisionNumber;
+ return latestRevisionNumber;
}
/**
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/ControllerModule.java Fri Dec 9 09:13:06 2011
@@ -24,6 +24,7 @@ import org.apache.ambari.controller.rest
import org.apache.ambari.controller.rest.resources.StacksResource;
import org.apache.ambari.datastore.PersistentDataStore;
import org.apache.ambari.datastore.impl.ZookeeperDS;
+import org.apache.ambari.resource.statemachine.StateMachineInvoker;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -37,7 +38,8 @@ public class ControllerModule extends Ab
requestStaticInjection(ClustersResource.class,
NodesResource.class,
StacksResource.class,
- ControllerResource.class);
+ ControllerResource.class,
+ StateMachineInvoker.class);
install(new FactoryModuleBuilder()
.implement(Cluster.class,Cluster.class)
.build(ClusterFactory.class));
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Fri Dec 9 09:13:06 2011
@@ -29,6 +29,7 @@ import org.apache.ambari.controller.Node
import org.apache.ambari.common.rest.agent.Action;
import org.apache.ambari.common.rest.agent.Action.Kind;
import org.apache.ambari.common.rest.agent.ActionResult;
+import org.apache.ambari.common.rest.agent.AgentRoleState;
import org.apache.ambari.common.rest.agent.Command;
import org.apache.ambari.common.rest.agent.ConfigFile;
import org.apache.ambari.common.rest.agent.ControllerResponse;
@@ -86,21 +87,33 @@ public class HeartbeatHandler {
List<Action> allActions = new ArrayList<Action>();
//if the command-execution takes longer than one heartbeat interval
- //the check for idleness will prevent the same node getting the same
- //command more than once. In the future this could be improved
+ //the check for idleness will prevent the same node getting more
+ //commands. In the future this could be improved
//to reflect the command execution state more accurately.
if (heartbeat.getIdle()) {
+
List<ClusterNameAndRev> clustersNodeBelongsTo =
getClustersNodeBelongsTo(hostname);
+
+ //TODO: have an API in Clusters that can return a script
+ //pertaining to all clusters
+ String script =
+ clusters.getInstallAndConfigureScript(
+ clustersNodeBelongsTo.get(0).getClusterName(),
+ clustersNodeBelongsTo.get(0).getRevision());
+ if (script == null) {
+ return createResponse(responseId,allActions,heartbeat);
+ }
+ //send the deploy script
+ getInstallAndConfigureAction(script, allActions);
- for (ClusterNameAndRev clusterIdAndRev : clustersNodeBelongsTo) {
+ if (!installAndConfigDone(script,heartbeat)) {
+ return createResponse(responseId,allActions,heartbeat);
+ }
- String script =
- clusters.getInstallAndConfigureScript(clusterName,
- clusterRev);
-
- //send the deploy script
- getInstallAndConfigureAction(script,clusterIdAndRev, allActions);
+ for (ClusterNameAndRev clusterIdAndRev : clustersNodeBelongsTo) {
+ clusterName = clusterIdAndRev.getClusterName();
+ clusterRev = clusterIdAndRev.getRevision();
//get the cluster object corresponding to the clusterId
Cluster cluster = clusters.getClusterByName(clusterName);
@@ -130,7 +143,8 @@ public class HeartbeatHandler {
//check the expected state of the agent and whether the start
//was successful
if (wasStartRoleSuccessful(clusterIdAndRev,
- role.getRoleName(), response, heartbeat)) {
+ service.getServiceName(), role.getRoleName(), response,
+ heartbeat)) {
//raise an event to the state machine for a successful
//role-start
StateMachineInvoker.getAMBARIEventHandler()
@@ -145,7 +159,8 @@ public class HeartbeatHandler {
//raise an event to the state machine for a successful
//role-stop instance
if (wasStopRoleSuccessful(clusterIdAndRev,
- role.getRoleName(), response, heartbeat)) {
+ service.getServiceName(), role.getRoleName(), response,
+ heartbeat)) {
StateMachineInvoker.getAMBARIEventHandler()
.handle(new RoleEvent(RoleEventType.STOP_SUCCESS, role));
}
@@ -159,32 +174,42 @@ public class HeartbeatHandler {
}
}
}
+ return createResponse(responseId,allActions,heartbeat);
+ }
+
+ private ControllerResponse createResponse(short responseId,
+ List<Action> allActions, HeartBeat heartbeat) {
ControllerResponse r = new ControllerResponse();
r.setResponseId(responseId);
- //TODO: need to persist this state (if allActions are different from the
- //last allActions)
r.setActions(allActions);
agentToHeartbeatResponseMap.put(heartbeat.getHostname(), r);
return r;
}
+ private boolean installAndConfigDone(String script, HeartBeat heartbeat) {
+ if (script == null || heartbeat.getInstallScriptHash() == -1) {
+ return false;
+ }
+ if (script.hashCode() == heartbeat.getInstallScriptHash()) {
+ return true;
+ }
+ return false;
+ }
+
private boolean wasStartRoleSuccessful(ClusterNameAndRev clusterIdAndRev,
- String roleName, ControllerResponse response, HeartBeat heartbeat) {
- //Check whether the statechange was successful on the agent, and if
- //the state information sent to the agent in the previous heartbeat
- //included the start-action for the role in question.
- if (!heartbeat.getStateChangeStatus()) {
+ String component, String roleName, ControllerResponse response,
+ HeartBeat heartbeat) {
+ List<AgentRoleState> serverStates = heartbeat.getInstalledRoleStates();
+ if (serverStates == null) {
return false;
}
- List<Action> actions = response.getActions();
- for (Action action : actions) { //TBD: no iteration for every invocation of this method
- if (action.kind != Action.Kind.START_ACTION) {
- continue;
- }
- if (action.getClusterId().equals(clusterIdAndRev.getClusterName()) &&
- action.getClusterDefinitionRevision() ==
- clusterIdAndRev.getRevision() &&
- action.getRole().equals(roleName)) {
+
+ //TBD: create a hashmap (don't iterate for every server state)
+ for (AgentRoleState serverState : serverStates) {
+ if (serverState.getClusterId().equals(clusterIdAndRev.getClusterName()) &&
+ serverState.getClusterDefinitionRevision() == clusterIdAndRev.getRevision() &&
+ serverState.getComponentName().equals(component) &&
+ serverState.getRoleName().equals(roleName)) {
return true;
}
}
@@ -192,57 +217,56 @@ public class HeartbeatHandler {
}
private void getInstallAndConfigureAction(String script,
- ClusterNameAndRev clusterNameRev, List<Action> allActions) {
+ List<Action> allActions) {
ConfigFile file = new ConfigFile();
file.setData(script);
//TODO: this should be written in Ambari's scratch space directory
- file.setPath("/tmp/" + clusterNameRev.getClusterName()
- + "_" + clusterNameRev.getRevision());
+ //this file is the complete install/config script. Note that the
+ //script includes install/config snippets for all clusters the
+ //node belongs to
+ file.setPath("/tmp/ambari_install_script" + script.hashCode());
Action action = new Action();
action.setFile(file);
- action.setClusterId(clusterNameRev.getClusterName());
- action.setClusterDefinitionRevision(clusterNameRev.getRevision());
- action.setKind(Kind.WRITE_FILE_ACTION);
- allActions.add(action);
-
- action = new Action();
- action.setClusterId(clusterNameRev.getClusterName());
- action.setClusterDefinitionRevision(clusterNameRev.getRevision());
+ action.setKind(Kind.INSTALL_AND_CONFIG_ACTION);
String deployCmd = Util.getInstallAndConfigureCommand();
//TODO: assumption is that the file is passed as an argument
//Should generally hold for many install/config systems like Puppet
//but is something that needs to be thought about more
Command command = new Command(null,deployCmd,new String[]{file.getPath()});
action.setCommand(command);
- action.setKind(Kind.RUN_ACTION);
+ //in the action ID send the hashCode of the script content so that
+ //the controller can check how the installation went when a heartbeat
+ //response is sent back
+ action.setId(Integer.toString(script.hashCode()));
allActions.add(action);
}
private boolean wasStopRoleSuccessful(ClusterNameAndRev clusterIdAndRev,
- String roleName, ControllerResponse response, HeartBeat heartbeat) {
- //Check whether the statechange was successful on the agent, and if
- //the state information to the agent included the start-action for the
- //role in question.If the state information didn't include the start-action
- //command, the controller wants the role stopped
- if (!heartbeat.getStateChangeStatus()) {
- return false;
- }
- List<Action> actions = response.getActions();
- for (Action action : actions) {
- if (action.getClusterId() == clusterIdAndRev.getClusterName() &&
- action.getClusterDefinitionRevision() ==
- clusterIdAndRev.getRevision() &&
- action.getRole().equals(roleName) &&
- action.kind == Action.Kind.START_ACTION) {
- return false;
+ String component, String roleName, ControllerResponse response,
+ HeartBeat heartbeat) {
+ List<AgentRoleState> serverStates = heartbeat.getInstalledRoleStates();
+ if (serverStates == null) {
+ return true;
+ }
+ boolean stopped = true;
+ //TBD: create a hashmap (don't iterate for every server state)
+ for (AgentRoleState serverState : serverStates) {
+ if (serverState.getClusterId().equals(clusterIdAndRev.getClusterName()) &&
+ serverState.getClusterDefinitionRevision() == clusterIdAndRev.getRevision() &&
+ serverState.getComponentName().equals(component) &&
+ serverState.getRoleName().equals(roleName)) {
+ stopped = false;
}
}
- return true;
+ return stopped;
}
private ActionResult getActionResult(HeartBeat heartbeat, String id) {
List<ActionResult> actionResults = heartbeat.getActionResults();
+ if (actionResults == null) {
+ return null;
+ }
for (ActionResult result : actionResults) {
if (result.getId().equals(id)) {
return result;
@@ -265,23 +289,24 @@ public class HeartbeatHandler {
return new ArrayList<ClusterNameAndRev>(); //empty
}
- private enum SpecialServiceIDs {
+ enum SpecialServiceIDs {
SERVICE_AVAILABILITY_CHECK_ID, SERVICE_PRESTART_CHECK_ID,
CREATE_STRUCTURE_ACTION_ID
- }
+ }
+
- private static class ClusterNameAndRev implements
+ static class ClusterNameAndRev implements
Comparable<ClusterNameAndRev> {
String clusterName;
- long revision;
- ClusterNameAndRev(String clusterName, long revision) {
+ int revision;
+ ClusterNameAndRev(String clusterName, int revision) {
this.clusterName = clusterName;
this.revision = revision;
}
String getClusterName() {
return clusterName;
}
- long getRevision() {
+ int getRevision() {
return revision;
}
@Override
@@ -311,7 +336,7 @@ public class HeartbeatHandler {
}
}
- private static String getSpecialActionID(ClusterNameAndRev clusterNameAndRev,
+ static String getSpecialActionID(ClusterNameAndRev clusterNameAndRev,
String component, String role, SpecialServiceIDs serviceId) {
String id = clusterNameAndRev.getClusterName() +"-"+
clusterNameAndRev.getRevision() +"-"+ component + "-";
@@ -325,34 +350,33 @@ public class HeartbeatHandler {
private void checkAndCreateActions(Cluster cluster,
ClusterFSM clusterFsm, ClusterNameAndRev clusterIdAndRev,
ServiceFSM service, HeartBeat heartbeat,
- List<Action> allActions)
- throws Exception {
+ List<Action> allActions) throws Exception {
+ ComponentPlugin plugin =
+ cluster.getComponentDefinition(service.getServiceName());
//see whether the service is in the STARTED state, and if so,
//check whether there is any action-result that indicates success
//of the availability check (safemode, etc.)
if (service.getServiceState() == ServiceState.STARTED) {
- String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(),
- null, SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
- ActionResult result = getActionResult(heartbeat, id);
- if (result != null) {
- //this action ran
- //TODO: this needs to be generalized so that it handles the case
- //where the service is not available for a couple of checkservice
- //invocations
- if (result.getCommandResult().getExitCode() == 0) {
- StateMachineInvoker.getAMBARIEventHandler().handle(
- new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_SUCCESS,
- service));
+ String role = plugin.runCheckRole();
+ if (nodePlayingRole(heartbeat.getHostname(), role)) {
+ String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(),
+ role, SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
+ ActionResult result = getActionResult(heartbeat, id);
+ if (result != null) {
+ //this action ran
+ //TODO: this needs to be generalized so that it handles the case
+ //where the service is not available for a couple of checkservice
+ //invocations
+ if (result.getCommandResult().getExitCode() == 0) {
+ StateMachineInvoker.getAMBARIEventHandler().handle(
+ new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_SUCCESS,
+ service));
+ } else {
+ StateMachineInvoker.getAMBARIEventHandler().handle(
+ new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_FAILURE,
+ service));
+ }
} else {
- StateMachineInvoker.getAMBARIEventHandler().handle(
- new ServiceEvent(ServiceEventType.AVAILABLE_CHECK_FAILURE,
- service));
- }
- } else {
- ComponentPlugin plugin =
- cluster.getComponentDefinition(service.getServiceName());
- String role = plugin.runCheckRole();
- if (nodePlayingRole(heartbeat.getHostname(), role)) {
Action action = plugin.checkService(cluster.getName(), role);
fillActionDetails(action, clusterIdAndRev.getClusterName(),
clusterIdAndRev.getRevision(),service.getServiceName(), role);
@@ -364,25 +388,23 @@ public class HeartbeatHandler {
}
if (service.getServiceState() == ServiceState.PRESTART) {
- String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(),
- null, SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
- ActionResult result = getActionResult(heartbeat, id);
- if (result != null) {
- //this action ran
- if (result.getCommandResult().getExitCode() == 0) {
- StateMachineInvoker.getAMBARIEventHandler().handle(
- new ServiceEvent(ServiceEventType.PRESTART_SUCCESS,
- service));
+ String role = plugin.runPreStartRole();
+ if (nodePlayingRole(heartbeat.getHostname(), role)) {
+ String id = getSpecialActionID(clusterIdAndRev, service.getServiceName(),
+ role, SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
+ ActionResult result = getActionResult(heartbeat, id);
+ if (result != null) {
+ //this action ran
+ if (result.getCommandResult().getExitCode() == 0) {
+ StateMachineInvoker.getAMBARIEventHandler().handle(
+ new ServiceEvent(ServiceEventType.PRESTART_SUCCESS,
+ service));
+ } else {
+ StateMachineInvoker.getAMBARIEventHandler().handle(
+ new ServiceEvent(ServiceEventType.PRESTART_FAILURE,
+ service));
+ }
} else {
- StateMachineInvoker.getAMBARIEventHandler().handle(
- new ServiceEvent(ServiceEventType.PRESTART_FAILURE,
- service));
- }
- } else {
- ComponentPlugin plugin =
- cluster.getComponentDefinition(service.getServiceName());
- String role = plugin.runPreStartRole();
- if (nodePlayingRole(heartbeat.getHostname(), role)) {
Action action = plugin.preStartAction(cluster.getName(), role);
fillActionDetails(action, clusterIdAndRev.getClusterName(),
clusterIdAndRev.getRevision(),service.getServiceName(), role);
@@ -397,8 +419,7 @@ public class HeartbeatHandler {
+ /*
+ * Returns true if the given role appears in the role names that Nodes
+ * reports for the host.
+ */
private boolean nodePlayingRole(String host, String role)
throws Exception {
//TODO: iteration on every call seems avoidable ..
- List<String> nodeRoles = nodes.getNode(host).getNodeState().
- getNodeRoleNames();
+ List<String> nodeRoles = nodes.getNodeRoles(host);
return nodeRoles.contains(role);
}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Nodes.java Fri Dec 9 09:13:06 2011
@@ -150,6 +150,14 @@ public class Nodes {
}
/*
+ * Get the node's roles.
+ * The result is a copy taken while holding this object's monitor, so
+ * callers (e.g. a List.contains scan) never keep iterating the node
+ * state's own list after the lock has been released. A node with no
+ * recorded role names yields an empty list rather than null.
+ */
+ public synchronized List<String> getNodeRoles(String host)
+ throws Exception {
+ List<String> roleNames = getNode(host).getNodeState().getNodeRoleNames();
+ if (roleNames == null) {
+ return new java.util.ArrayList<String>();
+ }
+ return new java.util.ArrayList<String>(roleNames);
+ }
+
+ /*
* Get time difference
*/
public static long getTimeDiffInMillis (XMLGregorianCalendar t2,
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/Util.java Fri Dec 9 09:13:06 2011
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.ambari.controller;
import java.util.Date;
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java Fri Dec 9 09:13:06 2011
@@ -106,14 +106,14 @@ public class ClusterImpl implements Clus
private ClusterState clusterState;
private static Log LOG = LogFactory.getLog(ClusterImpl.class);
- public ClusterImpl(Cluster cluster, int revision,
+ public ClusterImpl(Cluster cluster, int revision,
ClusterState clusterState) throws IOException {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
List<ServiceFSM> serviceImpls = new ArrayList<ServiceFSM>();
- for (String service :
+ for (String service :
cluster.getClusterDefinition(revision).getEnabledServices()) {
if(hasActiveRoles(cluster, service)){
ServiceImpl serviceImpl = new ServiceImpl(cluster, this, service);
@@ -123,14 +123,14 @@ public class ClusterImpl implements Clus
this.services = serviceImpls;
this.clusterState = clusterState;
}
-
- private boolean hasActiveRoles(Cluster cluster, String serviceName)
+
+ /*
+ * Returns true if the component plugin for the named service reports at
+ * least one active role. The constructor creates a ServiceImpl for an
+ * enabled service only when this returns true.
+ */
+ private static boolean hasActiveRoles(Cluster cluster, String serviceName)
throws IOException {
ComponentPlugin plugin = cluster.getComponentDefinition(serviceName);
String[] roles = plugin.getActiveRoles();
return roles.length > 0;
}
-
+
+ /*
+ * Current state of this cluster's state machine.
+ */
public ClusterStateFSM getState() {
return stateMachine.getCurrentState();
}
@@ -191,6 +191,7 @@ public class ClusterImpl implements Clus
@Override
public void transition(ClusterImpl operand, ClusterEvent event) {
operand.getClusterState().setState(operand.getState().name());
+ //TODO: do it in the reverse order of startup
ServiceFSM service = operand.getFirstService();
if (service != null) {
StateMachineInvoker.getAMBARIEventHandler().handle(
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java Fri Dec 9 09:13:06 2011
@@ -52,6 +52,9 @@ public class RoleImpl implements RoleFSM
RoleState.ACTIVE,
RoleEventType.START_SUCCESS, new SuccessfulStartTransition())
+ .addTransition(RoleState.STARTING, RoleState.STOPPING,
+ RoleEventType.STOP)
+
.addTransition(RoleState.ACTIVE, RoleState.ACTIVE,
RoleEventType.START_SUCCESS)
@@ -150,21 +153,25 @@ public class RoleImpl implements RoleFSM
+ /*
+ * Requests that this role be started by dispatching a START RoleEvent
+ * for it through the StateMachineInvoker's event handler.
+ */
@Override
public void activate() {
- //load the plugin and get the commands for starting the role
+ StateMachineInvoker.getAMBARIEventHandler()
+ .handle(new RoleEvent(RoleEventType.START, this));
}
+ /*
+ * Requests that this role be stopped by dispatching a STOP RoleEvent
+ * for it through the StateMachineInvoker's event handler.
+ */
@Override
public void deactivate() {
-
+ StateMachineInvoker.getAMBARIEventHandler()
+ .handle(new RoleEvent(RoleEventType.STOP, this));
}
@Override
public boolean shouldStop() {
- return myState == RoleState.STOPPING || myState == RoleState.STOPPED;
+ return getRoleState() == RoleState.STOPPING
+ || getRoleState() == RoleState.STOPPED;
}
@Override
public boolean shouldStart() {
- return myState == RoleState.STARTING || myState == RoleState.ACTIVE;
+ return getRoleState() == RoleState.STARTING
+ || getRoleState() == RoleState.ACTIVE;
}
}
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java Fri Dec 9 09:13:06 2011
@@ -29,22 +29,25 @@ import org.apache.ambari.event.EventHand
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.google.inject.Inject;
+
public class StateMachineInvoker {
- private static Dispatcher dispatcher;
-
- static {
- dispatcher = new AsyncDispatcher();
+ /*
+ * Installs the dispatcher and the cluster map used by the static methods
+ * of this class, registers the cluster, service and role event
+ * dispatchers with the dispatcher, and starts it. Until this has been
+ * called, both dispatcher and clusters are null.
+ * NOTE(review): a second call replaces the previous dispatcher and map
+ * without stopping the old dispatcher (TestHeartbeat calls this before
+ * every test method) -- confirm whether the old one should be stopped.
+ */
+ @Inject
+ public static void init(AsyncDispatcher d,
+ ConcurrentHashMap<String, ClusterFSM> c) {
+ clusters = c;
+ dispatcher = d;
dispatcher.register(ClusterEventType.class, new ClusterEventDispatcher());
dispatcher.register(ServiceEventType.class, new ServiceEventDispatcher());
dispatcher.register(RoleEventType.class, new RoleEventDispatcher());
dispatcher.start();
}
+
+ private static Dispatcher dispatcher;
+
private static Log LOG = LogFactory.getLog(StateMachineInvoker.class);
- public Dispatcher getAMBARIDispatcher() {
- return dispatcher;
- }
-
+
public static EventHandler getAMBARIEventHandler() {
return dispatcher.getEventHandler();
}
@@ -73,8 +76,7 @@ public class StateMachineInvoker {
}
}
- private static ConcurrentMap<String, ClusterFSM> clusters =
- new ConcurrentHashMap<String, ClusterFSM>();
+ private static ConcurrentMap<String, ClusterFSM> clusters;
public static ClusterFSM createCluster(Cluster cluster, int revision,
ClusterState state) throws IOException {
@@ -104,6 +106,11 @@ public class StateMachineInvoker {
return clusters.get(clusterId);
}
+ /*
+ * Registers (or replaces) the state machine for the given cluster id in
+ * the cluster map installed by init(). Calling this before init() throws
+ * IllegalStateException instead of a NullPointerException.
+ */
+ public static void setStateMachineClusterInstance(String clusterId,
+ ClusterFSM clusterFsm) {
+ if (clusters == null) {
+ throw new IllegalStateException(
+ "StateMachineInvoker.init() has not been called");
+ }
+ clusters.put(clusterId, clusterFsm);
+ }
+
public static ClusterState getClusterState(String clusterId,
long clusterDefinitionRev) {
return clusters.get(clusterId).getClusterState();
Added: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java?rev=1212291&view=auto
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java (added)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java Fri Dec 9 09:13:06 2011
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.controller;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.common.rest.agent.Action;
+import org.apache.ambari.common.rest.agent.Action.Kind;
+import org.apache.ambari.common.rest.agent.ActionResult;
+import org.apache.ambari.common.rest.agent.AgentRoleState;
+import org.apache.ambari.common.rest.agent.CommandResult;
+import org.apache.ambari.common.rest.agent.ControllerResponse;
+import org.apache.ambari.common.rest.agent.HeartBeat;
+import org.apache.ambari.common.rest.entities.ClusterDefinition;
+import org.apache.ambari.common.rest.entities.ClusterState;
+import org.apache.ambari.common.rest.entities.Node;
+import org.apache.ambari.common.rest.entities.NodeState;
+import org.apache.ambari.components.ComponentPlugin;
+import org.apache.ambari.controller.HeartbeatHandler.ClusterNameAndRev;
+import org.apache.ambari.controller.HeartbeatHandler.SpecialServiceIDs;
+import org.apache.ambari.event.AsyncDispatcher;
+import org.apache.ambari.event.EventHandler;
+import org.apache.ambari.resource.statemachine.ClusterFSM;
+import org.apache.ambari.resource.statemachine.RoleEvent;
+import org.apache.ambari.resource.statemachine.RoleEventType;
+import org.apache.ambari.resource.statemachine.RoleFSM;
+import org.apache.ambari.resource.statemachine.RoleState;
+import org.apache.ambari.resource.statemachine.ServiceEvent;
+import org.apache.ambari.resource.statemachine.ServiceEventType;
+import org.apache.ambari.resource.statemachine.ServiceFSM;
+import org.apache.ambari.resource.statemachine.ServiceState;
+import org.apache.ambari.resource.statemachine.StateMachineInvoker;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link HeartbeatHandler}. Every test works against a mocked
+ * host "localhost" in cluster "cluster1", whose single service "comp1" has
+ * the single role "abc". Most tests put a stub {@link ClusterFSM} into the
+ * cluster map handed to {@link StateMachineInvoker#init}. They then check
+ * either the actions in the controller's response or the resulting state
+ * of the stub state machines.
+ */
+public class TestHeartbeat {
+
+  ComponentPlugin plugin;
+  String[] roles = {"abc"};
+  String[] services = {"comp1"};
+  ClusterDefinition cdef;
+  Cluster cluster;
+  Nodes nodes;
+  Clusters clusters;
+  HeartBeat heartbeat;
+  Node node;
+  final String script = "script-content";
+  final int scriptHash = script.hashCode();
+
+  //cluster map most recently passed to StateMachineInvoker.init()
+  private static ConcurrentHashMap<String, ClusterFSM> c;
+
+  /*
+   * Creates fresh mocks for every test method:
+   * - a plugin whose start, pre-start and check-service actions target
+   *   role "abc";
+   * - a cluster "cluster1" at revision -1;
+   * - a Nodes/Clusters pair that place "localhost" in that cluster with
+   *   role "abc".
+   * The heartbeat starts out idle, with install-script hash -1 and no
+   * installed roles. Finally the StateMachineInvoker is re-initialized
+   * with a new AsyncDispatcher and an empty cluster map.
+   */
+  @BeforeMethod
+  public void setup() throws Exception {
+    plugin = mock(ComponentPlugin.class);
+    when(plugin.getActiveRoles()).thenReturn(roles);
+    cdef = mock(ClusterDefinition.class);
+    when(cdef.getEnabledServices()).thenReturn(Arrays.asList("comp1"));
+    cluster = mock(Cluster.class);
+    when(cluster.getClusterDefinition(anyInt())).thenReturn(cdef);
+    when(cluster.getName()).thenReturn("cluster1");
+    when(cluster.getComponentDefinition("comp1")).thenReturn(plugin);
+    when(cluster.getLatestRevisionNumber()).thenReturn(-1);
+    Action startAction = new Action();
+    startAction.setKind(Kind.START_ACTION);
+    when(plugin.startServer("cluster1", "abc")).thenReturn(startAction);
+    when(plugin.runCheckRole()).thenReturn("abc");
+    when(plugin.runPreStartRole()).thenReturn("abc");
+    Action preStartAction = new Action();
+    preStartAction.setKind(Kind.RUN_ACTION);
+    when(plugin.preStartAction("cluster1", "abc")).thenReturn(preStartAction);
+    Action checkServiceAction = new Action();
+    //the check-service action is a RUN_ACTION as well; checkSpecialAction()
+    //looks for exactly that kind in the controller response
+    checkServiceAction.setKind(Kind.RUN_ACTION);
+    when(plugin.checkService("cluster1", "abc")).thenReturn(checkServiceAction);
+    nodes = mock(Nodes.class);
+    clusters = mock(Clusters.class);
+    node = new Node();
+    node.setName("localhost");
+    NodeState nodeState = new NodeState();
+    nodeState.setClusterName("cluster1");
+    node.setNodeState(nodeState);
+    when(nodes.getNode("localhost")).thenReturn(node);
+    when(nodes.getNodeRoles("localhost"))
+        .thenReturn(Arrays.asList(roles));
+    when(clusters.getClusterByName("cluster1")).thenReturn(cluster);
+    when(clusters.getInstallAndConfigureScript(anyString(), anyInt()))
+        .thenReturn(script);
+    heartbeat = new HeartBeat();
+    heartbeat.setIdle(true);
+    heartbeat.setInstallScriptHash(-1);
+    heartbeat.setHostname("localhost");
+    heartbeat.setInstalledRoleStates(new ArrayList<AgentRoleState>());
+    StateMachineInvoker.init(new AsyncDispatcher(),
+        (c = new ConcurrentHashMap<String, ClusterFSM>()));
+  }
+
+  /*
+   * Clears the cluster map most recently passed to StateMachineInvoker.
+   * NOTE(review): @AfterTest runs once after all methods of the <test>,
+   * while setup() runs per method and starts a new AsyncDispatcher each
+   * time that is never stopped here -- confirm whether that is intended.
+   */
+  @AfterTest
+  public void teardown() {
+    c.clear();
+  }
+
+  @Test
+  public void testInstall() throws Exception {
+    //send a heartbeat and get a response with install/config action
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    List<Action> actions = response.getActions();
+    assert actions.size() == 1 : "expected one action, got " + actions.size();
+    assert actions.get(0).getKind() == Action.Kind.INSTALL_AND_CONFIG_ACTION
+        : "expected INSTALL_AND_CONFIG_ACTION, got " + actions.get(0).getKind();
+  }
+
+
+  @Test
+  public void testStartServer() throws Exception {
+    //send a heartbeat when some server needs to be started,
+    //and the heartbeat response should have the start action
+    TestClusterImpl clusterImpl = new TestClusterImpl(services, roles);
+    ((TestRoleImpl) clusterImpl.getServices()
+        .get(0).getRoles().get(0)).setShouldStart(true);
+    c.put("cluster1", clusterImpl);
+    processHeartbeatAndGetResponse(true);
+  }
+
+  @Test
+  public void testStopServer() throws Exception {
+    //send a heartbeat when some server needs to be stopped,
+    //and the heartbeat response shouldn't have a start action
+    //for the server
+    TestClusterImpl clusterImpl = new TestClusterImpl(services, roles);
+    ((TestRoleImpl) clusterImpl.getServices()
+        .get(0).getRoles().get(0)).setShouldStart(false);
+    c.put("cluster1", clusterImpl);
+    processHeartbeatAndGetResponse(false);
+  }
+
+  @Test
+  public void testIsRoleActive() throws Exception {
+    //send a heartbeat with some role server start success,
+    //and then the role should be considered active
+    TestClusterImpl clusterImpl = new TestClusterImpl(services, roles);
+    c.put("cluster1", clusterImpl);
+    RoleFSM roleFsm = clusterImpl.getServices()
+        .get(0).getRoles().get(0);
+    heartbeat.setInstallScriptHash(scriptHash);
+    List<AgentRoleState> installedRoleStates = new ArrayList<AgentRoleState>();
+    AgentRoleState roleState = new AgentRoleState();
+    roleState.setRoleName(roles[0]);
+    roleState.setClusterDefinitionRevision(-1);
+    roleState.setClusterId("cluster1");
+    roleState.setComponentName("comp1");
+    installedRoleStates.add(roleState);
+    heartbeat.setInstalledRoleStates(installedRoleStates);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    checkActions(response, true);
+    //the role event is handled through the dispatcher, so poll for up to
+    //~10 seconds for the stub role to become ACTIVE
+    int i = 0;
+    while (i++ < 10) {
+      if (roleFsm.getRoleState() == RoleState.ACTIVE) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assert roleFsm.getRoleState() == RoleState.ACTIVE
+        : "role state is " + roleFsm.getRoleState();
+  }
+
+  @Test
+  public void testCreationOfPreStartAction() throws Exception {
+    TestClusterImpl clusterImpl = new TestClusterImpl(services, roles);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestRoleImpl) clusterImpl.getServices().get(0).getRoles().get(0))
+        .setShouldStart(false);
+    ((TestServiceImpl) serviceImpl).setServiceState(ServiceState.PRESTART);
+    c.put("cluster1", clusterImpl);
+    checkSpecialAction(ServiceState.PRESTART,
+        SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
+  }
+
+  @Test
+  public void testCreationOfCheckRoleAction() throws Exception {
+
+    TestClusterImpl clusterImpl = new TestClusterImpl(services, roles);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestServiceImpl) serviceImpl).setServiceState(ServiceState.STARTED);
+    c.put("cluster1", clusterImpl);
+    checkSpecialAction(ServiceState.STARTED,
+        SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
+  }
+
+  @Test
+  public void testServiceAvailableEvent() throws Exception {
+    TestClusterImpl clusterImpl = new TestClusterImpl(services, roles);
+    c.put("cluster1", clusterImpl);
+    heartbeat.setInstallScriptHash(scriptHash);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestServiceImpl) serviceImpl).setServiceState(ServiceState.STARTED);
+    ActionResult actionResult = new ActionResult();
+    actionResult.setKind(Kind.RUN_ACTION);
+    ClusterNameAndRev clusterNameAndRev = new ClusterNameAndRev("cluster1", -1);
+    String checkActionId = HeartbeatHandler.getSpecialActionID(
+        clusterNameAndRev, "comp1", "abc",
+        SpecialServiceIDs.SERVICE_AVAILABILITY_CHECK_ID);
+    actionResult.setId(checkActionId);
+    actionResult.setClusterId("cluster1");
+    actionResult.setClusterDefinitionRevision(-1);
+    //exit code 0 reports a successful availability check
+    CommandResult commandResult = new CommandResult(0, "", "");
+    actionResult.setCommandResult(commandResult);
+    List<ActionResult> actionResults = new ArrayList<ActionResult>();
+    actionResults.add(actionResult);
+    heartbeat.setActionResults(actionResults);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    handler.processHeartBeat(heartbeat);
+    int i = 0;
+    while (i++ < 10) {
+      if (serviceImpl.getServiceState() == ServiceState.ACTIVE) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assert serviceImpl.getServiceState() == ServiceState.ACTIVE
+        : "service state is " + serviceImpl.getServiceState();
+  }
+
+  @Test
+  public void testServiceReadyToStartEvent() throws Exception {
+    TestClusterImpl clusterImpl = new TestClusterImpl(services, roles);
+    c.put("cluster1", clusterImpl);
+    heartbeat.setInstallScriptHash(scriptHash);
+    ServiceFSM serviceImpl = clusterImpl.getServices().get(0);
+    ((TestServiceImpl) serviceImpl).setServiceState(ServiceState.PRESTART);
+    ActionResult actionResult = new ActionResult();
+    actionResult.setKind(Kind.RUN_ACTION);
+    ClusterNameAndRev clusterNameAndRev = new ClusterNameAndRev("cluster1", -1);
+    String checkActionId = HeartbeatHandler.getSpecialActionID(
+        clusterNameAndRev, "comp1", "abc",
+        SpecialServiceIDs.SERVICE_PRESTART_CHECK_ID);
+    actionResult.setId(checkActionId);
+    actionResult.setClusterId("cluster1");
+    actionResult.setClusterDefinitionRevision(-1);
+    //exit code 0 reports a successful pre-start action
+    CommandResult commandResult = new CommandResult(0, "", "");
+    actionResult.setCommandResult(commandResult);
+    List<ActionResult> actionResults = new ArrayList<ActionResult>();
+    actionResults.add(actionResult);
+    heartbeat.setActionResults(actionResults);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    handler.processHeartBeat(heartbeat);
+    int i = 0;
+    while (i++ < 10) {
+      if (serviceImpl.getServiceState() == ServiceState.STARTING) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assert serviceImpl.getServiceState() == ServiceState.STARTING
+        : "service state is " + serviceImpl.getServiceState();
+  }
+
+  /*
+   * Sends the heartbeat with the current install-script hash. Asserts that
+   * the response contains the install action, contains a start action only
+   * when serviceState is STARTED, and contains a RUN_ACTION whose id is the
+   * special action id for ("cluster1", -1, "comp1", "abc", serviceId).
+   */
+  private void checkSpecialAction(ServiceState serviceState,
+      SpecialServiceIDs serviceId) throws Exception {
+    heartbeat.setInstallScriptHash(scriptHash);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    checkActions(response, ServiceState.STARTED == serviceState);
+    ClusterNameAndRev clusterNameAndRev = new ClusterNameAndRev("cluster1", -1);
+    String checkActionId = HeartbeatHandler.getSpecialActionID(
+        clusterNameAndRev, "comp1", "abc",
+        serviceId);
+    boolean found = false;
+    for (Action action : response.getActions()) {
+      if (action.getKind() == Kind.RUN_ACTION &&
+          action.getId().equals(checkActionId)) {
+        found = true;
+        break;
+      }
+    }
+    assert found : "no RUN_ACTION with id " + checkActionId + " in response";
+  }
+
+  /*
+   * Sends the heartbeat with the current install-script hash and checks the
+   * response via checkActions().
+   */
+  private void processHeartbeatAndGetResponse(boolean shouldFindStart)
+      throws Exception {
+    heartbeat.setInstallScriptHash(scriptHash);
+    HeartbeatHandler handler = new HeartbeatHandler(clusters, nodes);
+    ControllerResponse response = handler.processHeartBeat(heartbeat);
+    checkActions(response, shouldFindStart);
+  }
+
+  /*
+   * Asserts that the response always carries an INSTALL_AND_CONFIG_ACTION.
+   * It also asserts that it carries a START_ACTION exactly when
+   * shouldFindStart is true.
+   */
+  private void checkActions(ControllerResponse response,
+      boolean shouldFindStart) {
+    List<Action> actions = response.getActions();
+    boolean foundStart = false;
+    boolean foundInstall = false;
+    for (Action a : actions) {
+      if (a.getKind() == Action.Kind.START_ACTION) {
+        foundStart = true;
+      }
+      if (a.getKind() == Action.Kind.INSTALL_AND_CONFIG_ACTION) {
+        foundInstall = true;
+      }
+    }
+    assert foundInstall : "no INSTALL_AND_CONFIG_ACTION in response";
+    assert foundStart == shouldFindStart
+        : "START_ACTION present=" + foundStart + ", expected " + shouldFindStart;
+  }
+
+
+  /*
+   * Stub cluster state machine holding one TestServiceImpl per service
+   * name, each with one TestRoleImpl per role name.
+   */
+  static class TestClusterImpl implements ClusterFSM {
+    ClusterState clusterState;
+    List<ServiceFSM> serviceFsms;
+
+    public void setClusterState(ClusterState state) {
+      this.clusterState = state;
+    }
+
+    public TestClusterImpl(String[] services, String[] roles) {
+      serviceFsms = new ArrayList<ServiceFSM>();
+      for (String service : services) {
+        ServiceFSM srv = new TestServiceImpl(service, roles);
+        serviceFsms.add(srv);
+      }
+    }
+
+    @Override
+    public List<ServiceFSM> getServices() {
+      return serviceFsms;
+    }
+
+    @Override
+    public Map<String, String> getServiceStates() {
+      //not needed by these tests
+      return null;
+    }
+
+    @Override
+    public void terminate() {
+      //not needed by these tests
+    }
+
+    @Override
+    public ClusterState getClusterState() {
+      //return what setClusterState() stored (null unless a test sets it)
+      return clusterState;
+    }
+
+    @Override
+    public void activate() {
+      //not needed by these tests
+    }
+
+    @Override
+    public void deactivate() {
+      //not needed by these tests
+    }
+
+  }
+
+  /*
+   * Stub service state machine. Its state is set directly by tests, and it
+   * moves to ACTIVE on AVAILABLE_CHECK_SUCCESS and to STARTING on
+   * PRESTART_SUCCESS.
+   */
+  static class TestServiceImpl implements ServiceFSM, EventHandler<ServiceEvent> {
+
+    ServiceState serviceState;
+    String serviceName;
+    List<RoleFSM> roleFsms;
+
+    public void setServiceState(ServiceState state) {
+      this.serviceState = state;
+    }
+
+    public TestServiceImpl(String service, String[] roles) {
+      roleFsms = new ArrayList<RoleFSM>();
+      for (String role : roles) {
+        TestRoleImpl r = new TestRoleImpl(role);
+        roleFsms.add(r);
+      }
+      serviceName = service;
+    }
+
+    @Override
+    public ServiceState getServiceState() {
+      return serviceState;
+    }
+
+    @Override
+    public String getServiceName() {
+      return serviceName;
+    }
+
+    @Override
+    public ClusterFSM getAssociatedCluster() {
+      //not needed by these tests
+      return null;
+    }
+
+    @Override
+    public boolean isActive() {
+      //not needed by these tests
+      return false;
+    }
+
+    @Override
+    public List<RoleFSM> getRoles() {
+      return roleFsms;
+    }
+
+    @Override
+    public void activate() {
+      //not needed by these tests
+    }
+
+    @Override
+    public void deactivate() {
+      //not needed by these tests
+    }
+
+    @Override
+    public void handle(ServiceEvent event) {
+      if (event.getType() == ServiceEventType.AVAILABLE_CHECK_SUCCESS) {
+        serviceState = ServiceState.ACTIVE;
+      }
+      if (event.getType() == ServiceEventType.PRESTART_SUCCESS) {
+        serviceState = ServiceState.STARTING;
+      }
+    }
+
+  }
+
+  /*
+   * Stub role state machine. shouldStart() returns whatever the test
+   * configured (true by default), shouldStop() is always false, and the
+   * role moves to ACTIVE on START_SUCCESS.
+   */
+  static class TestRoleImpl implements RoleFSM, EventHandler<RoleEvent> {
+
+    RoleState roleState;
+    String roleName;
+    boolean shouldStart = true;
+
+    public void setShouldStart(boolean shouldStart) {
+      this.shouldStart = shouldStart;
+    }
+
+    public void setRoleState(RoleState roleState) {
+      this.roleState = roleState;
+    }
+
+    public TestRoleImpl(String role) {
+      this.roleName = role;
+    }
+
+    @Override
+    public RoleState getRoleState() {
+      return roleState;
+    }
+
+    @Override
+    public String getRoleName() {
+      return roleName;
+    }
+
+    @Override
+    public ServiceFSM getAssociatedService() {
+      //not needed by these tests
+      return null;
+    }
+
+    @Override
+    public boolean shouldStop() {
+      return false;
+    }
+
+    @Override
+    public boolean shouldStart() {
+      return shouldStart;
+    }
+
+    @Override
+    public void activate() {
+      //not needed by these tests
+    }
+
+    @Override
+    public void deactivate() {
+      //not needed by these tests
+    }
+
+    @Override
+    public void handle(RoleEvent event) {
+      if (event.getType() == RoleEventType.START_SUCCESS) {
+        roleState = RoleState.ACTIVE;
+      }
+    }
+  }
+}
Copied: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java (from r1212286, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java)
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java?p2=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java&p1=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java&r1=1212286&r2=1212291&rev=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/StackFlattenerTest.java (original)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestStackFlattener.java Fri Dec 9 09:13:06 2011
@@ -39,7 +39,7 @@ import org.testng.annotations.BeforeMeth
import org.testng.annotations.Test;
import static org.testng.AssertJUnit.assertEquals;
-public class StackFlattenerTest {
+public class TestStackFlattener {
Stacks stacks;
Stack parentStack;
Copied: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java (from r1212288, incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java)
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java?p2=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java&p1=incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java&r1=1212288&r2=1212291&rev=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/ClusterImplTest.java (original)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/resource/statemachine/TestClusterImpl.java Fri Dec 9 09:13:06 2011
@@ -13,7 +13,7 @@ import org.apache.ambari.components.Comp
import org.apache.ambari.controller.Cluster;
import org.testng.annotations.Test;
-public class ClusterImplTest {
+public class TestClusterImpl {
/**
* Create cluster with two components, both having active roles.
Modified: incubator/ambari/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/pom.xml?rev=1212291&r1=1212290&r2=1212291&view=diff
==============================================================================
--- incubator/ambari/trunk/pom.xml (original)
+++ incubator/ambari/trunk/pom.xml Fri Dec 9 09:13:06 2011
@@ -295,7 +295,7 @@
<phase>test</phase>
<argLine>-Xmx1024m</argLine>
<includes>
- <include>**/*Test.java</include>
+ <include>**/Test*.java</include>
</includes>
<excludes>
<exclude>**/IntegrationTest*.java</exclude>
@@ -443,7 +443,7 @@
<argLine>-Xmx1024m -Djava.library.path=.
</argLine>
<includes>
- <include>**/*Test.java</include>
+ <include>**/Test*.java</include>
</includes>
<excludes>
<exclude>**/IntegrationTest.java</exclude>
@@ -477,7 +477,7 @@
<include>**/IntegrationTest*.java</include>
</includes>
<excludes>
- <exclude>**/*Test.java</exclude>
+ <exclude>**/Test*.java</exclude>
<exclude>**/PerformanceTest*.java</exclude>
</excludes>
<skipTests>false</skipTests>