You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/03/06 20:06:14 UTC
svn commit: r1453482 - in /incubator/ambari/trunk: ./
ambari-agent/src/main/python/ambari_agent/ ambari-agent/src/test/python/
ambari-agent/src/test/python/dummy_files/
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/
ambari-server/s...
Author: swagle
Date: Wed Mar 6 19:06:13 2013
New Revision: 1453482
URL: http://svn.apache.org/r1453482
Log:
AMBARI-1560. Upgrade action/task support in server. (Sumit Mohanty via swagle)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py
incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py
incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Wed Mar 6 19:06:13 2013
@@ -12,6 +12,8 @@ Trunk (unreleased changes):
NEW FEATURES
+ AMBARI-1560. Upgrade action/task support in server. (Sumit Mohanty via swagle)
+
AMBARI-1553. List cluster-level configurations with host-level, if any (ncole)
AMBARI-1557. Adding Hue service to the HDP stack definition along with the
Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py Wed Mar 6 19:06:13 2013
@@ -110,7 +110,8 @@ class ActionQueue(threading.Thread):
component = command['componentName']
globalConfig = command['configurations']['global']
try:
- livestatus = LiveStatus(cluster, service, component, globalConfig)
+ livestatus = LiveStatus(cluster, service, component,
+ globalConfig, self.config)
result = livestatus.build()
logger.info("Got live status for component " + component + " of service " + str(service) +\
" of cluster " + str(cluster) + "\n" + pprint.pformat(result))
Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/LiveStatus.py Wed Mar 6 19:06:13 2013
@@ -26,6 +26,7 @@ import socket
import time
import traceback
from pprint import pprint, pformat
+from StackVersionsFileHandler import StackVersionsFileHandler
logger = logging.getLogger()
@@ -91,11 +92,13 @@ class LiveStatus:
LIVE_STATUS = "STARTED"
DEAD_STATUS = "INSTALLED"
- def __init__(self, cluster, service, component, globalConfig):
+ def __init__(self, cluster, service, component, globalConfig, config):
self.cluster = cluster
self.service = service
self.component = component
self.globalConfig = globalConfig
+ versionsFileDir = config.get('agent', 'prefix')
+ self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
def belongsToService(self, component):
@@ -117,7 +120,9 @@ class LiveStatus:
"msg" : "",
"status" : status,
"clusterName" : self.cluster,
- "serviceName" : self.service
+ "serviceName" : self.service,
+ "stackVersion": self.versionsHandler.
+ read_stack_version(component["componentName"])
}
break
logger.info("The live status for component " + str(self.component) + " of service " + \
Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py Wed Mar 6 19:06:13 2013
@@ -66,7 +66,7 @@ class UpgradeExecutor:
srcStackTuple = self.split_stack_version(srcStack)
tgtStackTuple = self.split_stack_version(srcStack)
- if srcStackTuple == None or tgtStackTuple == None:
+ if srcStackTuple is None or tgtStackTuple is None:
errorstr = "Source (%s) or target (%s) version does not match pattern \
<Name>-<Version>" % (srcStack, tgtStack)
logger.info(errorstr)
Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestLiveStatus.py Wed Mar 6 19:06:13 2013
@@ -20,15 +20,21 @@ limitations under the License.
from unittest import TestCase
from ambari_agent.LiveStatus import LiveStatus
-from ambari_agent import AmbariConfig
+from ambari_agent.AmbariConfig import AmbariConfig
import socket
import os
class TestLiveStatus(TestCase):
def test_build(self):
for component in LiveStatus.COMPONENTS:
- livestatus = LiveStatus('', component['serviceName'], component['componentName'], {})
+ config = AmbariConfig().getConfig()
+ config.set('agent', 'prefix', "dummy_files")
+ livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config)
+ livestatus.versionsHandler.versionsFilePath = os.path.join("dummy_files","dummy_current_stack")
result = livestatus.build()
print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result))
self.assertEquals(len(result) > 0, True, 'Livestatus should not be empty')
+ if component['componentName'] == 'GANGLIA_SERVER':
+ self.assertEquals(result['stackVersion'],'HDP-1.2.2',
+ 'Livestatus should contain component stack version')
Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestStackVersionsFileHandler.py Wed Mar 6 19:06:13 2013
@@ -41,9 +41,9 @@ class TestStackVersionsFileHandler(TestC
@patch.object(stackVersionsFileHandler, 'touch_file')
def test_read_stack_version(self, touch_method):
stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
- result = stackVersionsFileHandler.read_stack_version("NAGIOS")
+ result = stackVersionsFileHandler.read_stack_version("NAGIOS_SERVER")
self.assertEquals(result, "HDP-1.2.1")
- result = stackVersionsFileHandler.read_stack_version("GANGLIA")
+ result = stackVersionsFileHandler.read_stack_version("GANGLIA_SERVER")
self.assertEquals(result, "HDP-1.2.2")
result = stackVersionsFileHandler.read_stack_version("NOTEXISTING")
self.assertEquals(result, stackVersionsFileHandler.DEFAULT_VER)
@@ -55,18 +55,18 @@ class TestStackVersionsFileHandler(TestC
stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
result = stackVersionsFileHandler.read_all_stack_versions()
self.assertEquals(len(result.keys()), 4)
- self.assertEquals(result["NAGIOS"], "HDP-1.2.1")
- self.assertEquals(result["HBASE"], "HDP-1.3.0")
+ self.assertEquals(result["NAGIOS_SERVER"], "HDP-1.2.1")
+ self.assertEquals(result["HCATALOG"], "HDP-1.3.0")
self.assertTrue(touch_method.called)
def test_extract(self):
- s = " NAGIOS \t HDP-1.3.0 "
+ s = " NAGIOS_SERVER \t HDP-1.3.0 "
comp, ver = stackVersionsFileHandler.extract(s)
- self.assertEqual(comp, "NAGIOS")
+ self.assertEqual(comp, "NAGIOS_SERVER")
self.assertEqual(ver, "HDP-1.3.0")
# testing wrong value
- s = " NAGIOS "
+ s = " NAGIOS_SERVER "
comp, ver = stackVersionsFileHandler.extract(s)
self.assertEqual(comp, stackVersionsFileHandler.DEFAULT_VER)
self.assertEqual(ver, stackVersionsFileHandler.DEFAULT_VER)
Modified: incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack (original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/dummy_files/dummy_current_stack Wed Mar 6 19:06:13 2013
@@ -1,4 +1,4 @@
-HDFS HDP-1.2.0
-NAGIOS HDP-1.2.1
-HBASE HDP-1.3.0
-GANGLIA HDP-1.2.2
+DATANODE HDP-1.2.0
+NAGIOS_SERVER HDP-1.2.1
+HCATALOG HDP-1.3.0
+GANGLIA_SERVER HDP-1.2.2
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java Wed Mar 6 19:06:13 2013
@@ -89,6 +89,9 @@ public class ActionManager {
return db.getAction(StageUtils.getActionId(requestId, stageId));
}
+ /**
+ * Persists command reports into the db
+ */
public void processTaskResponse(String hostname, List<CommandReport> reports) {
if (reports == null) {
return;
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java Wed Mar 6 19:06:13 2013
@@ -94,11 +94,7 @@ public class HeartBeatHandler {
if (currentResponseId == null) {
//Server restarted, or unknown host.
LOG.error("CurrentResponseId unknown - send register command");
- response = new HeartBeatResponse();
- RegistrationCommand regCmd = new RegistrationCommand();
- response.setResponseId(0);
- response.setRegistrationCommand(regCmd);
- return response;
+ return createRegisterCommand();
}
if (LOG.isDebugEnabled()) {
@@ -113,10 +109,7 @@ public class HeartBeatHandler {
return hostResponses.get(hostname);
}else if (heartbeat.getResponseId() != currentResponseId) {
LOG.error("Error in responseId sequence - sending agent restart command");
- response = new HeartBeatResponse();
- response.setRestartAgent(true);
- response.setResponseId(currentResponseId);
- return response;
+ return createRestartCommand(currentResponseId);
}
response = new HeartBeatResponse();
@@ -127,11 +120,7 @@ public class HeartBeatHandler {
if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
// After loosing heartbeat agent should reregister
LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
- response = new HeartBeatResponse();
- RegistrationCommand regCmd = new RegistrationCommand();
- response.setResponseId(0);
- response.setRegistrationCommand(regCmd);
- return response;
+ return createRegisterCommand();
}
hostResponseIds.put(hostname, currentResponseId);
@@ -161,18 +150,28 @@ public class HeartBeatHandler {
} catch (InvalidStateTransitionException ex) {
LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
hostObject.setState(HostState.INIT);
- RegistrationCommand regCmd = new RegistrationCommand();
- response.setRegistrationCommand(regCmd);
- return response;
+ return createRegisterCommand();
}
//Examine heartbeat for command reports
+ processCommandReports(heartbeat, hostname, clusterFsm, now);
+
+ // Examine heartbeart for component live status reports
+ processStatusReports(heartbeat, hostname, clusterFsm);
+
+ // Send commands if node is active
+ if (hostObject.getState().equals(HostState.HEALTHY)) {
+ sendCommands(hostname, response);
+ }
+ return response;
+ }
+
+ protected void processCommandReports(HeartBeat heartbeat,
+ String hostname,
+ Clusters clusterFsm, long now) throws
+ AmbariException {
List<CommandReport> reports = heartbeat.getReports();
for (CommandReport report : reports) {
- String clusterName = report.getClusterName();
- if ((clusterName == null) || "".equals(clusterName)) {
- clusterName = "cluster1";
- }
Cluster cl = clusterFsm.getCluster(report.getClusterName());
String service = report.getServiceName();
if (service == null || "".equals(service)) {
@@ -185,18 +184,23 @@ public class HeartBeatHandler {
Service svc = cl.getService(service);
ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
+ String schName = scHost.getServiceComponentName();
if (report.getStatus().equals("COMPLETED")) {
- scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(scHost
- .getServiceComponentName(), hostname, now));
+ // Updating stack version, if needed
+ if (scHost.getState().equals(State.UPGRADING)) {
+ scHost.setStackVersion(scHost.getDesiredStackVersion());
+ }
+ scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
+ hostname, now));
} else if (report.getStatus().equals("FAILED")) {
- scHost.handleEvent(new ServiceComponentHostOpFailedEvent(scHost
- .getServiceComponentName(), hostname, now));
+ scHost.handleEvent(new ServiceComponentHostOpFailedEvent(schName,
+ hostname, now));
} else if (report.getStatus().equals("IN_PROGRESS")) {
- scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(scHost
- .getServiceComponentName(), hostname, now));
+ scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
+ hostname, now));
}
} catch (ServiceComponentNotFoundException scnex) {
- LOG.info("Service component not found ", scnex);
+ LOG.warn("Service component not found ", scnex);
} catch (InvalidStateTransitionException ex) {
LOG.warn("State machine exception", ex);
}
@@ -204,8 +208,12 @@ public class HeartBeatHandler {
}
//Update state machines from reports
actionManager.processTaskResponse(hostname, reports);
+ }
- // Examine heartbeart for component live status reports
+ protected void processStatusReports(HeartBeat heartbeat,
+ String hostname,
+ Clusters clusterFsm) throws
+ AmbariException {
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
for (Cluster cl : clusters) {
for (ComponentStatus status : heartbeat.componentStatus) {
@@ -221,16 +229,16 @@ public class HeartBeatHandler {
State prevState = scHost.getState();
State liveState = State.valueOf(State.class, status.getStatus());
if (prevState.equals(State.INSTALLED)
- || prevState.equals(State.START_FAILED)
- || prevState.equals(State.STARTED)
- || prevState.equals(State.STOP_FAILED)) {
+ || prevState.equals(State.START_FAILED)
+ || prevState.equals(State.STARTED)
+ || prevState.equals(State.STOP_FAILED)) {
scHost.setState(liveState);
if (!prevState.equals(liveState)) {
LOG.info("State of service component " + componentName
- + " of service " + status.getServiceName()
- + " of cluster " + status.getClusterName()
- + " has changed from " + prevState + " to " + liveState
- + " at host " + hostname);
+ + " of service " + status.getServiceName()
+ + " of cluster " + status.getClusterName()
+ + " has changed from " + prevState + " to " + liveState
+ + " at host " + hostname);
}
}
@@ -246,73 +254,76 @@ public class HeartBeatHandler {
}
catch (ServiceNotFoundException e) {
LOG.warn("Received a live status update for a non-initialized"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName());
+ + " service"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName());
// FIXME ignore invalid live update and continue for now?
continue;
}
catch (ServiceComponentNotFoundException e) {
LOG.warn("Received a live status update for a non-initialized"
- + " servicecomponent"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName());
+ + " servicecomponent"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName()
+ + ", componentName=" + status.getComponentName());
// FIXME ignore invalid live update and continue for now?
continue;
}
catch (ServiceComponentHostNotFoundException e) {
LOG.warn("Received a live status update for a non-initialized"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName()
- + ", hostname=" + hostname);
+ + " service"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName()
+ + ", componentName=" + status.getComponentName()
+ + ", hostname=" + hostname);
// FIXME ignore invalid live update and continue for now?
continue;
}
catch (RuntimeException e) {
LOG.warn("Received a live status with invalid payload"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName()
- + ", hostname=" + hostname
- + ", error=" + e.getMessage());
+ + " service"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName()
+ + ", componentName=" + status.getComponentName()
+ + ", hostname=" + hostname
+ + ", error=" + e.getMessage());
continue;
}
}
}
}
+ }
- // Send commands if node is active
- if (hostObject.getState().equals(HostState.HEALTHY)) {
- List<AgentCommand> cmds = actionQueue.dequeueAll(heartbeat.getHostname());
- if (cmds != null && !cmds.isEmpty()) {
- for (AgentCommand ac : cmds) {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
- }
- } catch (Exception e) {
- throw new AmbariException("Could not get jaxb string for command", e);
+ /**
+ * Adds commands from action queue to a heartbeat responce
+ */
+ protected void sendCommands(String hostname, HeartBeatResponse response)
+ throws AmbariException {
+ List<AgentCommand> cmds = actionQueue.dequeueAll(hostname);
+ if (cmds != null && !cmds.isEmpty()) {
+ for (AgentCommand ac : cmds) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
}
- switch (ac.getCommandType()) {
- case EXECUTION_COMMAND: {
- response.addExecutionCommand((ExecutionCommand) ac);
- break;
- }
- case STATUS_COMMAND: {
- response.addStatusCommand((StatusCommand) ac);
- break;
- }
- default:
- LOG.error("There is no action for agent command ="+ ac.getCommandType().name() );
+ } catch (Exception e) {
+ throw new AmbariException("Could not get jaxb string for command", e);
+ }
+ switch (ac.getCommandType()) {
+ case EXECUTION_COMMAND: {
+ response.addExecutionCommand((ExecutionCommand) ac);
+ break;
+ }
+ case STATUS_COMMAND: {
+ response.addStatusCommand((StatusCommand) ac);
+ break;
}
+ default:
+ LOG.error("There is no action for agent command ="+
+ ac.getCommandType().name() );
}
}
}
- return response;
}
public String getOsType(String os, String osRelease) {
@@ -329,6 +340,21 @@ public class HeartBeatHandler {
return osType.toLowerCase();
}
+ protected HeartBeatResponse createRegisterCommand() {
+ HeartBeatResponse response = new HeartBeatResponse();
+ RegistrationCommand regCmd = new RegistrationCommand();
+ response.setResponseId(0);
+ response.setRegistrationCommand(regCmd);
+ return response;
+ }
+
+ protected HeartBeatResponse createRestartCommand(Long currentResponseId) {
+ HeartBeatResponse response = new HeartBeatResponse();
+ response.setRestartAgent(true);
+ response.setResponseId(currentResponseId);
+ return response;
+ }
+
public RegistrationResponse handleRegistration(Register register)
throws InvalidStateTransitionException, AmbariException {
String hostname = register.getHostname();
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java Wed Mar 6 19:06:13 2013
@@ -173,6 +173,10 @@ public class ServiceComponentHostImpl im
State.STOPPING,
ServiceComponentHostEventType.HOST_SVCCOMP_STOP,
new ServiceComponentHostOpStartedTransition())
+ .addTransition(State.INSTALLED,
+ State.UPGRADING,
+ ServiceComponentHostEventType.HOST_SVCCOMP_UPGRADE,
+ new ServiceComponentHostOpStartedTransition())
.addTransition(State.STARTING,
State.STARTING,
@@ -333,10 +337,7 @@ public class ServiceComponentHostImpl im
ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
new ServiceComponentHostOpCompletedTransition())
- .addTransition(State.INSTALLED,
- State.INSTALLED,
- ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
- new ServiceComponentHostOpCompletedTransition())
+
.addTransition(State.INSTALLING,
State.INSTALLING,
@@ -361,6 +362,10 @@ public class ServiceComponentHostImpl im
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
new ServiceComponentHostOpStartedTransition())
+ .addTransition(State.INSTALLED,
+ State.INSTALLED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
+ new ServiceComponentHostOpCompletedTransition())
.addTransition(State.INSTALLED,
State.UNINSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_UNINSTALL,
@@ -369,6 +374,10 @@ public class ServiceComponentHostImpl im
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
new ServiceComponentHostOpStartedTransition())
+ .addTransition(State.INSTALLED,
+ State.UPGRADING,
+ ServiceComponentHostEventType.HOST_SVCCOMP_UPGRADE,
+ new ServiceComponentHostOpStartedTransition())
.addTransition(State.UPGRADING,
State.UPGRADING,
Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java?rev=1453482&r1=1453481&r2=1453482&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java Wed Mar 6 19:06:13 2013
@@ -760,6 +760,248 @@ public class TestHeartbeatHandler {
stack130, serviceComponentHost3.getStackVersion());
}
+ @Test
+ public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
+ ActionManager am = new ActionManager(0, 0, null, null,
+ new ActionDBInMemoryImpl(), new HostsMap((String) null));
+ Cluster cluster = getDummyCluster();
+
+ @SuppressWarnings("serial")
+ Set<String> hostNames = new HashSet<String>(){{
+ add(DummyHostname1);
+ }};
+
+ clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(HDFS_CLIENT).persist();
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+ StackId stack130 = new StackId("HDP-1.3.0");
+ StackId stack122 = new StackId("HDP-1.2.2");
+
+ serviceComponentHost1.setState(State.UPGRADING);
+ serviceComponentHost2.setState(State.INSTALLING);
+
+ serviceComponentHost1.setStackVersion(stack122);
+ serviceComponentHost1.setDesiredStackVersion(stack130);
+ serviceComponentHost2.setStackVersion(stack122);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+ CommandReport cr1 = new CommandReport();
+ cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr1.setTaskId(1);
+ cr1.setClusterName(DummyCluster);
+ cr1.setServiceName(HDFS);
+ cr1.setRole(DATANODE);
+ cr1.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr1.setStdErr("none");
+ cr1.setStdOut("dummy output");
+ cr1.setExitCode(0);
+
+ CommandReport cr2 = new CommandReport();
+ cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr2.setTaskId(2);
+ cr2.setClusterName(DummyCluster);
+ cr2.setServiceName(HDFS);
+ cr2.setRole(NAMENODE);
+ cr2.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr2.setStdErr("none");
+ cr2.setStdOut("dummy output");
+ cr2.setExitCode(0);
+ ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(cr1);
+ reports.add(cr2);
+ hb.setReports(reports);
+
+ ActionQueue aq = new ActionQueue();
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ handler.handleHeartBeat(hb);
+ assertEquals("Stack version for SCH should be updated to " +
+ serviceComponentHost1.getDesiredStackVersion(),
+ stack130, serviceComponentHost1.getStackVersion());
+ assertEquals("Stack version for SCH should not change ",
+ stack122, serviceComponentHost2.getStackVersion());
+ }
+
+ @Test
+ public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
+ ActionManager am = new ActionManager(0, 0, null, null,
+ new ActionDBInMemoryImpl(), new HostsMap((String) null));
+ Cluster cluster = getDummyCluster();
+
+ @SuppressWarnings("serial")
+ Set<String> hostNames = new HashSet<String>(){{
+ add(DummyHostname1);
+ }};
+
+ clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(HDFS_CLIENT).persist();
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+ StackId stack130 = new StackId("HDP-1.3.0");
+ StackId stack122 = new StackId("HDP-1.2.2");
+
+ serviceComponentHost1.setState(State.UPGRADING);
+ serviceComponentHost2.setState(State.INSTALLING);
+
+ serviceComponentHost1.setStackVersion(stack122);
+ serviceComponentHost1.setDesiredStackVersion(stack130);
+ serviceComponentHost2.setStackVersion(stack122);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+ CommandReport cr1 = new CommandReport();
+ cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr1.setTaskId(1);
+ cr1.setClusterName(DummyCluster);
+ cr1.setServiceName(HDFS);
+ cr1.setRole(DATANODE);
+ cr1.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr1.setStdErr("none");
+ cr1.setStdOut("dummy output");
+ cr1.setExitCode(777);
+
+ CommandReport cr2 = new CommandReport();
+ cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr2.setTaskId(2);
+ cr2.setClusterName(DummyCluster);
+ cr2.setServiceName(HDFS);
+ cr2.setRole(NAMENODE);
+ cr2.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr2.setStdErr("none");
+ cr2.setStdOut("dummy output");
+ cr2.setExitCode(777);
+ ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(cr1);
+ reports.add(cr2);
+ hb.setReports(reports);
+
+ ActionQueue aq = new ActionQueue();
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ handler.handleHeartBeat(hb);
+ assertEquals("State of SCH not change while operation is in progress",
+ State.UPGRADING, serviceComponentHost1.getState());
+ assertEquals("Stack version of SCH should not change after in progress report",
+ stack130, serviceComponentHost1.getDesiredStackVersion());
+ assertEquals("State of SCH not change while operation is in progress",
+ State.INSTALLING, serviceComponentHost2.getState());
+ }
+
+
+ @Test
+ public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
+ ActionManager am = new ActionManager(0, 0, null, null,
+ new ActionDBInMemoryImpl(), new HostsMap((String) null));
+ Cluster cluster = getDummyCluster();
+
+ @SuppressWarnings("serial")
+ Set<String> hostNames = new HashSet<String>(){{
+ add(DummyHostname1);
+ }};
+
+ clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(HDFS_CLIENT).persist();
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+ StackId stack130 = new StackId("HDP-1.3.0");
+ StackId stack122 = new StackId("HDP-1.2.2");
+
+ serviceComponentHost1.setState(State.UPGRADING);
+ serviceComponentHost2.setState(State.INSTALLING);
+
+ serviceComponentHost1.setStackVersion(stack122);
+ serviceComponentHost1.setDesiredStackVersion(stack130);
+ serviceComponentHost2.setStackVersion(stack122);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+ CommandReport cr1 = new CommandReport();
+ cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr1.setTaskId(1);
+ cr1.setClusterName(DummyCluster);
+ cr1.setServiceName(HDFS);
+ cr1.setRole(DATANODE);
+ cr1.setStatus(HostRoleStatus.FAILED.toString());
+ cr1.setStdErr("none");
+ cr1.setStdOut("dummy output");
+ cr1.setExitCode(0);
+
+ CommandReport cr2 = new CommandReport();
+ cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr2.setTaskId(2);
+ cr2.setClusterName(DummyCluster);
+ cr2.setServiceName(HDFS);
+ cr2.setRole(NAMENODE);
+ cr2.setStatus(HostRoleStatus.FAILED.toString());
+ cr2.setStdErr("none");
+ cr2.setStdOut("dummy output");
+ cr2.setExitCode(0);
+ ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(cr1);
+ reports.add(cr2);
+ hb.setReports(reports);
+
+ ActionQueue aq = new ActionQueue();
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ handler.handleHeartBeat(hb);
+ assertEquals("State of SCH should change after fail report",
+ State.UPGRADE_FAILED, serviceComponentHost1.getState());
+ assertEquals("State of SCH should change after fail report",
+ State.INSTALL_FAILED, serviceComponentHost2.getState());
+ assertEquals("Stack version of SCH should not change after fail report",
+ stack122, serviceComponentHost1.getStackVersion());
+ assertEquals("Stack version of SCH should not change after fail report",
+ stack130, serviceComponentHost1.getDesiredStackVersion());
+ assertEquals("Stack version of SCH should not change after fail report",
+ State.INSTALL_FAILED, serviceComponentHost2.getState());
+ }
+
+
private ComponentStatus createComponentStatus(String clusterName, String serviceName, String message,
State state, String componentName, String stackVersion) {
ComponentStatus componentStatus1 = new ComponentStatus();