You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2016/02/23 20:04:57 UTC
[1/3] ambari git commit: AMBARI-15141. Start all services request
aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)
Repository: ambari
Updated Branches:
refs/heads/trunk 9d7ff5f14 -> 083ac6dab
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 2a4cec8..c62352a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -54,10 +54,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import javax.xml.bind.JAXBException;
@@ -65,47 +63,28 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.ActionDBAccessor;
-import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.actionmanager.Request;
-import org.apache.ambari.server.actionmanager.RequestFactory;
import org.apache.ambari.server.actionmanager.Stage;
import org.apache.ambari.server.actionmanager.StageFactory;
import org.apache.ambari.server.agent.HostStatus.Status;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
-import org.apache.ambari.server.orm.OrmTestHelper;
-import org.apache.ambari.server.orm.dao.ClusterDAO;
-import org.apache.ambari.server.orm.dao.HostDAO;
-import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
-import org.apache.ambari.server.orm.dao.ResourceTypeDAO;
-import org.apache.ambari.server.orm.dao.StackDAO;
-import org.apache.ambari.server.orm.entities.ClusterEntity;
-import org.apache.ambari.server.orm.entities.HostEntity;
-import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
-import org.apache.ambari.server.orm.entities.ResourceEntity;
-import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
-import org.apache.ambari.server.orm.entities.StackEntity;
-import org.apache.ambari.server.security.authorization.ResourceType;
import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriter;
import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriterFactory;
import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
import org.apache.ambari.server.state.Alert;
-import org.apache.ambari.server.state.AlertState;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostHealthStatus;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.RepositoryVersionState;
import org.apache.ambari.server.state.SecurityState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponentHost;
@@ -113,9 +92,6 @@ import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.State;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
-import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
@@ -128,7 +104,6 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.JsonObject;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
@@ -145,8 +120,6 @@ public class TestHeartbeatHandler {
long requestId = 23;
long stageId = 31;
- private final static StackId HDP_22_STACK = new StackId("HDP", "2.2.0");
-
@Inject
AmbariMetaInfo metaInfo;
@@ -157,26 +130,14 @@ public class TestHeartbeatHandler {
ActionDBAccessor actionDBAccessor;
@Inject
- OrmTestHelper helper;
-
- @Inject
- ResourceTypeDAO resourceTypeDAO;
-
- @Inject
- StackDAO stackDAO;
-
- @Inject
- ClusterDAO clusterDAO;
-
- @Inject
- HostDAO hostDAO;
-
- @Inject
StageFactory stageFactory;
@Inject
HostRoleCommandFactory hostRoleCommandFactory;
+ @Inject
+ HeartbeatTestHelper heartbeatTestHelper;
+
private UnitOfWork unitOfWork;
@Rule
@@ -187,18 +148,7 @@ public class TestHeartbeatHandler {
@Before
public void setup() throws Exception {
- module = new InMemoryDefaultTestModule(){
-
- @Override
- protected void configure() {
- getProperties().put("recovery.type", "FULL");
- getProperties().put("recovery.lifetime_max_count", "10");
- getProperties().put("recovery.max_count", "4");
- getProperties().put("recovery.window_in_minutes", "23");
- getProperties().put("recovery.retry_interval", "2");
- super.configure();
- }
- };
+ module = HeartbeatTestHelper.getTestModule();
injector = Guice.createInjector(module);
injector.getInstance(GuiceJpaInitializer.class);
clusters = injector.getInstance(Clusters.class);
@@ -215,7 +165,7 @@ public class TestHeartbeatHandler {
@Test
@SuppressWarnings("unchecked")
public void testHeartbeat() throws Exception {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(new ArrayList<HostRoleCommand>());
replay(am);
Clusters fsm = clusters;
@@ -257,387 +207,16 @@ public class TestHeartbeatHandler {
assertEquals(0, aq.dequeueAll(DummyHostname1).size());
}
- @Test
- @SuppressWarnings("unchecked")
- public void testHeartbeatWithConfigs() throws Exception {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
- ActionQueue aq = new ActionQueue();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
- serviceComponentHost1.setState(State.INSTALLED);
- serviceComponentHost2.setState(State.INSTALLED);
-
- HeartBeat hb = new HeartBeat();
- hb.setResponseId(0);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setHostname(DummyHostname1);
-
- List<CommandReport> reports = new ArrayList<CommandReport>();
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setServiceName(HDFS);
- cr.setTaskId(1);
- cr.setRole(DATANODE);
- cr.setStatus("COMPLETED");
- cr.setStdErr("");
- cr.setStdOut("");
- cr.setExitCode(215);
- cr.setRoleCommand("START");
- cr.setClusterName(DummyCluster);
-
- cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
- put("global", new HashMap<String, String>() {{
- put("tag", "version1");
- }});
- }});
-
- reports.add(cr);
- hb.setReports(reports);
-
- HostEntity host1 = hostDAO.findByName(DummyHostname1);
- Assert.assertNotNull(host1);
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- }});
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
-
- // the heartbeat test passed if actual configs is populated
- Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
- Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testRestartRequiredAfterInstallClient() throws Exception {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
- ActionQueue aq = new ActionQueue();
-
- ServiceComponentHost serviceComponentHost = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
-
- serviceComponentHost.setState(State.INSTALLED);
- serviceComponentHost.setRestartRequired(true);
-
- HeartBeat hb = new HeartBeat();
- hb.setResponseId(0);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setHostname(DummyHostname1);
-
-
- List<CommandReport> reports = new ArrayList<CommandReport>();
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setServiceName(HDFS);
- cr.setRoleCommand("INSTALL");
- cr.setCustomCommand("EXECUTION_COMMAND");
- cr.setTaskId(1);
- cr.setRole(HDFS_CLIENT);
- cr.setStatus("COMPLETED");
- cr.setStdErr("");
- cr.setStdOut("");
- cr.setExitCode(215);
- cr.setClusterName(DummyCluster);
- cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
- put("global", new HashMap<String, String>() {{
- put("tag", "version1");
- }});
- }});
- reports.add(cr);
- hb.setReports(reports);
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- add(command);
- }});
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
-
- Assert.assertNotNull(serviceComponentHost.getActualConfigs());
- Assert.assertFalse(serviceComponentHost.isRestartRequired());
- Assert.assertEquals(serviceComponentHost.getActualConfigs().size(), 1);
-
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testHeartbeatCustomCommandWithConfigs() throws Exception {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
- ActionQueue aq = new ActionQueue();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
- serviceComponentHost1.setState(State.INSTALLED);
- serviceComponentHost2.setState(State.INSTALLED);
-
- HeartBeat hb = new HeartBeat();
- hb.setResponseId(0);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setHostname(DummyHostname1);
-
- List<CommandReport> reports = new ArrayList<CommandReport>();
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setServiceName(HDFS);
- cr.setRoleCommand("CUSTOM_COMMAND");
- cr.setCustomCommand("RESTART");
- cr.setTaskId(1);
- cr.setRole(DATANODE);
- cr.setStatus("COMPLETED");
- cr.setStdErr("");
- cr.setStdOut("");
- cr.setExitCode(215);
- cr.setClusterName(DummyCluster);
- cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
- put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
- }});
- CommandReport crn = new CommandReport();
- crn.setActionId(StageUtils.getActionId(requestId, stageId));
- crn.setServiceName(HDFS);
- crn.setRoleCommand("CUSTOM_COMMAND");
- crn.setCustomCommand("START");
- crn.setTaskId(1);
- crn.setRole(NAMENODE);
- crn.setStatus("COMPLETED");
- crn.setStdErr("");
- crn.setStdOut("");
- crn.setExitCode(215);
- crn.setClusterName(DummyCluster);
- crn.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
- put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
- }});
-
- reports.add(cr);
- reports.add(crn);
- hb.setReports(reports);
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- add(command);
- }});
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
-
- // the heartbeat test passed if actual configs is populated
- Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
- Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
- Assert.assertNotNull(serviceComponentHost2.getActualConfigs());
- Assert.assertEquals(serviceComponentHost2.getActualConfigs().size(), 1);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testHeartbeatCustomStartStop() throws Exception {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
- ActionQueue aq = new ActionQueue();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
- serviceComponentHost1.setState(State.INSTALLED);
- serviceComponentHost2.setState(State.STARTED);
- serviceComponentHost1.setRestartRequired(true);
- serviceComponentHost2.setRestartRequired(true);
-
- HeartBeat hb = new HeartBeat();
- hb.setResponseId(0);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setHostname(DummyHostname1);
-
- List<CommandReport> reports = new ArrayList<CommandReport>();
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setServiceName(HDFS);
- cr.setRoleCommand("CUSTOM_COMMAND");
- cr.setCustomCommand("START");
- cr.setTaskId(1);
- cr.setRole(DATANODE);
- cr.setStatus("COMPLETED");
- cr.setStdErr("");
- cr.setStdOut("");
- cr.setExitCode(215);
- cr.setClusterName(DummyCluster);
- CommandReport crn = new CommandReport();
- crn.setActionId(StageUtils.getActionId(requestId, stageId));
- crn.setServiceName(HDFS);
- crn.setRoleCommand("CUSTOM_COMMAND");
- crn.setCustomCommand("STOP");
- crn.setTaskId(1);
- crn.setRole(NAMENODE);
- crn.setStatus("COMPLETED");
- crn.setStdErr("");
- crn.setStdOut("");
- crn.setExitCode(215);
- crn.setClusterName(DummyCluster);
-
- reports.add(cr);
- reports.add(crn);
- hb.setReports(reports);
-
- assertTrue(serviceComponentHost1.isRestartRequired());
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- add(command);
- }});
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
-
- // the heartbeat test passed if actual configs is populated
- State componentState1 = serviceComponentHost1.getState();
- assertEquals(State.STARTED, componentState1);
- assertFalse(serviceComponentHost1.isRestartRequired());
- State componentState2 = serviceComponentHost2.getState();
- assertEquals(State.INSTALLED, componentState2);
- assertTrue(serviceComponentHost2.isRestartRequired());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testStatusHeartbeat() throws Exception {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
-
- ActionQueue aq = new ActionQueue();
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(SECONDARY_NAMENODE).getServiceComponentHost(DummyHostname1);
- serviceComponentHost1.setState(State.INSTALLED);
- serviceComponentHost1.setSecurityState(SecurityState.UNSECURED);
- serviceComponentHost2.setState(State.INSTALLED);
- serviceComponentHost2.setSecurityState(SecurityState.SECURING);
- serviceComponentHost3.setState(State.STARTING);
- HeartBeat hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(0);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setReports(new ArrayList<CommandReport>());
- ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
- ComponentStatus componentStatus1 = new ComponentStatus();
- componentStatus1.setClusterName(DummyCluster);
- componentStatus1.setServiceName(HDFS);
- componentStatus1.setMessage(DummyHostStatus);
- componentStatus1.setStatus(State.STARTED.name());
- componentStatus1.setSecurityState(SecurityState.SECURED_KERBEROS.name());
- componentStatus1.setComponentName(DATANODE);
- componentStatuses.add(componentStatus1);
- ComponentStatus componentStatus2 = new ComponentStatus();
- componentStatus2.setClusterName(DummyCluster);
- componentStatus2.setServiceName(HDFS);
- componentStatus2.setMessage(DummyHostStatus);
- componentStatus2.setStatus(State.STARTED.name());
- componentStatus2.setSecurityState(SecurityState.UNSECURED.name());
- componentStatus2.setComponentName(SECONDARY_NAMENODE);
- componentStatuses.add(componentStatus2);
- hb.setComponentStatus(componentStatuses);
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- add(command);
- }});
- replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
- State componentState1 = serviceComponentHost1.getState();
- State componentState2 = serviceComponentHost2.getState();
- State componentState3 = serviceComponentHost3.getState();
- assertEquals(State.STARTED, componentState1);
- assertEquals(SecurityState.SECURED_KERBEROS, serviceComponentHost1.getSecurityState());
- assertEquals(State.INSTALLED, componentState2);
- assertEquals(SecurityState.SECURING, serviceComponentHost2.getSecurityState());
- assertEquals(State.STARTED, componentState3);
- assertEquals(SecurityState.UNSECURED, serviceComponentHost3.getSecurityState());
- }
@Test
@SuppressWarnings("unchecked")
public void testStatusHeartbeatWithAnnotation() throws Exception {
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -658,14 +237,14 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
}}).anyTimes();
replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
HeartBeatResponse resp = handler.handleHeartBeat(hb);
Assert.assertFalse(resp.hasMappedComponents());
@@ -689,7 +268,7 @@ public class TestHeartbeatHandler {
@Test
@SuppressWarnings("unchecked")
public void testLiveStatusUpdateAfterStopFailed() throws Exception {
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -743,7 +322,7 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
@@ -751,82 +330,25 @@ public class TestHeartbeatHandler {
}});
replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+
State componentState1 = serviceComponentHost1.getState();
State componentState2 = serviceComponentHost2.getState();
assertEquals(State.STARTED, componentState1);
assertEquals(State.INSTALLED, componentState2);
}
+
@Test
- public void testCommandReport() throws AmbariException {
- injector.injectMembers(this);
- clusters.addHost(DummyHostname1);
- clusters.getHost(DummyHostname1).persist();
-
- StackId dummyStackId = new StackId(DummyStackId);
- clusters.addCluster(DummyCluster, dummyStackId);
-
- ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
- ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
- new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), null, null);
- populateActionDB(db, DummyHostname1);
- Stage stage = db.getAllStages(requestId).get(0);
- Assert.assertEquals(stageId, stage.getStageId());
- stage.setHostRoleStatus(DummyHostname1, HBASE_MASTER, HostRoleStatus.QUEUED);
- db.hostRoleScheduled(stage, DummyHostname1, HBASE_MASTER);
- List<CommandReport> reports = new ArrayList<CommandReport>();
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setTaskId(1);
- cr.setRole(HBASE_MASTER);
- cr.setStatus("COMPLETED");
- cr.setStdErr("");
- cr.setStdOut("");
- cr.setExitCode(215);
-
- cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
- put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
- }});
-
-
- reports.add(cr);
- am.processTaskResponse(DummyHostname1, reports, stage.getOrderedHostRoleCommands());
- assertEquals(215,
- am.getAction(requestId, stageId).getExitCode(DummyHostname1, HBASE_MASTER));
- assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)
- .getHostRoleStatus(DummyHostname1, HBASE_MASTER));
- Stage s = db.getAllStages(requestId).get(0);
- assertEquals(HostRoleStatus.COMPLETED,
- s.getHostRoleStatus(DummyHostname1, HBASE_MASTER));
- assertEquals(215,
- s.getExitCode(DummyHostname1, HBASE_MASTER));
- }
-
- private void populateActionDB(ActionDBAccessor db, String DummyHostname1) throws AmbariException {
- Stage s = stageFactory.createNew(requestId, "/a/b", DummyCluster, 1L, "heartbeat handler test",
- "clusterHostInfo", "commandParamsStage", "hostParamsStage");
- s.setStageId(stageId);
- String filename = null;
- s.addHostRoleExecutionCommand(DummyHostname1, Role.HBASE_MASTER,
- RoleCommand.START,
- new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
- DummyHostname1, System.currentTimeMillis()), DummyCluster, HBASE, false, false);
- List<Stage> stages = new ArrayList<Stage>();
- stages.add(s);
- Request request = new Request(stages, clusters);
- db.persistActions(request);
- }
-
- @Test
- public void testRegistration() throws AmbariException,
- InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
- replay(am);
- Clusters fsm = clusters;
- HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
- injector);
+ public void testRegistration() throws AmbariException,
+ InvalidStateTransitionException {
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ replay(am);
+ Clusters fsm = clusters;
+ HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
+ injector);
clusters.addHost(DummyHostname1);
Host hostObject = clusters.getHost(DummyHostname1);
hostObject.setIPv4("ipv4");
@@ -853,12 +375,12 @@ public class TestHeartbeatHandler {
@Test
public void testRegistrationRecoveryConfig() throws AmbariException,
InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
injector);
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -912,12 +434,12 @@ public class TestHeartbeatHandler {
@Test
public void testRegistrationRecoveryConfigMaintenanceMode()
throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
injector);
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -963,7 +485,7 @@ public class TestHeartbeatHandler {
@Test
public void testRegistrationAgentConfig() throws AmbariException,
InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -996,7 +518,7 @@ public class TestHeartbeatHandler {
public void testRegistrationWithBadVersion() throws AmbariException,
InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1037,7 +559,7 @@ public class TestHeartbeatHandler {
@Test
public void testRegistrationPublicHostname() throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1070,7 +592,7 @@ public class TestHeartbeatHandler {
@Test
public void testInvalidOSRegistration() throws AmbariException,
InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1099,7 +621,7 @@ public class TestHeartbeatHandler {
public void testIncompatibleAgentRegistration() throws AmbariException,
InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -1127,7 +649,7 @@ public class TestHeartbeatHandler {
@Test
public void testRegisterNewNode()
throws AmbariException, InvalidStateTransitionException {
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
fsm.addHost(DummyHostname1);
@@ -1220,7 +742,7 @@ public class TestHeartbeatHandler {
HeartbeatMonitor hm = mock(HeartbeatMonitor.class);
when(hm.generateStatusCommands(anyString())).thenReturn(dummyCmds);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
replay(am);
Clusters fsm = clusters;
ActionQueue actionQueue = new ActionQueue();
@@ -1248,7 +770,7 @@ public class TestHeartbeatHandler {
@Test
@SuppressWarnings("unchecked")
public void testTaskInProgressHandling() throws AmbariException, InvalidStateTransitionException {
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -1289,15 +811,16 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, RoleCommand.INSTALL);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
}});
replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
+ handler.getHeartbeatProcessor().processHeartbeat(hb);
State componentState1 = serviceComponentHost1.getState();
assertEquals("Host state should still be installing", State.INSTALLING, componentState1);
}
@@ -1305,7 +828,7 @@ public class TestHeartbeatHandler {
@Test
@SuppressWarnings("unchecked")
public void testOPFailedEventForAbortedTask() throws AmbariException, InvalidStateTransitionException {
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -1342,494 +865,45 @@ public class TestHeartbeatHandler {
List<CommandReport> reports = new ArrayList<CommandReport>();
CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(1, 1));
- cr.setTaskId(1);
- cr.setClusterName(DummyCluster);
- cr.setServiceName(HDFS);
- cr.setRole(DATANODE);
- cr.setRoleCommand("INSTALL");
- cr.setStatus("FAILED");
- cr.setStdErr("none");
- cr.setStdOut("dummy output");
- cr.setExitCode(777);
- reports.add(cr);
- hb.setReports(reports);
- hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- }});
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
- State componentState1 = serviceComponentHost1.getState();
- assertEquals("Host state should still be installing", State.INSTALLING,
- componentState1);
- }
-
- /**
- * Tests the fact that when START and STOP commands are in progress, and heartbeat
- * forces the host component state to STARTED or INSTALLED, there are no undesired
- * side effects.
- * @throws AmbariException
- * @throws InvalidStateTransitionException
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testCommandReportOnHeartbeatUpdatedState()
- throws AmbariException, InvalidStateTransitionException {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-
- ActionQueue aq = new ActionQueue();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- serviceComponentHost1.setState(State.INSTALLED);
-
- HeartBeat hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(0);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-
- List<CommandReport> reports = new ArrayList<CommandReport>();
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setTaskId(1);
- cr.setClusterName(DummyCluster);
- cr.setServiceName(HDFS);
- cr.setRole(DATANODE);
- cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr.setStdErr("none");
- cr.setStdOut("dummy output");
- cr.setExitCode(777);
- cr.setRoleCommand("START");
- reports.add(cr);
- hb.setReports(reports);
- hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- }}).anyTimes();
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.INSTALLED,
- State.INSTALLED, serviceComponentHost1.getState());
-
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(1);
- cr.setStatus(HostRoleStatus.COMPLETED.toString());
- cr.setExitCode(0);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.STARTED,
- State.STARTED, serviceComponentHost1.getState());
-
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(2);
- cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr.setRoleCommand("STOP");
- cr.setExitCode(777);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.STARTED,
- State.STARTED, serviceComponentHost1.getState());
-
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(3);
- cr.setStatus(HostRoleStatus.COMPLETED.toString());
- cr.setExitCode(0);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.INSTALLED,
- State.INSTALLED, serviceComponentHost1.getState());
-
- // validate the transitions when there is no heartbeat
- serviceComponentHost1.setState(State.STARTING);
- cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr.setExitCode(777);
- cr.setRoleCommand("START");
- hb.setResponseId(4);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.STARTING,
- State.STARTING, serviceComponentHost1.getState());
-
- cr.setStatus(HostRoleStatus.COMPLETED.toString());
- cr.setExitCode(0);
- hb.setResponseId(5);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.STARTED,
- State.STARTED, serviceComponentHost1.getState());
-
- serviceComponentHost1.setState(State.STOPPING);
- cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr.setExitCode(777);
- cr.setRoleCommand("STOP");
- hb.setResponseId(6);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.STOPPING,
- State.STOPPING, serviceComponentHost1.getState());
-
- cr.setStatus(HostRoleStatus.COMPLETED.toString());
- cr.setExitCode(0);
- hb.setResponseId(7);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.INSTALLED,
- State.INSTALLED, serviceComponentHost1.getState());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
-
- ActionQueue aq = new ActionQueue();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- serviceComponentHost1.setState(State.UPGRADING);
-
- HeartBeat hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(0);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
-
- List<CommandReport> reports = new ArrayList<CommandReport>();
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setTaskId(1);
- cr.setClusterName(DummyCluster);
- cr.setServiceName(HDFS);
- cr.setRole(DATANODE);
- cr.setRoleCommand("INSTALL");
- cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr.setStdErr("none");
- cr.setStdOut("dummy output");
- cr.setExitCode(777);
- reports.add(cr);
- hb.setReports(reports);
- hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- }}).anyTimes();
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.UPGRADING,
- State.UPGRADING, serviceComponentHost1.getState());
-
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(1);
- cr.setStatus(HostRoleStatus.COMPLETED.toString());
- cr.setExitCode(0);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.INSTALLED,
- State.INSTALLED, serviceComponentHost1.getState());
-
- serviceComponentHost1.setState(State.UPGRADING);
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(2);
- cr.setStatus(HostRoleStatus.FAILED.toString());
- cr.setExitCode(3);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.UPGRADING,
- State.UPGRADING, serviceComponentHost1.getState());
-
- serviceComponentHost1.setState(State.UPGRADING);
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(3);
- cr.setStatus(HostRoleStatus.PENDING.toString());
- cr.setExitCode(55);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.UPGRADING,
- State.UPGRADING, serviceComponentHost1.getState());
-
- serviceComponentHost1.setState(State.UPGRADING);
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(4);
- cr.setStatus(HostRoleStatus.QUEUED.toString());
- cr.setExitCode(55);
-
- handler.handleHeartBeat(hb);
- assertEquals("Host state should be " + State.UPGRADING,
- State.UPGRADING, serviceComponentHost1.getState());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testStatusHeartbeatWithVersion() throws Exception {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
-
- StackId stack130 = new StackId("HDP-1.3.0");
- StackId stack120 = new StackId("HDP-1.2.0");
-
- serviceComponentHost1.setState(State.INSTALLED);
- serviceComponentHost2.setState(State.STARTED);
- serviceComponentHost3.setState(State.STARTED);
- serviceComponentHost1.setStackVersion(stack130);
- serviceComponentHost2.setStackVersion(stack120);
- serviceComponentHost3.setStackVersion(stack120);
-
- HeartBeat hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(0);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setReports(new ArrayList<CommandReport>());
- hb.setAgentEnv(new AgentEnv());
- hb.setMounts(new ArrayList<DiskInfo>());
-
- ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
- ComponentStatus componentStatus1 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED,
- SecurityState.UNSECURED, DATANODE, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
- ComponentStatus componentStatus2 =
- createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED, SecurityState.UNSECURED, NAMENODE, "");
- ComponentStatus componentStatus3 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.INSTALLED,
- SecurityState.UNSECURED, HDFS_CLIENT, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
-
- componentStatuses.add(componentStatus1);
- componentStatuses.add(componentStatus2);
- componentStatuses.add(componentStatus3);
- hb.setComponentStatus(componentStatuses);
-
- ActionQueue aq = new ActionQueue();
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- }});
- replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
- assertEquals("Matching value " + serviceComponentHost1.getStackVersion(),
- stack130, serviceComponentHost1.getStackVersion());
- assertEquals("Matching value " + serviceComponentHost2.getStackVersion(),
- stack120, serviceComponentHost2.getStackVersion());
- assertEquals("Matching value " + serviceComponentHost3.getStackVersion(),
- stack130, serviceComponentHost3.getStackVersion());
- assertTrue(hb.getAgentEnv().getHostHealth().getServerTimeStampAtReporting() >= hb.getTimestamp());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-
- StackId stack130 = new StackId("HDP-1.3.0");
- StackId stack120 = new StackId("HDP-1.2.0");
-
- serviceComponentHost1.setState(State.UPGRADING);
- serviceComponentHost2.setState(State.INSTALLING);
-
- serviceComponentHost1.setStackVersion(stack120);
- serviceComponentHost1.setDesiredStackVersion(stack130);
- serviceComponentHost2.setStackVersion(stack120);
-
- HeartBeat hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(0);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- CommandReport cr1 = new CommandReport();
- cr1.setActionId(StageUtils.getActionId(requestId, stageId));
- cr1.setTaskId(1);
- cr1.setClusterName(DummyCluster);
- cr1.setServiceName(HDFS);
- cr1.setRole(DATANODE);
- cr1.setStatus(HostRoleStatus.COMPLETED.toString());
- cr1.setStdErr("none");
- cr1.setStdOut("dummy output");
- cr1.setExitCode(0);
- cr1.setRoleCommand(RoleCommand.UPGRADE.toString());
-
- CommandReport cr2 = new CommandReport();
- cr2.setActionId(StageUtils.getActionId(requestId, stageId));
- cr2.setTaskId(2);
- cr2.setClusterName(DummyCluster);
- cr2.setServiceName(HDFS);
- cr2.setRole(NAMENODE);
- cr2.setStatus(HostRoleStatus.COMPLETED.toString());
- cr2.setStdErr("none");
- cr2.setStdOut("dummy output");
- cr2.setExitCode(0);
- cr2.setRoleCommand(RoleCommand.UPGRADE.toString());
- ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
- reports.add(cr1);
- reports.add(cr2);
- hb.setReports(reports);
-
- ActionQueue aq = new ActionQueue();
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- add(command);
- }});
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
- assertEquals("Stack version for SCH should be updated to " +
- serviceComponentHost1.getDesiredStackVersion(),
- stack130, serviceComponentHost1.getStackVersion());
- assertEquals("Stack version for SCH should not change ",
- stack120, serviceComponentHost2.getStackVersion());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
-
- ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
- ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
-
- StackId stack130 = new StackId("HDP-1.3.0");
- StackId stack120 = new StackId("HDP-1.2.0");
-
- serviceComponentHost1.setState(State.UPGRADING);
- serviceComponentHost2.setState(State.INSTALLING);
-
- serviceComponentHost1.setStackVersion(stack120);
- serviceComponentHost1.setDesiredStackVersion(stack130);
- serviceComponentHost2.setStackVersion(stack120);
-
- HeartBeat hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(0);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- CommandReport cr1 = new CommandReport();
- cr1.setActionId(StageUtils.getActionId(requestId, stageId));
- cr1.setTaskId(1);
- cr1.setClusterName(DummyCluster);
- cr1.setServiceName(HDFS);
- cr1.setRole(DATANODE);
- cr1.setRoleCommand("INSTALL");
- cr1.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr1.setStdErr("none");
- cr1.setStdOut("dummy output");
- cr1.setExitCode(777);
-
- CommandReport cr2 = new CommandReport();
- cr2.setActionId(StageUtils.getActionId(requestId, stageId));
- cr2.setTaskId(2);
- cr2.setClusterName(DummyCluster);
- cr2.setServiceName(HDFS);
- cr2.setRole(NAMENODE);
- cr2.setRoleCommand("INSTALL");
- cr2.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr2.setStdErr("none");
- cr2.setStdOut("dummy output");
- cr2.setExitCode(777);
- ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
- reports.add(cr1);
- reports.add(cr2);
+ cr.setActionId(StageUtils.getActionId(1, 1));
+ cr.setTaskId(1);
+ cr.setClusterName(DummyCluster);
+ cr.setServiceName(HDFS);
+ cr.setRole(DATANODE);
+ cr.setRoleCommand("INSTALL");
+ cr.setStatus("FAILED");
+ cr.setStdErr("none");
+ cr.setStdOut("dummy output");
+ cr.setExitCode(777);
+ reports.add(cr);
hb.setReports(reports);
+ hb.setComponentStatus(new ArrayList<ComponentStatus>());
- ActionQueue aq = new ActionQueue();
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
- add(command);
}});
replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
handler.handleHeartBeat(hb);
- assertEquals("State of SCH not change while operation is in progress",
- State.UPGRADING, serviceComponentHost1.getState());
- assertEquals("Stack version of SCH should not change after in progress report",
- stack130, serviceComponentHost1.getDesiredStackVersion());
- assertEquals("State of SCH not change while operation is in progress",
- State.INSTALLING, serviceComponentHost2.getState());
+ handler.getHeartbeatProcessor().processHeartbeat(hb);
+ State componentState1 = serviceComponentHost1.getState();
+ assertEquals("Host state should still be installing", State.INSTALLING,
+ componentState1);
}
+
+
@Test
@SuppressWarnings("unchecked")
- public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
- Cluster cluster = getDummyCluster();
+ public void testStatusHeartbeatWithVersion() throws Exception {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -1840,117 +914,73 @@ public class TestHeartbeatHandler {
hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
- getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
StackId stack130 = new StackId("HDP-1.3.0");
StackId stack120 = new StackId("HDP-1.2.0");
- serviceComponentHost1.setState(State.UPGRADING);
- serviceComponentHost2.setState(State.INSTALLING);
-
- serviceComponentHost1.setStackVersion(stack120);
- serviceComponentHost1.setDesiredStackVersion(stack130);
+ serviceComponentHost1.setState(State.INSTALLED);
+ serviceComponentHost2.setState(State.STARTED);
+ serviceComponentHost3.setState(State.STARTED);
+ serviceComponentHost1.setStackVersion(stack130);
serviceComponentHost2.setStackVersion(stack120);
-
- Stage s = stageFactory.createNew(requestId, "/a/b", "cluster1", 1L, "action manager test",
- "clusterHostInfo", "commandParamsStage", "hostParamsStage");
- s.setStageId(stageId);
- s.addHostRoleExecutionCommand(DummyHostname1, Role.DATANODE, RoleCommand.UPGRADE,
- new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(),
- DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
- DummyCluster, "HDFS", false, false);
- s.addHostRoleExecutionCommand(DummyHostname1, Role.NAMENODE, RoleCommand.INSTALL,
- new ServiceComponentHostInstallEvent(Role.NAMENODE.toString(),
- DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
- DummyCluster, "HDFS", false, false);
- List<Stage> stages = new ArrayList<Stage>();
- stages.add(s);
- Request request = new Request(stages, clusters);
- actionDBAccessor.persistActions(request);
- CommandReport cr = new CommandReport();
- cr.setActionId(StageUtils.getActionId(requestId, stageId));
- cr.setTaskId(1);
- cr.setClusterName(DummyCluster);
- cr.setServiceName(HDFS);
- cr.setRole(DATANODE);
- cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
- cr.setStdErr("none");
- cr.setStdOut("dummy output");
- actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
- Role.DATANODE.name(), cr);
- cr.setRole(NAMENODE);
- cr.setTaskId(2);
- actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
- Role.NAMENODE.name(), cr);
+ serviceComponentHost3.setStackVersion(stack120);
HeartBeat hb = new HeartBeat();
hb.setTimestamp(System.currentTimeMillis());
hb.setResponseId(0);
hb.setHostname(DummyHostname1);
hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- CommandReport cr1 = new CommandReport();
- cr1.setActionId(StageUtils.getActionId(requestId, stageId));
- cr1.setTaskId(1);
- cr1.setClusterName(DummyCluster);
- cr1.setServiceName(HDFS);
- cr1.setRole(DATANODE);
- cr1.setRoleCommand("INSTALL");
- cr1.setStatus(HostRoleStatus.FAILED.toString());
- cr1.setStdErr("none");
- cr1.setStdOut("dummy output");
- cr1.setExitCode(0);
+ hb.setReports(new ArrayList<CommandReport>());
+ hb.setAgentEnv(new AgentEnv());
+ hb.setMounts(new ArrayList<DiskInfo>());
- CommandReport cr2 = new CommandReport();
- cr2.setActionId(StageUtils.getActionId(requestId, stageId));
- cr2.setTaskId(2);
- cr2.setClusterName(DummyCluster);
- cr2.setServiceName(HDFS);
- cr2.setRole(NAMENODE);
- cr2.setRoleCommand("INSTALL");
- cr2.setStatus(HostRoleStatus.FAILED.toString());
- cr2.setStdErr("none");
- cr2.setStdOut("dummy output");
- cr2.setExitCode(0);
- ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
- reports.add(cr1);
- reports.add(cr2);
- hb.setReports(reports);
+ ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
+ ComponentStatus componentStatus1 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED,
+ SecurityState.UNSECURED, DATANODE, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
+ ComponentStatus componentStatus2 =
+ createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED, SecurityState.UNSECURED, NAMENODE, "");
+ ComponentStatus componentStatus3 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.INSTALLED,
+ SecurityState.UNSECURED, HDFS_CLIENT, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}");
- ActionQueue aq = new ActionQueue();
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
+ componentStatuses.add(componentStatus1);
+ componentStatuses.add(componentStatus2);
+ componentStatuses.add(componentStatus3);
+ hb.setComponentStatus(componentStatuses);
- ActionManager am = getMockActionManager();
+ ActionQueue aq = new ActionQueue();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
- add(command);
- add(command);
}});
replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
handler.handleHeartBeat(hb);
- assertEquals("State of SCH should change after fail report",
- State.UPGRADING, serviceComponentHost1.getState());
- assertEquals("State of SCH should change after fail report",
- State.INSTALL_FAILED, serviceComponentHost2.getState());
- assertEquals("Stack version of SCH should not change after fail report",
- stack120, serviceComponentHost1.getStackVersion());
- assertEquals("Stack version of SCH should not change after fail report",
- stack130, serviceComponentHost1.getDesiredStackVersion());
- assertEquals("Stack version of SCH should not change after fail report",
- State.INSTALL_FAILED, serviceComponentHost2.getState());
+ heartbeatProcessor.processHeartbeat(hb);
+
+ assertEquals("Matching value " + serviceComponentHost1.getStackVersion(),
+ stack130, serviceComponentHost1.getStackVersion());
+ assertEquals("Matching value " + serviceComponentHost2.getStackVersion(),
+ stack120, serviceComponentHost2.getStackVersion());
+ assertEquals("Matching value " + serviceComponentHost3.getStackVersion(),
+ stack130, serviceComponentHost3.getStackVersion());
+ assertTrue(hb.getAgentEnv().getHostHealth().getServerTimeStampAtReporting() >= hb.getTimestamp());
}
+
@Test
@SuppressWarnings("unchecked")
public void testRecoveryStatusReports() throws Exception {
Clusters fsm = clusters;
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Host hostObject = clusters.getHost(DummyHostname1);
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
@@ -1964,7 +994,7 @@ public class TestHeartbeatHandler {
ActionQueue aq = new ActionQueue();
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1, Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
@@ -2032,7 +1062,7 @@ public class TestHeartbeatHandler {
public void testProcessStatusReports() throws Exception {
Clusters fsm = clusters;
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Host hostObject = clusters.getHost(DummyHostname1);
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
@@ -2047,7 +1077,7 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
@@ -2055,6 +1085,7 @@ public class TestHeartbeatHandler {
}}).anyTimes();
replay(am);
HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
Register reg = new Register();
HostInfo hi = new HostInfo();
@@ -2092,6 +1123,7 @@ public class TestHeartbeatHandler {
componentStatus.add(nameNodeStatus);
hb1.setComponentStatus(componentStatus);
handler.handleHeartBeat(hb1);
+ heartbeatProcessor.processHeartbeat(hb1);
assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
//Some slaves are down, masters are up
@@ -2116,6 +1148,7 @@ public class TestHeartbeatHandler {
componentStatus.add(nameNodeStatus);
hb2.setComponentStatus(componentStatus);
handler.handleHeartBeat(hb2);
+ heartbeatProcessor.processHeartbeat(hb2);
assertEquals(HostHealthStatus.HealthStatus.ALERT.name(), hostObject.getStatus());
// mark the installed DN as maintenance
@@ -2142,6 +1175,7 @@ public class TestHeartbeatHandler {
componentStatus.add(nameNodeStatus);
hb2a.setComponentStatus(componentStatus);
handler.handleHeartBeat(hb2a);
+ heartbeatProcessor.processHeartbeat(hb2a);
assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
hdfs.getServiceComponent(DATANODE).getServiceComponentHost(
@@ -2169,11 +1203,13 @@ public class TestHeartbeatHandler {
componentStatus.add(nameNodeStatus);
hb3.setComponentStatus(componentStatus);
handler.handleHeartBeat(hb3);
+ heartbeatProcessor.processHeartbeat(hb3);
assertEquals(HostHealthStatus.HealthStatus.UNHEALTHY.name(), hostObject.getStatus());
//All are up
hb1.setResponseId(4);
handler.handleHeartBeat(hb1);
+ heartbeatProcessor.processHeartbeat(hb1);
assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
reset(am);
@@ -2199,10 +1235,12 @@ public class TestHeartbeatHandler {
componentStatus.add(dataNodeStatus);
hb4.setComponentStatus(componentStatus);
handler.handleHeartBeat(hb4);
+ heartbeatProcessor.processHeartbeat(hb4);
assertEquals(HostHealthStatus.HealthStatus.UNHEALTHY.name(), hostObject.getStatus());
hb1.setResponseId(6);
handler.handleHeartBeat(hb1);
+ heartbeatProcessor.processHeartbeat(hb1);
assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus());
//Some command reports
@@ -2225,6 +1263,7 @@ public class TestHeartbeatHandler {
reports.add(cr1);
hb5.setReports(reports);
handler.handleHeartBeat(hb5);
+ heartbeatProcessor.processHeartbeat(hb5);
assertEquals(HostHealthStatus.HealthStatus.ALERT.name(), hostObject.getStatus());
}
@@ -2270,7 +1309,7 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
@@ -2278,12 +1317,13 @@ public class TestHeartbeatHandler {
}});
replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
// CUSTOM_COMMAND and ACTIONEXECUTE reports are ignored
// they should not change the host component state
try {
handler.handleHeartBeat(hb);
+ handler.getHeartbeatProcessor().processHeartbeat(hb);
} catch (Exception e) {
fail();
}
@@ -2321,8 +1361,8 @@ public class TestHeartbeatHandler {
expected.setStackVersion(dummyStackId.getStackVersion());
expected.setComponents(dummyComponents);
- getDummyCluster();
- HeartBeatHandler handler = getHeartBeatHandler(getMockActionManager(),
+ heartbeatTestHelper.getDummyCluster();
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(heartbeatTestHelper.getMockActionManager(),
new ActionQueue());
ComponentsResponse actual = handler.handleComponents(DummyCluster);
@@ -2337,19 +1377,7 @@ public class TestHeartbeatHandler {
assertEquals(expected.getComponents(), actual.getComponents());
}
- private ActionManager getMockActionManager() {
- ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
- Clusters clustersMock = createNiceMock(Clusters.class);
- Configuration configurationMock = createNiceMock(Configuration.class);
-
- ActionManager actionManager = createMockBuilder(ActionManager.class).
- addMockedMethod("getTasks").
- withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
- actionDBAccessor, new HostsMap((String) null), unitOfWork,
- injector.getInstance(RequestFactory.class), configurationMock, createNiceMock(AmbariEventPublisher.class)).
- createMock();
- return actionManager;
- }
+
private ComponentStatus createComponentStatus(String clusterName, String serviceName, String message,
@@ -2366,164 +1394,11 @@ public class TestHeartbeatHandler {
return componentStatus1;
}
- private HeartBeatHandler getHeartBeatHandler(ActionManager am, ActionQueue aq)
- throws InvalidStateTransitionException, AmbariException {
- HeartBeatHandler handler = new HeartBeatHandler(clusters, aq, am, injector);
- Register reg = new Register();
- HostInfo hi = new HostInfo();
- hi.setHostName(DummyHostname1);
- hi.setOS(DummyOs);
- hi.setOSRelease(DummyOSRelease);
- reg.setHostname(DummyHostname1);
- reg.setResponseId(0);
- reg.setHardwareProfile(hi);
- reg.setAgentVersion(metaInfo.getServerVersion());
- handler.handleRegistration(reg);
- return handler;
- }
-
- private Cluster getDummyCluster()
- throws AmbariException {
- StackEntity stackEntity = stackDAO.find(HDP_22_STACK.getStackName(), HDP_22_STACK.getStackVersion());
- org.junit.Assert.assertNotNull(stackEntity);
-
- // Create the cluster
- ResourceTypeEntity resourceTypeEntity = new ResourceTypeEntity();
- resourceTypeEntity.setId(ResourceType.CLUSTER.getId());
- resourceTypeEntity.setName(ResourceType.CLUSTER.name());
- resourceTypeEntity = resourceTypeDAO.merge(resourceTypeEntity);
-
- ResourceEntity resourceEntity = new ResourceEntity();
- resourceEntity.setResourceType(resourceTypeEntity);
-
- ClusterEntity clusterEntity = new ClusterEntity();
- clusterEntity.setClusterName(DummyCluster);
- clusterEntity.setClusterInfo("test_cluster_info1");
- clusterEntity.setResource(resourceEntity);
- clusterEntity.setDesiredStack(stackEntity);
-
- clusterDAO.create(clusterEntity);
-
- StackId stackId = new StackId(DummyStackId);
-
- Cluster cluster = clusters.getCluster(DummyCluster);
-
- cluster.setDesiredStackVersion(stackId);
- cluster.setCurrentStackVersion(stackId);
- helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion());
- cluster.createClusterVersion(stackId, stackId.getStackVersion(), "admin",
- RepositoryVersionState.UPGRADING);
-
- Set<String> hostNames = new HashSet<String>(){{
- add(DummyHostname1);
- }};
-
- Map<String, String> hostAttributes = new HashMap<String, String>();
- hostAttributes.put("os_family", "redhat");
- hostAttributes.put("os_release_version", "6.3");
-
- List<HostEntity> hostEntities = new ArrayList<HostEntity>();
- for(String hostName : hostNames) {
- clusters.addHost(hostName);
- Host host = clusters.getHost(hostName);
- host.setHostAttributes(hostAttributes);
- host.persist();
-
- HostEntity hostEntity = hostDAO.findByName(hostName);
- Assert.assertNotNull(hostEntity);
- hostEntities.add(hostEntity);
- }
- clusterEntity.setHostEntities(hostEntities);
- clusters.mapHostsToCluster(hostNames, DummyCluster);
-
- return cluster;
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testCommandStatusProcesses() throws Exception {
- Cluster cluster = getDummyCluster();
- Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
-
- ActionQueue aq = new ActionQueue();
-
- HeartBeat hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(0);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setReports(new ArrayList<CommandReport>());
-
- List<Map<String, String>> procs = new ArrayList<Map<String, String>>();
- Map<String, String> proc1info = new HashMap<String, String>();
- proc1info.put("name", "a");
- proc1info.put("status", "RUNNING");
- procs.add(proc1info);
-
- Map<String, String> proc2info = new HashMap<String, String>();
- proc2info.put("name", "b");
- proc2info.put("status", "NOT_RUNNING");
- procs.add(proc2info);
-
- Map<String, Object> extra = new HashMap<String, Object>();
- extra.put("processes", procs);
-
- ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
- ComponentStatus componentStatus1 = new ComponentStatus();
- componentStatus1.setClusterName(DummyCluster);
- componentStatus1.setServiceName(HDFS);
- componentStatus1.setMessage(DummyHostStatus);
- componentStatus1.setStatus(State.STARTED.name());
- componentStatus1.setSecurityState(SecurityState.UNSECURED.name());
- componentStatus1.setComponentName(DATANODE);
-
- componentStatus1.setExtra(extra);
- componentStatuses.add(componentStatus1);
- hb.setComponentStatus(componentStatuses);
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>() {{
- add(command);
- }}).anyTimes();
- replay(am);
-
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
- handler.handleHeartBeat(hb);
- ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
-
- Assert.assertEquals(Integer.valueOf(2), Integer.valueOf(sch.getProcesses().size()));
-
- hb = new HeartBeat();
- hb.setTimestamp(System.currentTimeMillis());
- hb.setResponseId(1);
- hb.setHostname(DummyHostname1);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setReports(new ArrayList<CommandReport>());
-
- componentStatus1 = new ComponentStatus();
- componentStatus1.setClusterName(DummyCluster);
- componentStatus1.setServiceName(HDFS);
- componentStatus1.setMessage(DummyHostStatus);
- componentStatus1.setStatus(State.STARTED.name());
- componentStatus1.setSecurityState(SecurityState.UNSECURED.name());
- componentStatus1.setComponentName(DATANODE);
- hb.setComponentStatus(Collections.singletonList(componentStatus1));
-
- handler.handleHeartBeat(hb);
- }
@Test
@SuppressWarnings("unchecked")
public void testCommandStatusProcesses_empty() throws Exception {
- Cluster cluster = getDummyCluster();
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
hdfs.persist();
hdfs.addServiceComponent(DATANODE).persist();
@@ -2553,137 +1428,19 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
}});
replay(am);
- HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
Assert.assertEquals(Integer.valueOf(0), Integer.valueOf(sch.getProcesses().size()));
}
- /**
- * Tests that if there is an invalid cluster in heartbeat data, the heartbeat
- * doesn't fail.
- *
- * @throws Exception
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testHeartBeatWithAlertAndInvalidCluster() throws Exception {
- ActionManager am = getMockActionManager();
-
- expect(am.getTasks(anyObject(List.class))).andReturn(
- new ArrayList<HostRoleCommand>());
-
- replay(am);
-
- Cluster cluster = getDummyCluster();
- Clusters fsm = clusters;
- Host hostObject = clusters.getHost(DummyHostname1);
- hostObject.setIPv4("ipv4");
- hostObject.setIPv6("ipv6");
- hostObject.setOsType(DummyOsType);
-
- ActionQueue aq = new ActionQueue();
-
- HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
- Register reg = new Register();
- HostInfo hi = new HostInfo();
- hi.setHostName(DummyHostname1);
- hi.setOS(DummyOs);
- hi.setOSRelease(DummyOSRelease);
- reg.setHostname(DummyHostname1);
- reg.setHardwareProfile(hi);
- reg.setAgentVersion(metaInfo.getServerVersion());
- handler.handleRegistration(reg);
-
- hostObject.setState(HostState.UNHEALTHY);
-
- ExecutionCommand execCmd = new ExecutionCommand();
- execCmd.setRequestAndStage(2, 34);
- execCmd.setHostname(DummyHostname1);
- aq.enqueue(DummyHostname1, new ExecutionCommand());
-
- HeartBeat hb = new HeartBeat();
- HostStatus hs = new HostStatus(Status.HEALTHY, DummyHostStatus);
-
- hb.setResponseId(0);
- hb.setNodeStatus(hs);
- hb.setHostname(DummyHostname1);
-
- Alert alert = new Alert("foo", "bar", "baz", "foobar", "foobarbaz",
- AlertState.OK);
-
- alert.setCluster("BADCLUSTER");
-
- List<Alert> alerts = Collections.singletonList(alert);
- hb.setAlerts(alerts);
-
- // should NOT throw AmbariException from alerts.
- handler.handleHeartBeat(hb);
- }
-
- @Test
- public void testInstallPackagesWithVersion() throws Exception {
- // required since this test method checks the DAO result of handling a
- // heartbeat which performs some async tasks
- EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
-
- final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
- Role.DATANODE, null, null);
-
- ActionManager am = getMockActionManager();
- expect(am.getTasks(anyObject(List.class))).andReturn(
- Collections.singletonList(command)).anyTimes();
- replay(am);
-
- Cluster cluster = getDummyCluster();
- HeartBeatHandler handler = getHeartBeatHandler(am, new ActionQueue());
- HeartBeat hb = new HeartBeat();
-
- JsonObject json = new JsonObject();
- json.addProperty("actual_version", "2.2.1.0-2222");
- json.addProperty("package_installation_result", "SUCCESS");
- json.addProperty("installed_repository_version", "0.1");
- json.addProperty("stack_id", cluster.getDesiredStackVersion().getStackId());
-
-
- CommandReport cmdReport = new CommandReport();
- cmdReport.setActionId(StageUtils.getActionId(requestId, stageId));
- cmdReport.setTaskId(1);
- cmdReport.setCustomCommand("install_packages");
- cmdReport.setStructuredOut(json.toString());
- cmdReport.setRoleCommand(RoleCommand.ACTIONEXECUTE.name());
- cmdReport.setStatus(HostRoleStatus.COMPLETED.name());
- cmdReport.setRole("install_packages");
- cmdReport.setClusterName(DummyCluster);
-
- hb.setReports(Collections.singletonList(cmdReport));
- hb.setTimestamp(0L);
- hb.setResponseId(0);
- hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
- hb.setHostname(DummyHostname1);
- hb.setComponentStatus(new ArrayList<ComponentStatus>());
-
- StackId stackId = new StackId("HDP", "0.1");
-
- RepositoryVersionDAO dao = injector.getInstance(RepositoryVersionDAO.class);
- RepositoryVersionEntity entity = dao.findByStackAndVersion(stackId, "0.1");
- Assert.assertNotNull(entity);
-
- handler.handleHeartBeat(hb);
-
- entity = dao.findByStackAndVersion(stackId, "0.1");
- Assert.assertNull(entity);
-
- entity = dao.findByStackAndVersion(stackId, "2.2.1.0-2222");
- Assert.assertNotNull(entity);
- }
@Test
public void testInjectKeytabApplicableHost() throws Exception {
@@ -2758,14 +1515,14 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
}});
replay(am);
- getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "SET_KEYTAB", targetHost);
+ heartbeatTestHelper.getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "SET_KEYTAB", targetHost);
return executionCommand.getKerberosCommandParams();
}
@@ -2789,14 +1546,14 @@ public class TestHeartbeatHandler {
final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
Role.DATANODE, null, null);
- ActionManager am = getMockActionManager();
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
expect(am.getTasks(anyObject(List.class))).andReturn(
new ArrayList<HostRoleCommand>() {{
add(command);
}});
replay(am);
- getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "REMOVE_KEYTAB", targetHost);
+ heartbeatTestHelper.getHeartBeatHandler(am, aq).injectKeytab(executionCommand, "REMOVE_KEYTAB", targetHost);
return executionCommand.getKerberosCommandParams();
}
[2/3] ambari git commit: AMBARI-15141. Start all services request
aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)
Posted by mp...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
new file mode 100644
index 0000000..eb99142
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
@@ -0,0 +1,1290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import com.google.gson.JsonObject;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.UnitOfWork;
+import junit.framework.Assert;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ActionDBAccessor;
+import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.RequestFactory;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.SecurityState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
+import org.apache.ambari.server.utils.StageUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DATANODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCluster;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostStatus;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRelease;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOsType;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE_MASTER;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS_CLIENT;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.SECONDARY_NAMENODE;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class HeartbeatProcessorTest {
+
+ private static final Logger log = LoggerFactory.getLogger(TestHeartbeatHandler.class);
+ private Injector injector;
+ private Clusters clusters;
+ long requestId = 23;
+ long stageId = 31;
+ private UnitOfWork unitOfWork;
+
+ @Inject
+ Configuration config;
+
+ @Inject
+ ActionDBAccessor actionDBAccessor;
+
+ @Inject
+ HeartbeatTestHelper heartbeatTestHelper;
+
+ @Inject
+ private HostRoleCommandFactory hostRoleCommandFactory;
+
+ @Inject
+ private HostDAO hostDAO;
+
+ @Inject
+ private StageFactory stageFactory;
+
+ @Inject
+ private AmbariMetaInfo metaInfo;
+
+ private final static StackId HDP_22_STACK = new StackId("HDP", "2.2.0");
+
+ @Before
+ public void setup() throws Exception {
+ InMemoryDefaultTestModule module = HeartbeatTestHelper.getTestModule();
+ injector = Guice.createInjector(module);
+ injector.getInstance(GuiceJpaInitializer.class);
+ clusters = injector.getInstance(Clusters.class);
+ injector.injectMembers(this);
+ unitOfWork = injector.getInstance(UnitOfWork.class);
+ }
+
+ @After
+ public void teardown() throws AmbariException {
+ injector.getInstance(PersistService.class).stop();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testHeartbeatWithConfigs() throws Exception {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+ serviceComponentHost1.setState(State.INSTALLED);
+ serviceComponentHost2.setState(State.INSTALLED);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setResponseId(0);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setHostname(DummyHostname1);
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setServiceName(HDFS);
+ cr.setTaskId(1);
+ cr.setRole(DATANODE);
+ cr.setStatus("COMPLETED");
+ cr.setStdErr("");
+ cr.setStdOut("");
+ cr.setExitCode(215);
+ cr.setRoleCommand("START");
+ cr.setClusterName(DummyCluster);
+
+ cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
+ put("global", new HashMap<String, String>() {{
+ put("tag", "version1");
+ }});
+ }});
+
+ reports.add(cr);
+ hb.setReports(reports);
+
+ HostEntity host1 = hostDAO.findByName(DummyHostname1);
+ Assert.assertNotNull(host1);
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+
+ // the heartbeat test passed if actual configs is populated
+ Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
+ Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testRestartRequiredAfterInstallClient() throws Exception {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(HDFS_CLIENT).persist();
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+
+ ServiceComponentHost serviceComponentHost = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(HDFS_CLIENT).getServiceComponentHost(DummyHostname1);
+
+ serviceComponentHost.setState(State.INSTALLED);
+ serviceComponentHost.setRestartRequired(true);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setResponseId(0);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setHostname(DummyHostname1);
+
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setServiceName(HDFS);
+ cr.setRoleCommand("INSTALL");
+ cr.setCustomCommand("EXECUTION_COMMAND");
+ cr.setTaskId(1);
+ cr.setRole(HDFS_CLIENT);
+ cr.setStatus("COMPLETED");
+ cr.setStdErr("");
+ cr.setStdOut("");
+ cr.setExitCode(215);
+ cr.setClusterName(DummyCluster);
+ cr.setConfigurationTags(new HashMap<String, Map<String, String>>() {{
+ put("global", new HashMap<String, String>() {{
+ put("tag", "version1");
+ }});
+ }});
+ reports.add(cr);
+ hb.setReports(reports);
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+
+ Assert.assertNotNull(serviceComponentHost.getActualConfigs());
+ Assert.assertFalse(serviceComponentHost.isRestartRequired());
+ Assert.assertEquals(serviceComponentHost.getActualConfigs().size(), 1);
+
+ }
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testHeartbeatCustomCommandWithConfigs() throws Exception {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+ serviceComponentHost1.setState(State.INSTALLED);
+ serviceComponentHost2.setState(State.INSTALLED);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setResponseId(0);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setHostname(DummyHostname1);
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setServiceName(HDFS);
+ cr.setRoleCommand("CUSTOM_COMMAND");
+ cr.setCustomCommand("RESTART");
+ cr.setTaskId(1);
+ cr.setRole(DATANODE);
+ cr.setStatus("COMPLETED");
+ cr.setStdErr("");
+ cr.setStdOut("");
+ cr.setExitCode(215);
+ cr.setClusterName(DummyCluster);
+ cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+ put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+ }});
+ CommandReport crn = new CommandReport();
+ crn.setActionId(StageUtils.getActionId(requestId, stageId));
+ crn.setServiceName(HDFS);
+ crn.setRoleCommand("CUSTOM_COMMAND");
+ crn.setCustomCommand("START");
+ crn.setTaskId(1);
+ crn.setRole(NAMENODE);
+ crn.setStatus("COMPLETED");
+ crn.setStdErr("");
+ crn.setStdOut("");
+ crn.setExitCode(215);
+ crn.setClusterName(DummyCluster);
+ crn.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+ put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+ }});
+
+ reports.add(cr);
+ reports.add(crn);
+ hb.setReports(reports);
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+
+ // the heartbeat test passed if actual configs is populated
+ Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
+ Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
+ Assert.assertNotNull(serviceComponentHost2.getActualConfigs());
+ Assert.assertEquals(serviceComponentHost2.getActualConfigs().size(), 1);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testHeartbeatCustomStartStop() throws Exception {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+ serviceComponentHost1.setState(State.INSTALLED);
+ serviceComponentHost2.setState(State.STARTED);
+ serviceComponentHost1.setRestartRequired(true);
+ serviceComponentHost2.setRestartRequired(true);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setResponseId(0);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setHostname(DummyHostname1);
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setServiceName(HDFS);
+ cr.setRoleCommand("CUSTOM_COMMAND");
+ cr.setCustomCommand("START");
+ cr.setTaskId(1);
+ cr.setRole(DATANODE);
+ cr.setStatus("COMPLETED");
+ cr.setStdErr("");
+ cr.setStdOut("");
+ cr.setExitCode(215);
+ cr.setClusterName(DummyCluster);
+ CommandReport crn = new CommandReport();
+ crn.setActionId(StageUtils.getActionId(requestId, stageId));
+ crn.setServiceName(HDFS);
+ crn.setRoleCommand("CUSTOM_COMMAND");
+ crn.setCustomCommand("STOP");
+ crn.setTaskId(1);
+ crn.setRole(NAMENODE);
+ crn.setStatus("COMPLETED");
+ crn.setStdErr("");
+ crn.setStdOut("");
+ crn.setExitCode(215);
+ crn.setClusterName(DummyCluster);
+
+ reports.add(cr);
+ reports.add(crn);
+ hb.setReports(reports);
+
+ assertTrue(serviceComponentHost1.isRestartRequired());
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+
+ // the heartbeat test passed if actual configs is populated
+ State componentState1 = serviceComponentHost1.getState();
+ assertEquals(State.STARTED, componentState1);
+ assertFalse(serviceComponentHost1.isRestartRequired());
+ State componentState2 = serviceComponentHost2.getState();
+ assertEquals(State.INSTALLED, componentState2);
+ assertTrue(serviceComponentHost2.isRestartRequired());
+ }
+
+ // Verifies that a heartbeat carrying only ComponentStatus entries (no command
+ // reports) updates each reported component's live state and security state,
+ // and leaves components with no reported status untouched.
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testStatusHeartbeat() throws Exception {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost3 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(SECONDARY_NAMENODE).getServiceComponentHost(DummyHostname1);
+ serviceComponentHost1.setState(State.INSTALLED);
+ serviceComponentHost1.setSecurityState(SecurityState.UNSECURED);
+ serviceComponentHost2.setState(State.INSTALLED);
+ serviceComponentHost2.setSecurityState(SecurityState.SECURING);
+ serviceComponentHost3.setState(State.STARTING);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setReports(new ArrayList<CommandReport>());
+ ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
+ // DATANODE reports STARTED with SECURED_KERBEROS.
+ ComponentStatus componentStatus1 = new ComponentStatus();
+ componentStatus1.setClusterName(DummyCluster);
+ componentStatus1.setServiceName(HDFS);
+ componentStatus1.setMessage(DummyHostStatus);
+ componentStatus1.setStatus(State.STARTED.name());
+ componentStatus1.setSecurityState(SecurityState.SECURED_KERBEROS.name());
+ componentStatus1.setComponentName(DATANODE);
+ componentStatuses.add(componentStatus1);
+ // SECONDARY_NAMENODE reports STARTED / UNSECURED; note that NAMENODE
+ // deliberately reports nothing, to check the "no status" path below.
+ ComponentStatus componentStatus2 = new ComponentStatus();
+ componentStatus2.setClusterName(DummyCluster);
+ componentStatus2.setServiceName(HDFS);
+ componentStatus2.setMessage(DummyHostStatus);
+ componentStatus2.setStatus(State.STARTED.name());
+ componentStatus2.setSecurityState(SecurityState.UNSECURED.name());
+ componentStatus2.setComponentName(SECONDARY_NAMENODE);
+ componentStatuses.add(componentStatus2);
+ hb.setComponentStatus(componentStatuses);
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+ State componentState1 = serviceComponentHost1.getState();
+ State componentState2 = serviceComponentHost2.getState();
+ State componentState3 = serviceComponentHost3.getState();
+ // Reported components move to the reported state/security state; NAMENODE
+ // had no status entry, so its state and SECURING security state persist.
+ assertEquals(State.STARTED, componentState1);
+ assertEquals(SecurityState.SECURED_KERBEROS, serviceComponentHost1.getSecurityState());
+ assertEquals(State.INSTALLED, componentState2);
+ assertEquals(SecurityState.SECURING, serviceComponentHost2.getSecurityState());
+ assertEquals(State.STARTED, componentState3);
+ assertEquals(SecurityState.UNSECURED, serviceComponentHost3.getSecurityState());
+ }
+
+
+ // Verifies that ActionManager.processTaskResponse persists a COMPLETED
+ // command report: both the in-memory action and the stage reloaded from the
+ // ActionDBAccessor must show the COMPLETED status and the reported exit code.
+ @Test
+ public void testCommandReport() throws AmbariException {
+ injector.injectMembers(this);
+ clusters.addHost(DummyHostname1);
+ clusters.getHost(DummyHostname1).persist();
+
+ StackId dummyStackId = new StackId(DummyStackId);
+ clusters.addCluster(DummyCluster, dummyStackId);
+
+ ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
+ ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
+ new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), null, null);
+ heartbeatTestHelper.populateActionDB(db, DummyHostname1, requestId, stageId);
+ Stage stage = db.getAllStages(requestId).get(0);
+ Assert.assertEquals(stageId, stage.getStageId());
+ // Mark the task QUEUED first so the COMPLETED report is a valid transition.
+ stage.setHostRoleStatus(DummyHostname1, HBASE_MASTER, HostRoleStatus.QUEUED);
+ db.hostRoleScheduled(stage, DummyHostname1, HBASE_MASTER);
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setTaskId(1);
+ cr.setRole(HBASE_MASTER);
+ cr.setStatus("COMPLETED");
+ cr.setStdErr("");
+ cr.setStdOut("");
+ cr.setExitCode(215);
+
+ cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+ put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+ }});
+
+
+ reports.add(cr);
+ am.processTaskResponse(DummyHostname1, reports, stage.getOrderedHostRoleCommands());
+ // Exit code and status must be visible through the ActionManager...
+ assertEquals(215,
+ am.getAction(requestId, stageId).getExitCode(DummyHostname1, HBASE_MASTER));
+ assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)
+ .getHostRoleStatus(DummyHostname1, HBASE_MASTER));
+ // ...and also in the stage re-read from the database.
+ Stage s = db.getAllStages(requestId).get(0);
+ assertEquals(HostRoleStatus.COMPLETED,
+ s.getHostRoleStatus(DummyHostname1, HBASE_MASTER));
+ assertEquals(215,
+ s.getExitCode(DummyHostname1, HBASE_MASTER));
+ }
+
+ /**
+ * Tests the fact that when START and STOP commands are in progress, and heartbeat
+ * forces the host component state to STARTED or INSTALLED, there are no undesired
+ * side effects.
+ *
+ * The single CommandReport is mutated between heartbeats (responseId 0..7) to
+ * walk the SCH through START/STOP cycles, both from stable states
+ * (INSTALLED/STARTED) and from transitional ones (STARTING/STOPPING).
+ * @throws AmbariException
+ * @throws InvalidStateTransitionException
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCommandReportOnHeartbeatUpdatedState()
+ throws AmbariException, InvalidStateTransitionException {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ serviceComponentHost1.setState(State.INSTALLED);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setTaskId(1);
+ cr.setClusterName(DummyCluster);
+ cr.setServiceName(HDFS);
+ cr.setRole(DATANODE);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setStdErr("none");
+ cr.setStdOut("dummy output");
+ cr.setExitCode(777);
+ cr.setRoleCommand("START");
+ reports.add(cr);
+ hb.setReports(reports);
+ hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ // responseId 0: START in progress -> state must remain INSTALLED.
+ heartbeatProcessor.processHeartbeat(hb);
+
+ assertEquals("Host state should be " + State.INSTALLED,
+ State.INSTALLED, serviceComponentHost1.getState());
+
+ // responseId 1: START completed -> INSTALLED transitions to STARTED.
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(1);
+ cr.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr.setExitCode(0);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.STARTED,
+ State.STARTED, serviceComponentHost1.getState());
+
+ // responseId 2: STOP in progress -> state must remain STARTED.
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(2);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setRoleCommand("STOP");
+ cr.setExitCode(777);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.STARTED,
+ State.STARTED, serviceComponentHost1.getState());
+
+ // responseId 3: STOP completed -> STARTED transitions back to INSTALLED.
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(3);
+ cr.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr.setExitCode(0);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.INSTALLED,
+ State.INSTALLED, serviceComponentHost1.getState());
+
+ // validate the transitions when there is no heartbeat
+ // (i.e. starting from the transitional STARTING/STOPPING states)
+ serviceComponentHost1.setState(State.STARTING);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setExitCode(777);
+ cr.setRoleCommand("START");
+ hb.setResponseId(4);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.STARTING,
+ State.STARTING, serviceComponentHost1.getState());
+
+ cr.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr.setExitCode(0);
+ hb.setResponseId(5);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.STARTED,
+ State.STARTED, serviceComponentHost1.getState());
+
+ serviceComponentHost1.setState(State.STOPPING);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setExitCode(777);
+ cr.setRoleCommand("STOP");
+ hb.setResponseId(6);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.STOPPING,
+ State.STOPPING, serviceComponentHost1.getState());
+
+ cr.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr.setExitCode(0);
+ hb.setResponseId(7);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.INSTALLED,
+ State.INSTALLED, serviceComponentHost1.getState());
+ }
+
+ // Verifies upgrade-specific report handling: an SCH in UPGRADING stays in
+ // UPGRADING while the command is IN_PROGRESS, FAILED, PENDING or QUEUED, and
+ // only transitions to INSTALLED when the report is COMPLETED.
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ serviceComponentHost1.setState(State.UPGRADING);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setTaskId(1);
+ cr.setClusterName(DummyCluster);
+ cr.setServiceName(HDFS);
+ cr.setRole(DATANODE);
+ cr.setRoleCommand("INSTALL");
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setStdErr("none");
+ cr.setStdOut("dummy output");
+ cr.setExitCode(777);
+ reports.add(cr);
+ hb.setReports(reports);
+ hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ // IN_PROGRESS: state unchanged.
+ heartbeatProcessor.processHeartbeat(hb);
+
+ assertEquals("Host state should be " + State.UPGRADING,
+ State.UPGRADING, serviceComponentHost1.getState());
+
+ // COMPLETED: UPGRADING -> INSTALLED.
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(1);
+ cr.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr.setExitCode(0);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.INSTALLED,
+ State.INSTALLED, serviceComponentHost1.getState());
+
+ // FAILED: state must stay UPGRADING (reset before each heartbeat).
+ serviceComponentHost1.setState(State.UPGRADING);
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(2);
+ cr.setStatus(HostRoleStatus.FAILED.toString());
+ cr.setExitCode(3);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.UPGRADING,
+ State.UPGRADING, serviceComponentHost1.getState());
+
+ // PENDING: state must stay UPGRADING.
+ serviceComponentHost1.setState(State.UPGRADING);
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(3);
+ cr.setStatus(HostRoleStatus.PENDING.toString());
+ cr.setExitCode(55);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.UPGRADING,
+ State.UPGRADING, serviceComponentHost1.getState());
+
+ // QUEUED: state must stay UPGRADING.
+ serviceComponentHost1.setState(State.UPGRADING);
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(4);
+ cr.setStatus(HostRoleStatus.QUEUED.toString());
+ cr.setExitCode(55);
+
+ heartbeatProcessor.processHeartbeat(hb);
+ assertEquals("Host state should be " + State.UPGRADING,
+ State.UPGRADING, serviceComponentHost1.getState());
+ }
+
+
+ // Verifies that the "processes" list carried in a ComponentStatus "extra"
+ // payload is recorded on the ServiceComponentHost, and that a follow-up
+ // status without the extra payload is processed without error.
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCommandStatusProcesses() throws Exception {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
+
+ ActionQueue aq = new ActionQueue();
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setReports(new ArrayList<CommandReport>());
+
+ // Two agent-reported processes: one RUNNING, one NOT_RUNNING.
+ List<Map<String, String>> procs = new ArrayList<Map<String, String>>();
+ Map<String, String> proc1info = new HashMap<String, String>();
+ proc1info.put("name", "a");
+ proc1info.put("status", "RUNNING");
+ procs.add(proc1info);
+
+ Map<String, String> proc2info = new HashMap<String, String>();
+ proc2info.put("name", "b");
+ proc2info.put("status", "NOT_RUNNING");
+ procs.add(proc2info);
+
+ Map<String, Object> extra = new HashMap<String, Object>();
+ extra.put("processes", procs);
+
+ ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
+ ComponentStatus componentStatus1 = new ComponentStatus();
+ componentStatus1.setClusterName(DummyCluster);
+ componentStatus1.setServiceName(HDFS);
+ componentStatus1.setMessage(DummyHostStatus);
+ componentStatus1.setStatus(State.STARTED.name());
+ componentStatus1.setSecurityState(SecurityState.UNSECURED.name());
+ componentStatus1.setComponentName(DATANODE);
+
+ componentStatus1.setExtra(extra);
+ componentStatuses.add(componentStatus1);
+ hb.setComponentStatus(componentStatuses);
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ }}).anyTimes();
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+ ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+
+ // Both reported processes must have been stored on the SCH.
+ Assert.assertEquals(Integer.valueOf(2), Integer.valueOf(sch.getProcesses().size()));
+
+ // Second heartbeat carries a status with no "extra" payload; it only needs
+ // to be processed without throwing (no assertion follows).
+ hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(1);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setReports(new ArrayList<CommandReport>());
+
+ componentStatus1 = new ComponentStatus();
+ componentStatus1.setClusterName(DummyCluster);
+ componentStatus1.setServiceName(HDFS);
+ componentStatus1.setMessage(DummyHostStatus);
+ componentStatus1.setStatus(State.STARTED.name());
+ componentStatus1.setSecurityState(SecurityState.UNSECURED.name());
+ componentStatus1.setComponentName(DATANODE);
+ hb.setComponentStatus(Collections.singletonList(componentStatus1));
+
+ heartbeatProcessor.processHeartbeat(hb);
+ }
+
+ // Verifies that a COMPLETED UPGRADE command report advances the stack
+ // version of an UPGRADING SCH to its desired version, while a component in
+ // INSTALLING state keeps its current stack version.
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(HDFS_CLIENT).persist();
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+ StackId stack130 = new StackId("HDP-1.3.0");
+ StackId stack120 = new StackId("HDP-1.2.0");
+
+ // DATANODE is mid-upgrade (1.2.0 -> 1.3.0); NAMENODE is merely installing.
+ serviceComponentHost1.setState(State.UPGRADING);
+ serviceComponentHost2.setState(State.INSTALLING);
+
+ serviceComponentHost1.setStackVersion(stack120);
+ serviceComponentHost1.setDesiredStackVersion(stack130);
+ serviceComponentHost2.setStackVersion(stack120);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ CommandReport cr1 = new CommandReport();
+ cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr1.setTaskId(1);
+ cr1.setClusterName(DummyCluster);
+ cr1.setServiceName(HDFS);
+ cr1.setRole(DATANODE);
+ cr1.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr1.setStdErr("none");
+ cr1.setStdOut("dummy output");
+ cr1.setExitCode(0);
+ cr1.setRoleCommand(RoleCommand.UPGRADE.toString());
+
+ CommandReport cr2 = new CommandReport();
+ cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr2.setTaskId(2);
+ cr2.setClusterName(DummyCluster);
+ cr2.setServiceName(HDFS);
+ cr2.setRole(NAMENODE);
+ cr2.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr2.setStdErr("none");
+ cr2.setStdOut("dummy output");
+ cr2.setExitCode(0);
+ cr2.setRoleCommand(RoleCommand.UPGRADE.toString());
+ ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(cr1);
+ reports.add(cr2);
+ hb.setReports(reports);
+
+ ActionQueue aq = new ActionQueue();
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+
+ assertEquals("Stack version for SCH should be updated to " +
+ serviceComponentHost1.getDesiredStackVersion(),
+ stack130, serviceComponentHost1.getStackVersion());
+ assertEquals("Stack version for SCH should not change ",
+ stack120, serviceComponentHost2.getStackVersion());
+ }
+
+
+ // Verifies FAILED INSTALL report handling: an UPGRADING SCH remains in
+ // UPGRADING (and keeps both its current and desired stack versions), while
+ // an INSTALLING SCH transitions to INSTALL_FAILED.
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(HDFS_CLIENT).persist();
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+ StackId stack130 = new StackId("HDP-1.3.0");
+ StackId stack120 = new StackId("HDP-1.2.0");
+
+ serviceComponentHost1.setState(State.UPGRADING);
+ serviceComponentHost2.setState(State.INSTALLING);
+
+ serviceComponentHost1.setStackVersion(stack120);
+ serviceComponentHost1.setDesiredStackVersion(stack130);
+ serviceComponentHost2.setStackVersion(stack120);
+
+ // Persist a real stage with UPGRADE/INSTALL tasks so the failure reports
+ // below correspond to actual host-role commands in the action DB.
+ Stage s = stageFactory.createNew(requestId, "/a/b", "cluster1", 1L, "action manager test",
+ "clusterHostInfo", "commandParamsStage", "hostParamsStage");
+ s.setStageId(stageId);
+ s.addHostRoleExecutionCommand(DummyHostname1, Role.DATANODE, RoleCommand.UPGRADE,
+ new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(),
+ DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
+ DummyCluster, "HDFS", false, false);
+ s.addHostRoleExecutionCommand(DummyHostname1, Role.NAMENODE, RoleCommand.INSTALL,
+ new ServiceComponentHostInstallEvent(Role.NAMENODE.toString(),
+ DummyHostname1, System.currentTimeMillis(), "HDP-1.3.0"),
+ DummyCluster, "HDFS", false, false);
+ List<Stage> stages = new ArrayList<Stage>();
+ stages.add(s);
+ Request request = new Request(stages, clusters);
+ actionDBAccessor.persistActions(request);
+ // Move both tasks to IN_PROGRESS before the FAILED heartbeat arrives.
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setTaskId(1);
+ cr.setClusterName(DummyCluster);
+ cr.setServiceName(HDFS);
+ cr.setRole(DATANODE);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setStdErr("none");
+ cr.setStdOut("dummy output");
+ actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
+ Role.DATANODE.name(), cr);
+ cr.setRole(NAMENODE);
+ cr.setTaskId(2);
+ actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId,
+ Role.NAMENODE.name(), cr);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ CommandReport cr1 = new CommandReport();
+ cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr1.setTaskId(1);
+ cr1.setClusterName(DummyCluster);
+ cr1.setServiceName(HDFS);
+ cr1.setRole(DATANODE);
+ cr1.setRoleCommand("INSTALL");
+ cr1.setStatus(HostRoleStatus.FAILED.toString());
+ cr1.setStdErr("none");
+ cr1.setStdOut("dummy output");
+ cr1.setExitCode(0);
+
+ CommandReport cr2 = new CommandReport();
+ cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr2.setTaskId(2);
+ cr2.setClusterName(DummyCluster);
+ cr2.setServiceName(HDFS);
+ cr2.setRole(NAMENODE);
+ cr2.setRoleCommand("INSTALL");
+ cr2.setStatus(HostRoleStatus.FAILED.toString());
+ cr2.setStdErr("none");
+ cr2.setStdOut("dummy output");
+ cr2.setExitCode(0);
+ ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(cr1);
+ reports.add(cr2);
+ hb.setReports(reports);
+
+ ActionQueue aq = new ActionQueue();
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ heartbeatProcessor.processHeartbeat(hb);
+
+ // NOTE(review): the first message is misleading — serviceComponentHost1 is
+ // asserted NOT to change (stays UPGRADING). The last assertion also
+ // duplicates the INSTALL_FAILED state check under a "stack version" message.
+ assertEquals("State of SCH should change after fail report",
+ State.UPGRADING, serviceComponentHost1.getState());
+ assertEquals("State of SCH should change after fail report",
+ State.INSTALL_FAILED, serviceComponentHost2.getState());
+ assertEquals("Stack version of SCH should not change after fail report",
+ stack120, serviceComponentHost1.getStackVersion());
+ assertEquals("Stack version of SCH should not change after fail report",
+ stack130, serviceComponentHost1.getDesiredStackVersion());
+ assertEquals("Stack version of SCH should not change after fail report",
+ State.INSTALL_FAILED, serviceComponentHost2.getState());
+ }
+
+
+ // Verifies that IN_PROGRESS INSTALL reports leave both SCH states and the
+ // desired stack version untouched. Unlike the sibling tests, this one goes
+ // through handler.handleHeartBeat (the full handler path) rather than
+ // calling the HeartbeatProcessor directly.
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(NAMENODE).persist();
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(HDFS_CLIENT).persist();
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+
+ ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+
+ StackId stack130 = new StackId("HDP-1.3.0");
+ StackId stack120 = new StackId("HDP-1.2.0");
+
+ serviceComponentHost1.setState(State.UPGRADING);
+ serviceComponentHost2.setState(State.INSTALLING);
+
+ serviceComponentHost1.setStackVersion(stack120);
+ serviceComponentHost1.setDesiredStackVersion(stack130);
+ serviceComponentHost2.setStackVersion(stack120);
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ CommandReport cr1 = new CommandReport();
+ cr1.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr1.setTaskId(1);
+ cr1.setClusterName(DummyCluster);
+ cr1.setServiceName(HDFS);
+ cr1.setRole(DATANODE);
+ cr1.setRoleCommand("INSTALL");
+ cr1.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr1.setStdErr("none");
+ cr1.setStdOut("dummy output");
+ cr1.setExitCode(777);
+
+ CommandReport cr2 = new CommandReport();
+ cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr2.setTaskId(2);
+ cr2.setClusterName(DummyCluster);
+ cr2.setServiceName(HDFS);
+ cr2.setRole(NAMENODE);
+ cr2.setRoleCommand("INSTALL");
+ cr2.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr2.setStdErr("none");
+ cr2.setStdOut("dummy output");
+ cr2.setExitCode(777);
+ ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
+ reports.add(cr1);
+ reports.add(cr2);
+ hb.setReports(reports);
+
+ ActionQueue aq = new ActionQueue();
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>() {{
+ add(command);
+ add(command);
+ }});
+ replay(am);
+
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, aq);
+ handler.handleHeartBeat(hb);
+ assertEquals("State of SCH not change while operation is in progress",
+ State.UPGRADING, serviceComponentHost1.getState());
+ assertEquals("Stack version of SCH should not change after in progress report",
+ stack130, serviceComponentHost1.getDesiredStackVersion());
+ assertEquals("State of SCH not change while operation is in progress",
+ State.INSTALLING, serviceComponentHost2.getState());
+ }
+
+
+ /**
+ * Tests that if there is an invalid cluster in heartbeat data, the heartbeat
+ * doesn't fail.
+ *
+ * The heartbeat carries a single Alert whose cluster name ("BADCLUSTER")
+ * does not exist; processing must complete without an AmbariException.
+ *
+ * @throws Exception
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testHeartBeatWithAlertAndInvalidCluster() throws Exception {
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ new ArrayList<HostRoleCommand>());
+
+ replay(am);
+
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ Clusters fsm = clusters;
+ Host hostObject = clusters.getHost(DummyHostname1);
+ hostObject.setIPv4("ipv4");
+ hostObject.setIPv6("ipv6");
+ hostObject.setOsType(DummyOsType);
+
+ ActionQueue aq = new ActionQueue();
+
+ // Register the agent first so the handler accepts its heartbeat.
+ HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
+ Register reg = new Register();
+ HostInfo hi = new HostInfo();
+ hi.setHostName(DummyHostname1);
+ hi.setOS(DummyOs);
+ hi.setOSRelease(DummyOSRelease);
+ reg.setHostname(DummyHostname1);
+ reg.setHardwareProfile(hi);
+ reg.setAgentVersion(metaInfo.getServerVersion());
+ handler.handleRegistration(reg);
+
+ hostObject.setState(HostState.UNHEALTHY);
+
+ ExecutionCommand execCmd = new ExecutionCommand();
+ execCmd.setRequestAndStage(2, 34);
+ execCmd.setHostname(DummyHostname1);
+ // NOTE(review): a fresh, unconfigured ExecutionCommand is enqueued here,
+ // not the execCmd configured above — presumably only queue non-emptiness
+ // matters for this test, but confirm this wasn't meant to be execCmd.
+ aq.enqueue(DummyHostname1, new ExecutionCommand());
+
+ HeartBeat hb = new HeartBeat();
+ HostStatus hs = new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus);
+
+ hb.setResponseId(0);
+ hb.setNodeStatus(hs);
+ hb.setHostname(DummyHostname1);
+
+ Alert alert = new Alert("foo", "bar", "baz", "foobar", "foobarbaz",
+ AlertState.OK);
+
+ // The alert references a cluster that does not exist.
+ alert.setCluster("BADCLUSTER");
+
+ List<Alert> alerts = Collections.singletonList(alert);
+ hb.setAlerts(alerts);
+
+ // should NOT throw AmbariException from alerts.
+ handler.getHeartbeatProcessor().processHeartbeat(hb);
+ }
+
+
+ // Verifies that a COMPLETED "install_packages" report whose structured
+ // output carries an actual_version causes the repository version entity to
+ // be re-keyed in the DAO from the pre-install version ("0.1") to the
+ // reported actual version ("2.2.1.0-2222").
+ @Test
+ public void testInstallPackagesWithVersion() throws Exception {
+ // required since this test method checks the DAO result of handling a
+ // heartbeat which performs some async tasks
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
+
+ final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
+ Role.DATANODE, null, null);
+
+ ActionManager am = heartbeatTestHelper.getMockActionManager();
+ expect(am.getTasks(anyObject(List.class))).andReturn(
+ Collections.singletonList(command)).anyTimes();
+ replay(am);
+
+ Cluster cluster = heartbeatTestHelper.getDummyCluster();
+ HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(am, new ActionQueue());
+ HeartbeatProcessor heartbeatProcessor = handler.getHeartbeatProcessor();
+ HeartBeat hb = new HeartBeat();
+
+ // Structured output mimics the agent's install_packages result payload.
+ JsonObject json = new JsonObject();
+ json.addProperty("actual_version", "2.2.1.0-2222");
+ json.addProperty("package_installation_result", "SUCCESS");
+ json.addProperty("installed_repository_version", "0.1");
+ json.addProperty("stack_id", cluster.getDesiredStackVersion().getStackId());
+
+
+ CommandReport cmdReport = new CommandReport();
+ cmdReport.setActionId(StageUtils.getActionId(requestId, stageId));
+ cmdReport.setTaskId(1);
+ cmdReport.setCustomCommand("install_packages");
+ cmdReport.setStructuredOut(json.toString());
+ cmdReport.setRoleCommand(RoleCommand.ACTIONEXECUTE.name());
+ cmdReport.setStatus(HostRoleStatus.COMPLETED.name());
+ cmdReport.setRole("install_packages");
+ cmdReport.setClusterName(DummyCluster);
+
+ hb.setReports(Collections.singletonList(cmdReport));
+ hb.setTimestamp(0L);
+ hb.setResponseId(0);
+ hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));
+ hb.setHostname(DummyHostname1);
+ hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+ StackId stackId = new StackId("HDP", "0.1");
+
+ // Sanity check: the "0.1" repo version entity exists before the heartbeat.
+ RepositoryVersionDAO dao = injector.getInstance(RepositoryVersionDAO.class);
+ RepositoryVersionEntity entity = dao.findByStackAndVersion(stackId, "0.1");
+ Assert.assertNotNull(entity);
+
+ heartbeatProcessor.processHeartbeat(hb);
+
+ // The old version key must be gone...
+ entity = dao.findByStackAndVersion(stackId, "0.1");
+ Assert.assertNull(entity);
+
+ // ...and the entity must now be found under the reported actual version.
+ entity = dao.findByStackAndVersion(stackId, "2.2.1.0-2222");
+ Assert.assertNotNull(entity);
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
new file mode 100644
index 0000000..02974ca
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.persist.UnitOfWork;
+import junit.framework.Assert;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ActionDBAccessor;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.RequestFactory;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.OrmTestHelper;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.ResourceTypeDAO;
+import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.ResourceEntity;
+import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
+import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.security.authorization.ResourceType;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCluster;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRelease;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+
+@Singleton
+public class HeartbeatTestHelper {
+
+ @Inject
+ Clusters clusters;
+
+ @Inject
+ Injector injector;
+
+ @Inject
+ AmbariMetaInfo metaInfo;
+
+ @Inject
+ ActionDBAccessor actionDBAccessor;
+
+ @Inject
+ ClusterDAO clusterDAO;
+
+ @Inject
+ StackDAO stackDAO;
+
+ @Inject
+ UnitOfWork unitOfWork;
+
+ @Inject
+ ResourceTypeDAO resourceTypeDAO;
+
+ @Inject
+ OrmTestHelper helper;
+
+ @Inject
+ private HostDAO hostDAO;
+
+ @Inject
+ private StageFactory stageFactory;
+
+ public final static StackId HDP_22_STACK = new StackId("HDP", "2.2.0");
+
+ public static InMemoryDefaultTestModule getTestModule() {
+ return new InMemoryDefaultTestModule(){
+
+ @Override
+ protected void configure() {
+ getProperties().put("recovery.type", "FULL");
+ getProperties().put("recovery.lifetime_max_count", "10");
+ getProperties().put("recovery.max_count", "4");
+ getProperties().put("recovery.window_in_minutes", "23");
+ getProperties().put("recovery.retry_interval", "2");
+ super.configure();
+ }
+ };
+ }
+
+ public HeartBeatHandler getHeartBeatHandler(ActionManager am, ActionQueue aq)
+ throws InvalidStateTransitionException, AmbariException {
+ HeartBeatHandler handler = new HeartBeatHandler(clusters, aq, am, injector);
+ Register reg = new Register();
+ HostInfo hi = new HostInfo();
+ hi.setHostName(DummyHostname1);
+ hi.setOS(DummyOs);
+ hi.setOSRelease(DummyOSRelease);
+ reg.setHostname(DummyHostname1);
+ reg.setResponseId(0);
+ reg.setHardwareProfile(hi);
+ reg.setAgentVersion(metaInfo.getServerVersion());
+ handler.handleRegistration(reg);
+ return handler;
+ }
+
+ public ActionManager getMockActionManager() {
+ ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
+ Clusters clustersMock = createNiceMock(Clusters.class);
+ Configuration configurationMock = createNiceMock(Configuration.class);
+
+ ActionManager actionManager = createMockBuilder(ActionManager.class).
+ addMockedMethod("getTasks").
+ withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
+ actionDBAccessor, new HostsMap((String) null), unitOfWork,
+ injector.getInstance(RequestFactory.class), configurationMock, createNiceMock(AmbariEventPublisher.class)).
+ createMock();
+ return actionManager;
+ }
+
+ public Cluster getDummyCluster()
+ throws AmbariException {
+ StackEntity stackEntity = stackDAO.find(HDP_22_STACK.getStackName(), HDP_22_STACK.getStackVersion());
+ org.junit.Assert.assertNotNull(stackEntity);
+
+ // Create the cluster
+ ResourceTypeEntity resourceTypeEntity = new ResourceTypeEntity();
+ resourceTypeEntity.setId(ResourceType.CLUSTER.getId());
+ resourceTypeEntity.setName(ResourceType.CLUSTER.name());
+ resourceTypeEntity = resourceTypeDAO.merge(resourceTypeEntity);
+
+ ResourceEntity resourceEntity = new ResourceEntity();
+ resourceEntity.setResourceType(resourceTypeEntity);
+
+ ClusterEntity clusterEntity = new ClusterEntity();
+ clusterEntity.setClusterName(DummyCluster);
+ clusterEntity.setClusterInfo("test_cluster_info1");
+ clusterEntity.setResource(resourceEntity);
+ clusterEntity.setDesiredStack(stackEntity);
+
+ clusterDAO.create(clusterEntity);
+
+ StackId stackId = new StackId(DummyStackId);
+
+ Cluster cluster = clusters.getCluster(DummyCluster);
+
+ cluster.setDesiredStackVersion(stackId);
+ cluster.setCurrentStackVersion(stackId);
+ helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion());
+ cluster.createClusterVersion(stackId, stackId.getStackVersion(), "admin",
+ RepositoryVersionState.UPGRADING);
+
+ Set<String> hostNames = new HashSet<String>(){{
+ add(DummyHostname1);
+ }};
+
+ Map<String, String> hostAttributes = new HashMap<String, String>();
+ hostAttributes.put("os_family", "redhat");
+ hostAttributes.put("os_release_version", "6.3");
+
+ List<HostEntity> hostEntities = new ArrayList<HostEntity>();
+ for(String hostName : hostNames) {
+ clusters.addHost(hostName);
+ Host host = clusters.getHost(hostName);
+ host.setHostAttributes(hostAttributes);
+ host.persist();
+
+ HostEntity hostEntity = hostDAO.findByName(hostName);
+ Assert.assertNotNull(hostEntity);
+ hostEntities.add(hostEntity);
+ }
+ clusterEntity.setHostEntities(hostEntities);
+ clusters.mapHostsToCluster(hostNames, DummyCluster);
+
+ return cluster;
+ }
+
+ public void populateActionDB(ActionDBAccessor db, String DummyHostname1, long requestId, long stageId) throws AmbariException {
+ Stage s = stageFactory.createNew(requestId, "/a/b", DummyCluster, 1L, "heartbeat handler test",
+ "clusterHostInfo", "commandParamsStage", "hostParamsStage");
+ s.setStageId(stageId);
+ String filename = null;
+ s.addHostRoleExecutionCommand(DummyHostname1, Role.HBASE_MASTER,
+ RoleCommand.START,
+ new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
+ DummyHostname1, System.currentTimeMillis()), DummyCluster, HBASE, false, false);
+ List<Stage> stages = new ArrayList<Stage>();
+ stages.add(s);
+ Request request = new Request(stages, clusters);
+ db.persistActions(request);
+ }
+
+}
[3/3] ambari git commit: AMBARI-15141. Start all services request
aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)
Posted by mp...@apache.org.
AMBARI-15141. Start all services request aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/083ac6da
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/083ac6da
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/083ac6da
Branch: refs/heads/trunk
Commit: 083ac6dab5cf59c01da054eb656507c089a54620
Parents: 9d7ff5f
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Feb 23 13:01:13 2016 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Tue Feb 23 21:04:22 2016 +0200
----------------------------------------------------------------------
.../ambari/server/agent/HeartBeatHandler.java | 582 +------
.../ambari/server/agent/HeartbeatMonitor.java | 6 +
.../ambari/server/agent/HeartbeatProcessor.java | 773 +++++++++
.../ambari/server/orm/dao/HostVersionDAO.java | 78 +-
.../server/orm/entities/HostVersionEntity.java | 9 +
.../server/state/cluster/ClusterImpl.java | 6 +-
.../svccomphost/ServiceComponentHostImpl.java | 72 +-
.../server/agent/HeartbeatProcessorTest.java | 1290 +++++++++++++++
.../server/agent/HeartbeatTestHelper.java | 229 +++
.../server/agent/TestHeartbeatHandler.java | 1489 ++----------------
10 files changed, 2559 insertions(+), 1975 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 248ce4b..ba14446 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -126,6 +126,7 @@ public class HeartBeatHandler {
private final ActionQueue actionQueue;
private final ActionManager actionManager;
private HeartbeatMonitor heartbeatMonitor;
+ private HeartbeatProcessor heartbeatProcessor;
@Inject
private Injector injector;
@@ -137,38 +138,11 @@ public class HeartBeatHandler {
private AmbariMetaInfo ambariMetaInfo;
@Inject
- private ActionMetadata actionMetadata;
-
- @Inject
- private Gson gson;
-
- @Inject
private ConfigHelper configHelper;
@Inject
- private HostDAO hostDAO;
-
- @Inject
private AlertDefinitionHash alertDefinitionHash;
- /**
- * Publishes {@link AlertEvent} instances.
- */
- @Inject
- private AlertEventPublisher alertEventPublisher;
-
- @Inject
- private AmbariEventPublisher ambariEventPublisher;
-
- @Inject
- private VersionEventPublisher versionEventPublisher;
-
-
- /**
- * KerberosPrincipalHostDAO used to set and get Kerberos principal details
- */
- @Inject
- private KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
/**
* KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances
@@ -187,10 +161,12 @@ public class HeartBeatHandler {
actionQueue = aq;
actionManager = am;
heartbeatMonitor = new HeartbeatMonitor(fsm, aq, am, 60000, injector);
+ heartbeatProcessor = new HeartbeatProcessor(fsm, am, heartbeatMonitor, injector); //TODO modify to match pattern
injector.injectMembers(this);
}
public void start() {
+ heartbeatProcessor.startAsync();
heartbeatMonitor.start();
}
@@ -198,6 +174,14 @@ public class HeartBeatHandler {
this.heartbeatMonitor = heartbeatMonitor;
}
+ public void setHeartbeatProcessor(HeartbeatProcessor heartbeatProcessor) {
+ this.heartbeatProcessor = heartbeatProcessor;
+ }
+
+ public HeartbeatProcessor getHeartbeatProcessor() {
+ return heartbeatProcessor;
+ }
+
public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
throws AmbariException {
long now = System.currentTimeMillis();
@@ -283,18 +267,7 @@ public class HeartBeatHandler {
return createRegisterCommand();
}
- // Examine heartbeat for command reports
- processCommandReports(heartbeat, hostname, clusterFsm, now);
-
- // Examine heartbeat for component live status reports
- processStatusReports(heartbeat, hostname, clusterFsm);
-
- // Calculate host status
- // NOTE: This step must be after processing command/status reports
- processHostStatus(heartbeat, hostname);
-
- // Example heartbeat for alerts from the host or its components
- processAlerts(heartbeat, hostname);
+ heartbeatProcessor.addHeartbeat(heartbeat);
// Send commands if node is active
if (hostObject.getState().equals(HostState.HEALTHY)) {
@@ -305,33 +278,7 @@ public class HeartBeatHandler {
return response;
}
- /**
- * Extracts all of the {@link Alert}s from the heartbeat and fires
- * {@link AlertEvent}s for each one. If there is a problem looking up the
- * cluster, then alerts will not be processed.
- *
- * @param heartbeat
- * the heartbeat to process.
- * @param hostname
- * the host that the heartbeat is for.
- */
- protected void processAlerts(HeartBeat heartbeat, String hostname) {
-
- if (null == hostname || null == heartbeat) {
- return;
- }
- if (null != heartbeat.getAlerts()) {
- AlertEvent event = new AlertReceivedEvent(heartbeat.getAlerts());
- for (Alert alert : event.getAlerts()) {
- if (alert.getHostName() == null) {
- alert.setHostName(hostname);
- }
- }
- alertEventPublisher.publish(event);
-
- }
- }
protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
LOG.debug("Received recovery report: " + recoveryReport.toString());
@@ -339,480 +286,6 @@ public class HeartBeatHandler {
host.setRecoveryReport(recoveryReport);
}
- protected void processHostStatus(HeartBeat heartbeat, String hostname) throws AmbariException {
-
- Host host = clusterFsm.getHost(hostname);
- HealthStatus healthStatus = host.getHealthStatus().getHealthStatus();
-
- if (!healthStatus.equals(HostHealthStatus.HealthStatus.UNKNOWN)) {
-
- List<ComponentStatus> componentStatuses = heartbeat.getComponentStatus();
- //Host status info could be calculated only if agent returned statuses in heartbeat
- //Or, if a command is executed that can change component status
- boolean calculateHostStatus = false;
- String clusterName = null;
- if (componentStatuses.size() > 0) {
- calculateHostStatus = true;
- for (ComponentStatus componentStatus : componentStatuses) {
- clusterName = componentStatus.getClusterName();
- break;
- }
- }
-
- if (!calculateHostStatus) {
- List<CommandReport> reports = heartbeat.getReports();
- for (CommandReport report : reports) {
- if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
- continue;
- }
-
- String service = report.getServiceName();
- if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
- continue;
- }
- if (report.getStatus().equals("COMPLETED")) {
- calculateHostStatus = true;
- clusterName = report.getClusterName();
- break;
- }
- }
- }
-
- if (calculateHostStatus) {
- //Use actual component status to compute the host status
- int masterCount = 0;
- int mastersRunning = 0;
- int slaveCount = 0;
- int slavesRunning = 0;
-
- StackId stackId;
- Cluster cluster = clusterFsm.getCluster(clusterName);
- stackId = cluster.getDesiredStackVersion();
-
- MaintenanceStateHelper psh = injector.getInstance(MaintenanceStateHelper.class);
-
- List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(heartbeat.getHostname());
- for (ServiceComponentHost scHost : scHosts) {
- ComponentInfo componentInfo =
- ambariMetaInfo.getComponent(stackId.getStackName(),
- stackId.getStackVersion(), scHost.getServiceName(),
- scHost.getServiceComponentName());
-
- String status = scHost.getState().name();
-
- String category = componentInfo.getCategory();
-
- if (MaintenanceState.OFF == psh.getEffectiveState(scHost, host)) {
- if (category.equals("MASTER")) {
- ++masterCount;
- if (status.equals("STARTED")) {
- ++mastersRunning;
- }
- } else if (category.equals("SLAVE")) {
- ++slaveCount;
- if (status.equals("STARTED")) {
- ++slavesRunning;
- }
- }
- }
- }
-
- if (masterCount == mastersRunning && slaveCount == slavesRunning) {
- healthStatus = HealthStatus.HEALTHY;
- } else if (masterCount > 0 && mastersRunning < masterCount) {
- healthStatus = HealthStatus.UNHEALTHY;
- } else {
- healthStatus = HealthStatus.ALERT;
- }
-
- host.setStatus(healthStatus.name());
- host.persist();
- }
-
- //If host doesn't belong to any cluster
- if ((clusterFsm.getClustersForHost(host.getHostName())).size() == 0) {
- healthStatus = HealthStatus.HEALTHY;
- host.setStatus(healthStatus.name());
- host.persist();
- }
- }
- }
-
- protected void processCommandReports(
- HeartBeat heartbeat, String hostname, Clusters clusterFsm, long now)
- throws AmbariException {
- List<CommandReport> reports = heartbeat.getReports();
-
- // Cache HostRoleCommand entities because we will need them few times
- List<Long> taskIds = new ArrayList<Long>();
- for (CommandReport report : reports) {
- taskIds.add(report.getTaskId());
- }
- Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
-
- Iterator<HostRoleCommand> hostRoleCommandIterator = commands.iterator();
- for (CommandReport report : reports) {
-
- Long clusterId = null;
- if (report.getClusterName() != null) {
- try {
- Cluster cluster = clusterFsm.getCluster(report.getClusterName());
- clusterId = Long.valueOf(cluster.getClusterId());
- } catch (AmbariException e) {
- }
- }
-
- LOG.debug("Received command report: " + report);
- // Fetch HostRoleCommand that corresponds to a given task ID
- HostRoleCommand hostRoleCommand = hostRoleCommandIterator.next();
- HostEntity hostEntity = hostDAO.findByName(hostname);
- if (hostEntity == null) {
- LOG.error("Received a command report and was unable to retrieve HostEntity for hostname = " + hostname);
- continue;
- }
-
- // Send event for final command reports for actions
- if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
- HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
- ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
- clusterId, hostname, report, false);
- ambariEventPublisher.publish(event);
- }
-
- // Skip sending events for command reports for ABORTed commands
- if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
- continue;
- }
- if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
- report.getStatus().equals("IN_PROGRESS")) {
- hostRoleCommand.setStartTime(now);
- }
-
- // If the report indicates the keytab file was successfully transferred to a host or removed
- // from a host, record this for future reference
- if (Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
- Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
- RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
- RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus())) {
-
- String customCommand = report.getCustomCommand();
-
- boolean adding = "SET_KEYTAB".equalsIgnoreCase(customCommand);
- if (adding || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
- WriteKeytabsStructuredOut writeKeytabsStructuredOut;
- try {
- writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
- } catch (JsonSyntaxException ex) {
- //Json structure was incorrect do nothing, pass this data further for processing
- writeKeytabsStructuredOut = null;
- }
-
- if (writeKeytabsStructuredOut != null) {
- Map<String, String> keytabs = writeKeytabsStructuredOut.getKeytabs();
- if (keytabs != null) {
- for (Map.Entry<String, String> entry : keytabs.entrySet()) {
- String principal = entry.getKey();
- if (!kerberosPrincipalHostDAO.exists(principal, hostEntity.getHostId())) {
- if (adding) {
- kerberosPrincipalHostDAO.create(principal, hostEntity.getHostId());
- } else if ("_REMOVED_".equalsIgnoreCase(entry.getValue())) {
- kerberosPrincipalHostDAO.remove(principal, hostEntity.getHostId());
- }
- }
- }
- }
- }
- }
- }
-
- //pass custom START, STOP and RESTART
- if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- !("RESTART".equals(report.getCustomCommand()) ||
- "START".equals(report.getCustomCommand()) ||
- "STOP".equals(report.getCustomCommand())))) {
- continue;
- }
-
- Cluster cl = clusterFsm.getCluster(report.getClusterName());
- String service = report.getServiceName();
- if (service == null || service.isEmpty()) {
- throw new AmbariException("Invalid command report, service: " + service);
- }
- if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
- LOG.debug(report.getRole() + " is an action - skip component lookup");
- } else {
- try {
- Service svc = cl.getService(service);
- ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
- ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
- String schName = scHost.getServiceComponentName();
-
- if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {
-
- // Reading component version if it is present
- if (StringUtils.isNotBlank(report.getStructuredOut())) {
- ComponentVersionStructuredOut structuredOutput = null;
- try {
- structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
- } catch (JsonSyntaxException ex) {
- //Json structure for component version was incorrect
- //do nothing, pass this data further for processing
- }
-
- String newVersion = structuredOutput == null ? null : structuredOutput.version;
-
- // Pass true to always publish a version event. It is safer to recalculate the version even if we don't
- // detect a difference in the value. This is useful in case that a manual database edit is done while
- // ambari-server is stopped.
- handleComponentVersionReceived(cl, scHost, newVersion, true);
- }
-
- // Updating stack version, if needed (this is not actually for express/rolling upgrades!)
- if (scHost.getState().equals(State.UPGRADING)) {
- scHost.setStackVersion(scHost.getDesiredStackVersion());
- } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
- (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
- ("START".equals(report.getCustomCommand()) ||
- "RESTART".equals(report.getCustomCommand()))))
- && null != report.getConfigurationTags()
- && !report.getConfigurationTags().isEmpty()) {
- LOG.info("Updating applied config on service " + scHost.getServiceName() +
- ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
- scHost.updateActualConfigs(report.getConfigurationTags());
- scHost.setRestartRequired(false);
- }
- // Necessary for resetting clients stale configs after starting service
- if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()){
- scHost.updateActualConfigs(report.getConfigurationTags());
- scHost.setRestartRequired(false);
- }
- if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- !("START".equals(report.getCustomCommand()) ||
- "STOP".equals(report.getCustomCommand()))) {
- //do not affect states for custom commands except START and STOP
- //lets status commands to be responsible for this
- continue;
- }
-
- if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- "START".equals(report.getCustomCommand()))) {
- scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
- hostname, now));
- scHost.setRestartRequired(false);
- } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- "STOP".equals(report.getCustomCommand()))) {
- scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
- hostname, now));
- } else {
- scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
- hostname, now));
- }
- } else if (report.getStatus().equals("FAILED")) {
-
- if (StringUtils.isNotBlank(report.getStructuredOut())) {
- try {
- ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
-
- if (null != structuredOutput.upgradeDirection && structuredOutput.upgradeDirection.isUpgrade()) {
- scHost.setUpgradeState(UpgradeState.FAILED);
- }
- } catch (JsonSyntaxException ex) {
- LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
- }
- }
-
- LOG.warn("Operation failed - may be retried. Service component host: "
- + schName + ", host: " + hostname + " Action id" + report.getActionId());
- if (actionManager.isInProgressCommand(report)) {
- scHost.handleEvent(new ServiceComponentHostOpFailedEvent
- (schName, hostname, now));
- } else {
- LOG.info("Received report for a command that is no longer active. " + report);
- }
- } else if (report.getStatus().equals("IN_PROGRESS")) {
- scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
- hostname, now));
- }
- } catch (ServiceComponentNotFoundException scnex) {
- LOG.warn("Service component not found ", scnex);
- } catch (InvalidStateTransitionException ex) {
- if (LOG.isDebugEnabled()) {
- LOG.warn("State machine exception.", ex);
- } else {
- LOG.warn("State machine exception. " + ex.getMessage());
- }
- }
- }
- }
-
- //Update state machines from reports
- actionManager.processTaskResponse(hostname, reports, commands);
- }
-
- protected void processStatusReports(HeartBeat heartbeat,
- String hostname,
- Clusters clusterFsm)
- throws AmbariException {
- Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
- for (Cluster cl : clusters) {
- for (ComponentStatus status : heartbeat.componentStatus) {
- if (status.getClusterName().equals(cl.getClusterName())) {
- try {
- Service svc = cl.getService(status.getServiceName());
-
- String componentName = status.getComponentName();
- if (svc.getServiceComponents().containsKey(componentName)) {
- ServiceComponent svcComp = svc.getServiceComponent(
- componentName);
- ServiceComponentHost scHost = svcComp.getServiceComponentHost(
- hostname);
- State prevState = scHost.getState();
- State liveState = State.valueOf(State.class, status.getStatus());
- if (prevState.equals(State.INSTALLED)
- || prevState.equals(State.STARTED)
- || prevState.equals(State.STARTING)
- || prevState.equals(State.STOPPING)
- || prevState.equals(State.UNKNOWN)) {
- scHost.setState(liveState); //TODO direct status set breaks state machine sometimes !!!
- if (!prevState.equals(liveState)) {
- LOG.info("State of service component " + componentName
- + " of service " + status.getServiceName()
- + " of cluster " + status.getClusterName()
- + " has changed from " + prevState + " to " + liveState
- + " at host " + hostname);
- }
- }
-
- SecurityState prevSecurityState = scHost.getSecurityState();
- SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState());
- if((prevSecurityState != currentSecurityState)) {
- if(prevSecurityState.isEndpoint()) {
- scHost.setSecurityState(currentSecurityState);
- LOG.info(String.format("Security of service component %s of service %s of cluster %s " +
- "has changed from %s to %s on host %s",
- componentName, status.getServiceName(), status.getClusterName(), prevSecurityState,
- currentSecurityState, hostname));
- }
- else {
- LOG.debug(String.format("Security of service component %s of service %s of cluster %s " +
- "has changed from %s to %s on host %s but will be ignored since %s is a " +
- "transitional state",
- componentName, status.getServiceName(), status.getClusterName(),
- prevSecurityState, currentSecurityState, hostname, prevSecurityState));
- }
- }
-
- if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) {
- scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
- }
-
- if (null != status.getConfigTags()) {
- scHost.updateActualConfigs(status.getConfigTags());
- }
-
- Map<String, Object> extra = status.getExtra();
- if (null != extra && !extra.isEmpty()) {
- try {
- if (extra.containsKey("processes")) {
- @SuppressWarnings("unchecked")
- List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
- scHost.setProcesses(list);
- }
- if (extra.containsKey("version")) {
- String version = extra.get("version").toString();
-
- handleComponentVersionReceived(cl, scHost, version, false);
- }
-
- } catch (Exception e) {
- LOG.error("Could not access extra JSON for " +
- scHost.getServiceComponentName() + " from " +
- scHost.getHostName() + ": " + status.getExtra() +
- " (" + e.getMessage() + ")");
- }
- }
-
- this.heartbeatMonitor.getAgentRequests()
- .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
- } else {
- // TODO: What should be done otherwise?
- }
- } catch (ServiceNotFoundException e) {
- LOG.warn("Received a live status update for a non-initialized"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName());
- // FIXME ignore invalid live update and continue for now?
- continue;
- } catch (ServiceComponentNotFoundException e) {
- LOG.warn("Received a live status update for a non-initialized"
- + " servicecomponent"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName());
- // FIXME ignore invalid live update and continue for now?
- continue;
- } catch (ServiceComponentHostNotFoundException e) {
- LOG.warn("Received a live status update for a non-initialized"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName()
- + ", hostname=" + hostname);
- // FIXME ignore invalid live update and continue for now?
- continue;
- } catch (RuntimeException e) {
- LOG.warn("Received a live status with invalid payload"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName()
- + ", hostname=" + hostname
- + ", error=" + e.getMessage());
- continue;
- }
- }
- }
- }
- }
-
- /**
- * Updates the version of the given service component, sets the upgrade state (if needed)
- * and publishes a version event through the version event publisher.
- *
- * @param cluster the cluster
- * @param scHost service component host
- * @param newVersion new version of service component
- * @param alwaysPublish if true, always publish a version event; if false,
- * only publish if the component version was updated
- */
- private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
- String newVersion, boolean alwaysPublish) {
-
- boolean updated = false;
-
- if (StringUtils.isNotBlank(newVersion)) {
- final String previousVersion = scHost.getVersion();
- if (!StringUtils.equals(previousVersion, newVersion)) {
- scHost.setVersion(newVersion);
- scHost.setStackVersion(cluster.getDesiredStackVersion());
- if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
- scHost.setUpgradeState(UpgradeState.COMPLETE);
- }
- updated = true;
- }
- }
-
- if (updated || alwaysPublish) {
- HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
- versionEventPublisher.publish(event);
- }
- }
/**
* Adds commands from action queue to a heartbeat response.
@@ -1229,35 +702,4 @@ public class HeartBeatHandler {
}
}
- /**
- * This class is used for mapping json of structured output for component START action.
- */
- private static class ComponentVersionStructuredOut {
- @SerializedName("version")
- private String version;
-
- @SerializedName("upgrade_type")
- private UpgradeType upgradeType = null;
-
- @SerializedName("direction")
- private Direction upgradeDirection = null;
-
- }
-
- /**
- * This class is used for mapping json of structured output for keytab distribution actions.
- */
- private static class WriteKeytabsStructuredOut {
- @SerializedName("keytabs")
- private Map<String,String> keytabs;
-
- public Map<String, String> getKeytabs() {
- return keytabs;
- }
-
- public void setKeytabs(Map<String, String> keytabs) {
- this.keytabs = keytabs;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index efc717d..378e123 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -140,6 +140,10 @@ public class HeartbeatMonitor implements Runnable {
List<Host> allHosts = clusters.getHosts();
long now = System.currentTimeMillis();
for (Host hostObj : allHosts) {
+ if (hostObj.getState() == HostState.HEARTBEAT_LOST) {
+ //do not check if host is already known to be lost
+ continue;
+ }
String host = hostObj.getHostName();
HostState hostState = hostObj.getState();
String hostname = hostObj.getHostName();
@@ -212,6 +216,8 @@ public class HeartbeatMonitor implements Runnable {
switch (sch.getState()) {
case INIT:
case INSTALLING:
+ case STARTING:
+ case STOPPING:
//don't send commands until component is installed at least
continue;
default:
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
new file mode 100644
index 0000000..2188a77
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.annotations.SerializedName;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.ServiceComponentHostNotFoundException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
+import org.apache.ambari.server.ServiceNotFoundException;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
+import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostHealthStatus;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.SecurityState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.UpgradeState;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.stack.upgrade.Direction;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpSucceededEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartedEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEvent;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HeartbeatProcessor class is used for bulk processing data retrieved from agents in background
+ *
+ */
+public class HeartbeatProcessor extends AbstractService{
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatProcessor.class);
+
+ private ScheduledExecutorService executor;
+
+ private ConcurrentLinkedQueue<HeartBeat> heartBeatsQueue = new ConcurrentLinkedQueue<>();
+
+ private volatile boolean shouldRun = true;
+
+ //TODO rewrite to correlate with heartbeat frequency, hardcoded in agent as of now
+ private long delay = 5000;
+ private long period = 1000;
+
+ private int poolSize = 1;
+
+ private Clusters clusterFsm;
+ private HeartbeatMonitor heartbeatMonitor;
+ private Injector injector;
+ private ActionManager actionManager;
+
+ /**
+ * Publishes {@link AlertEvent} instances.
+ */
+ @Inject
+ AlertEventPublisher alertEventPublisher;
+
+ @Inject
+ AmbariEventPublisher ambariEventPublisher;
+
+ @Inject
+ VersionEventPublisher versionEventPublisher;
+
+ @Inject
+ ActionMetadata actionMetadata;
+
+ @Inject
+ MaintenanceStateHelper maintenanceStateHelper;
+
+ @Inject
+ AmbariMetaInfo ambariMetaInfo;
+
+ @Inject
+ KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
+
+ @Inject
+ Gson gson;
+
+ @Inject
+ public HeartbeatProcessor(Clusters clusterFsm, ActionManager am, HeartbeatMonitor heartbeatMonitor,
+ Injector injector) {
+ injector.injectMembers(this);
+
+ this.injector = injector;
+ this.heartbeatMonitor = heartbeatMonitor;
+ this.clusterFsm = clusterFsm;
+ actionManager = am;
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("ambari-heartbeat-processor-%d").build();
+ executor = Executors.newScheduledThreadPool(poolSize, threadFactory);
+ }
+
+ @Override
+ protected void doStart() {
+ LOG.info("**** Starting heartbeats processing threads ****");
+ for (int i=0; i< poolSize; i++) {
+ executor.scheduleAtFixedRate(new HeartbeatProcessingTask(), delay, period, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ LOG.info("**** Stopping heartbeats processing threads ****");
+ shouldRun = false;
+ executor.shutdown();
+ }
+
+ public void addHeartbeat(HeartBeat heartBeat) {
+ heartBeatsQueue.add(heartBeat);
+ }
+
+ private HeartBeat pollHeartbeat() {
+ return heartBeatsQueue.poll();
+ }
+
+ /**
+ * Processing task to be scheduled for execution
+ */
+ private class HeartbeatProcessingTask implements Runnable {
+
+ @Override
+ public void run() {
+ while (shouldRun) {
+ try {
+ HeartBeat heartbeat = pollHeartbeat();
+ if (heartbeat == null) {
+ break;
+ }
+ processHeartbeat(heartbeat);
+ } catch (Exception e) {
+ LOG.error("Exception received while processing heartbeat", e);
+ } catch (Throwable throwable) {
+ //catch everything to prevent task suppression
+ LOG.error("ERROR: ", throwable);
+ }
+
+
+ }
+ }
+ }
+
+ /**
+ * Encapsulates logic for processing data from an agent heartbeat
+ * @param heartbeat Agent heartbeat object
+ * @throws AmbariException
+ */
+ public void processHeartbeat(HeartBeat heartbeat) throws AmbariException {
+ long now = System.currentTimeMillis();
+
+ processAlerts(heartbeat);
+
+ processCommandReports(heartbeat, now);
+ processStatusReports(heartbeat);
+ //host status calculation are based on task and status reports, should be performed last
+ processHostStatus(heartbeat);
+ }
+
+
+
+ /**
+ * Extracts all of the {@link Alert}s from the heartbeat and fires
+ * {@link AlertEvent}s for each one. If there is a problem looking up the
+ * cluster, then alerts will not be processed.
+ *
+ * @param heartbeat
+ * the heartbeat to process.
+ */
+ protected void processAlerts(HeartBeat heartbeat) {
+ if (heartbeat == null) {
+ return;
+ }
+
+ String hostname = heartbeat.getHostname();
+
+ if (null != heartbeat.getAlerts()) {
+ AlertEvent event = new AlertReceivedEvent(heartbeat.getAlerts());
+ for (Alert alert : event.getAlerts()) {
+ if (alert.getHostName() == null) {
+ alert.setHostName(hostname);
+ }
+ }
+ alertEventPublisher.publish(event);
+
+ }
+ }
+
+ /**
+ * Update host status based on component statuses
+ * @param heartbeat heartbeat to process
+ * @throws AmbariException
+ */
+ protected void processHostStatus(HeartBeat heartbeat) throws AmbariException {
+
+ String hostname = heartbeat.getHostname();
+ Host host = clusterFsm.getHost(hostname);
+ HostHealthStatus.HealthStatus healthStatus = host.getHealthStatus().getHealthStatus();
+
+ if (!healthStatus.equals(HostHealthStatus.HealthStatus.UNKNOWN)) {
+
+ List<ComponentStatus> componentStatuses = heartbeat.getComponentStatus();
+ //Host status info could be calculated only if agent returned statuses in heartbeat
+ //Or, if a command is executed that can change component status
+ boolean calculateHostStatus = false;
+ String clusterName = null;
+ if (componentStatuses.size() > 0) {
+ calculateHostStatus = true;
+ for (ComponentStatus componentStatus : componentStatuses) {
+ clusterName = componentStatus.getClusterName();
+ break;
+ }
+ }
+
+ if (!calculateHostStatus) {
+ List<CommandReport> reports = heartbeat.getReports();
+ for (CommandReport report : reports) {
+ if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
+ continue;
+ }
+
+ String service = report.getServiceName();
+ if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
+ continue;
+ }
+ if (report.getStatus().equals("COMPLETED")) {
+ calculateHostStatus = true;
+ clusterName = report.getClusterName();
+ break;
+ }
+ }
+ }
+
+ if (calculateHostStatus) {
+ //Use actual component status to compute the host status
+ int masterCount = 0;
+ int mastersRunning = 0;
+ int slaveCount = 0;
+ int slavesRunning = 0;
+
+ StackId stackId;
+ Cluster cluster = clusterFsm.getCluster(clusterName);
+ stackId = cluster.getDesiredStackVersion();
+
+
+ List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(heartbeat.getHostname());
+ for (ServiceComponentHost scHost : scHosts) {
+ ComponentInfo componentInfo =
+ ambariMetaInfo.getComponent(stackId.getStackName(),
+ stackId.getStackVersion(), scHost.getServiceName(),
+ scHost.getServiceComponentName());
+
+ String status = scHost.getState().name();
+
+ String category = componentInfo.getCategory();
+
+ if (MaintenanceState.OFF == maintenanceStateHelper.getEffectiveState(scHost, host)) {
+ if (category.equals("MASTER")) {
+ ++masterCount;
+ if (status.equals("STARTED")) {
+ ++mastersRunning;
+ }
+ } else if (category.equals("SLAVE")) {
+ ++slaveCount;
+ if (status.equals("STARTED")) {
+ ++slavesRunning;
+ }
+ }
+ }
+ }
+
+ if (masterCount == mastersRunning && slaveCount == slavesRunning) {
+ healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
+ } else if (masterCount > 0 && mastersRunning < masterCount) {
+ healthStatus = HostHealthStatus.HealthStatus.UNHEALTHY;
+ } else {
+ healthStatus = HostHealthStatus.HealthStatus.ALERT;
+ }
+
+ host.setStatus(healthStatus.name());
+ host.persist();
+ }
+
+ //If host doesn't belong to any cluster
+ if ((clusterFsm.getClustersForHost(host.getHostName())).size() == 0) {
+ healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
+ host.setStatus(healthStatus.name());
+ host.persist();
+ }
+ }
+ }
+
+ /**
+ * Process reports of tasks executed on agents
+ * @param heartbeat heartbeat to process
+ * @param now cached current time
+ * @throws AmbariException
+ */
+ protected void processCommandReports(
+ HeartBeat heartbeat, long now)
+ throws AmbariException {
+ String hostname = heartbeat.getHostname();
+ List<CommandReport> reports = heartbeat.getReports();
+
+ // Cache HostRoleCommand entities because we will need them few times
+ List<Long> taskIds = new ArrayList<Long>();
+ for (CommandReport report : reports) {
+ taskIds.add(report.getTaskId());
+ }
+ Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
+
+ Iterator<HostRoleCommand> hostRoleCommandIterator = commands.iterator();
+ for (CommandReport report : reports) {
+
+ Long clusterId = null;
+ if (report.getClusterName() != null) {
+ try {
+ Cluster cluster = clusterFsm.getCluster(report.getClusterName());
+ clusterId = cluster.getClusterId();
+ } catch (AmbariException e) {
+ }
+ }
+
+ LOG.debug("Received command report: " + report);
+ // Fetch HostRoleCommand that corresponds to a given task ID
+ HostRoleCommand hostRoleCommand = hostRoleCommandIterator.next();
+ Host host = clusterFsm.getHost(hostname);
+// HostEntity hostEntity = hostDAO.findByName(hostname); //don't touch database
+ if (host == null) {
+ LOG.error("Received a command report and was unable to retrieve Host for hostname = " + hostname);
+ continue;
+ }
+
+ // Send event for final command reports for actions
+ if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
+ HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
+ ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
+ clusterId, hostname, report, false);
+ ambariEventPublisher.publish(event);
+ }
+
+ // Skip sending events for command reports for ABORTed commands
+ if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
+ continue;
+ }
+ if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
+ report.getStatus().equals("IN_PROGRESS")) {
+ hostRoleCommand.setStartTime(now);
+ }
+
+ // If the report indicates the keytab file was successfully transferred to a host or removed
+ // from a host, record this for future reference
+ if (Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
+ Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
+ RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
+ RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus())) {
+
+ String customCommand = report.getCustomCommand();
+
+ boolean adding = "SET_KEYTAB".equalsIgnoreCase(customCommand);
+ if (adding || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
+ WriteKeytabsStructuredOut writeKeytabsStructuredOut;
+ try {
+ writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
+ } catch (JsonSyntaxException ex) {
+ //JSON structure was incorrect; do nothing, pass this data further for processing
+ writeKeytabsStructuredOut = null;
+ }
+
+ if (writeKeytabsStructuredOut != null) {
+ Map<String, String> keytabs = writeKeytabsStructuredOut.getKeytabs();
+ if (keytabs != null) {
+ for (Map.Entry<String, String> entry : keytabs.entrySet()) {
+ String principal = entry.getKey();
+ if (!kerberosPrincipalHostDAO.exists(principal, host.getHostId())) {
+ if (adding) {
+ kerberosPrincipalHostDAO.create(principal, host.getHostId());
+ } else if ("_REMOVED_".equalsIgnoreCase(entry.getValue())) {
+ kerberosPrincipalHostDAO.remove(principal, host.getHostId());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ //pass custom START, STOP and RESTART
+ if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
+ (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+ !("RESTART".equals(report.getCustomCommand()) ||
+ "START".equals(report.getCustomCommand()) ||
+ "STOP".equals(report.getCustomCommand())))) {
+ continue;
+ }
+
+ Cluster cl = clusterFsm.getCluster(report.getClusterName());
+ String service = report.getServiceName();
+ if (service == null || service.isEmpty()) {
+ throw new AmbariException("Invalid command report, service: " + service);
+ }
+ if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
+ LOG.debug(report.getRole() + " is an action - skip component lookup");
+ } else {
+ try {
+ Service svc = cl.getService(service);
+ ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
+ ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
+ String schName = scHost.getServiceComponentName();
+
+ if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {
+
+ // Reading component version if it is present
+ if (StringUtils.isNotBlank(report.getStructuredOut())) {
+ ComponentVersionStructuredOut structuredOutput = null;
+ try {
+ structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
+ } catch (JsonSyntaxException ex) {
+ //JSON structure for component version was incorrect;
+ //do nothing, pass this data further for processing
+ }
+
+ String newVersion = structuredOutput == null ? null : structuredOutput.version;
+
+ // Pass true to always publish a version event. It is safer to recalculate the version even if we don't
+ // detect a difference in the value. This is useful in case that a manual database edit is done while
+ // ambari-server is stopped.
+ handleComponentVersionReceived(cl, scHost, newVersion, true);
+ }
+
+ // Updating stack version, if needed (this is not actually for express/rolling upgrades!)
+ if (scHost.getState().equals(org.apache.ambari.server.state.State.UPGRADING)) {
+ scHost.setStackVersion(scHost.getDesiredStackVersion());
+ } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
+ (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
+ ("START".equals(report.getCustomCommand()) ||
+ "RESTART".equals(report.getCustomCommand()))))
+ && null != report.getConfigurationTags()
+ && !report.getConfigurationTags().isEmpty()) {
+ LOG.info("Updating applied config on service " + scHost.getServiceName() +
+ ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
+ scHost.updateActualConfigs(report.getConfigurationTags());
+ scHost.setRestartRequired(false);
+ }
+ // Necessary for resetting clients stale configs after starting service
+ if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) ||
+ (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+ "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()){
+ scHost.updateActualConfigs(report.getConfigurationTags());
+ scHost.setRestartRequired(false);
+ }
+ if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+ !("START".equals(report.getCustomCommand()) ||
+ "STOP".equals(report.getCustomCommand()))) {
+ //do not affect states for custom commands except START and STOP
+ //let status commands be responsible for this
+ continue;
+ }
+
+ if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
+ (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+ "START".equals(report.getCustomCommand()))) {
+ scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
+ hostname, now));
+ scHost.setRestartRequired(false);
+ } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
+ (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+ "STOP".equals(report.getCustomCommand()))) {
+ scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
+ hostname, now));
+ } else {
+ scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
+ hostname, now));
+ }
+ } else if (report.getStatus().equals("FAILED")) {
+
+ if (StringUtils.isNotBlank(report.getStructuredOut())) {
+ try {
+ ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
+
+ if (null != structuredOutput.upgradeDirection && structuredOutput.upgradeDirection.isUpgrade()) {
+ scHost.setUpgradeState(UpgradeState.FAILED);
+ }
+ } catch (JsonSyntaxException ex) {
+ LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
+ }
+ }
+
+ LOG.warn("Operation failed - may be retried. Service component host: "
+ + schName + ", host: " + hostname + " Action id" + report.getActionId());
+ if (actionManager.isInProgressCommand(report)) {
+ scHost.handleEvent(new ServiceComponentHostOpFailedEvent
+ (schName, hostname, now));
+ } else {
+ LOG.info("Received report for a command that is no longer active. " + report);
+ }
+ } else if (report.getStatus().equals("IN_PROGRESS")) {
+ scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
+ hostname, now));
+ }
+ } catch (ServiceComponentNotFoundException scnex) {
+ LOG.warn("Service component not found ", scnex);
+ } catch (InvalidStateTransitionException ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.warn("State machine exception.", ex);
+ } else {
+ LOG.warn("State machine exception. " + ex.getMessage());
+ }
+ }
+ }
+ }
+
+ //Update state machines from reports
+ actionManager.processTaskResponse(hostname, reports, commands);
+ }
+
+ /**
+ * Process reports of status commands
+ * @param heartbeat heartbeat to process
+ * @throws AmbariException
+ */
+ protected void processStatusReports(HeartBeat heartbeat) throws AmbariException {
+ String hostname = heartbeat.getHostname();
+ Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
+ for (Cluster cl : clusters) {
+ for (ComponentStatus status : heartbeat.componentStatus) {
+ if (status.getClusterName().equals(cl.getClusterName())) {
+ try {
+ Service svc = cl.getService(status.getServiceName());
+
+ String componentName = status.getComponentName();
+ if (svc.getServiceComponents().containsKey(componentName)) {
+ ServiceComponent svcComp = svc.getServiceComponent(
+ componentName);
+ ServiceComponentHost scHost = svcComp.getServiceComponentHost(
+ hostname);
+ org.apache.ambari.server.state.State prevState = scHost.getState();
+ org.apache.ambari.server.state.State liveState =
+ org.apache.ambari.server.state.State.valueOf(org.apache.ambari.server.state.State.class,
+ status.getStatus());
+ if (prevState.equals(org.apache.ambari.server.state.State.INSTALLED)
+ || prevState.equals(org.apache.ambari.server.state.State.STARTED)
+ || prevState.equals(org.apache.ambari.server.state.State.STARTING)
+ || prevState.equals(org.apache.ambari.server.state.State.STOPPING)
+ || prevState.equals(org.apache.ambari.server.state.State.UNKNOWN)) {
+ scHost.setState(liveState); //TODO direct status set breaks state machine sometimes !!!
+ if (!prevState.equals(liveState)) {
+ LOG.info("State of service component " + componentName
+ + " of service " + status.getServiceName()
+ + " of cluster " + status.getClusterName()
+ + " has changed from " + prevState + " to " + liveState
+ + " at host " + hostname);
+ }
+ }
+
+ SecurityState prevSecurityState = scHost.getSecurityState();
+ SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState());
+ if((prevSecurityState != currentSecurityState)) {
+ if(prevSecurityState.isEndpoint()) {
+ scHost.setSecurityState(currentSecurityState);
+ LOG.info(String.format("Security of service component %s of service %s of cluster %s " +
+ "has changed from %s to %s on host %s",
+ componentName, status.getServiceName(), status.getClusterName(), prevSecurityState,
+ currentSecurityState, hostname));
+ }
+ else {
+ LOG.debug(String.format("Security of service component %s of service %s of cluster %s " +
+ "has changed from %s to %s on host %s but will be ignored since %s is a " +
+ "transitional state",
+ componentName, status.getServiceName(), status.getClusterName(),
+ prevSecurityState, currentSecurityState, hostname, prevSecurityState));
+ }
+ }
+
+ if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) {
+ scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
+ }
+
+ if (null != status.getConfigTags()) {
+ scHost.updateActualConfigs(status.getConfigTags());
+ }
+
+ Map<String, Object> extra = status.getExtra();
+ if (null != extra && !extra.isEmpty()) {
+ try {
+ if (extra.containsKey("processes")) {
+ @SuppressWarnings("unchecked")
+ List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
+ scHost.setProcesses(list);
+ }
+ if (extra.containsKey("version")) {
+ String version = extra.get("version").toString();
+
+ handleComponentVersionReceived(cl, scHost, version, false);
+ }
+
+ } catch (Exception e) {
+ LOG.error("Could not access extra JSON for " +
+ scHost.getServiceComponentName() + " from " +
+ scHost.getHostName() + ": " + status.getExtra() +
+ " (" + e.getMessage() + ")");
+ }
+ }
+
+ this.heartbeatMonitor.getAgentRequests()
+ .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
+ } else {
+ // TODO: What should be done otherwise?
+ }
+ } catch (ServiceNotFoundException e) {
+ LOG.warn("Received a live status update for a non-initialized"
+ + " service"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName());
+ // FIXME ignore invalid live update and continue for now?
+ continue;
+ } catch (ServiceComponentNotFoundException e) {
+ LOG.warn("Received a live status update for a non-initialized"
+ + " servicecomponent"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName()
+ + ", componentName=" + status.getComponentName());
+ // FIXME ignore invalid live update and continue for now?
+ continue;
+ } catch (ServiceComponentHostNotFoundException e) {
+ LOG.warn("Received a live status update for a non-initialized"
+ + " service"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName()
+ + ", componentName=" + status.getComponentName()
+ + ", hostname=" + hostname);
+ // FIXME ignore invalid live update and continue for now?
+ continue;
+ } catch (RuntimeException e) {
+ LOG.warn("Received a live status with invalid payload"
+ + " service"
+ + ", clusterName=" + status.getClusterName()
+ + ", serviceName=" + status.getServiceName()
+ + ", componentName=" + status.getComponentName()
+ + ", hostname=" + hostname
+ + ", error=" + e.getMessage());
+ continue;
+ }
+ }
+ }
+ }
+ }
+
+
+
+ /**
+ * Updates the version of the given service component, sets the upgrade state (if needed)
+ * and publishes a version event through the version event publisher.
+ *
+ * @param cluster the cluster
+ * @param scHost service component host
+ * @param newVersion new version of service component
+ * @param alwaysPublish if true, always publish a version event; if false,
+ * only publish if the component version was updated
+ */
+ private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
+ String newVersion, boolean alwaysPublish) {
+
+ boolean updated = false;
+
+ if (StringUtils.isNotBlank(newVersion)) {
+ final String previousVersion = scHost.getVersion();
+ if (!StringUtils.equals(previousVersion, newVersion)) {
+ scHost.setVersion(newVersion);
+ scHost.setStackVersion(cluster.getDesiredStackVersion());
+ if (previousVersion != null && !previousVersion.equalsIgnoreCase(
+ org.apache.ambari.server.state.State.UNKNOWN.toString())) {
+ scHost.setUpgradeState(UpgradeState.COMPLETE);
+ }
+ updated = true;
+ }
+ }
+
+ if (updated || alwaysPublish) {
+ HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
+ versionEventPublisher.publish(event);
+ }
+ }
+
+ /**
+ * This class is used for mapping json of structured output for keytab distribution actions.
+ */
+ private static class WriteKeytabsStructuredOut {
+ @SerializedName("keytabs")
+ private Map<String,String> keytabs;
+
+ public Map<String, String> getKeytabs() {
+ return keytabs;
+ }
+
+ public void setKeytabs(Map<String, String> keytabs) {
+ this.keytabs = keytabs;
+ }
+ }
+
+
+ /**
+ * This class is used for mapping json of structured output for component START action.
+ */
+ private static class ComponentVersionStructuredOut {
+ @SerializedName("version")
+ private String version;
+
+ @SerializedName("upgrade_type")
+ private UpgradeType upgradeType = null;
+
+ @SerializedName("direction")
+ private Direction upgradeDirection = null;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
index 040876a..e28f9ef 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
@@ -131,8 +131,8 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
}
/**
- * Retrieve all of the host versions for the given cluster name, host name, and state.
- *
+ * Retrieve all of the host versions for the given cluster name, host name, and state. <br/>
+ * Consider using faster method: {@link HostVersionDAO#findByClusterHostAndState(long, long, org.apache.ambari.server.state.RepositoryVersionState)}
* @param clusterName Cluster name
* @param hostName FQDN of host
* @param state repository version state
@@ -150,8 +150,29 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
}
/**
+ * Faster version of {@link HostVersionDAO#findByClusterHostAndState(java.lang.String, java.lang.String, org.apache.ambari.server.state.RepositoryVersionState)}
+ *
+ * @param clusterId Cluster ID
+ * @param hostId Host ID
+ * @param state repository version state
+ * @return Return all of the host versions that match the criteria.
+ */
+ @RequiresSession
+ public List<HostVersionEntity> findByClusterHostAndState(long clusterId, long hostId, RepositoryVersionState state) {
+ TypedQuery<HostVersionEntity> query =
+ entityManagerProvider.get().createNamedQuery("hostVersionByClusterHostIdAndState", HostVersionEntity.class);
+
+ query.setParameter("clusterId", clusterId);
+ query.setParameter("hostId", hostId);
+ query.setParameter("state", state);
+
+ return daoUtils.selectList(query);
+ }
+
+ /**
* Retrieve the single host version whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, of which there should be exactly one at all times
* for the given host.
+ * Consider using faster method {@link HostVersionDAO#findByHostAndStateCurrent(long, long)}
*
* @param clusterName Cluster name
* @param hostName Host name
@@ -175,8 +196,36 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
}
/**
+ * Retrieve the single host version whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, of which there should be exactly one at all times
+ * for the given host.
+ * Faster version of {@link HostVersionDAO#findByHostAndStateCurrent(java.lang.String, java.lang.String)}
+ * @param clusterId Cluster ID
+ * @param hostId host ID
+ * @return Returns the single host version for this host whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, or {@code null} otherwise.
+ */
+ @RequiresSession
+ public HostVersionEntity findByHostAndStateCurrent(long clusterId, long hostId) {
+ try {
+ List<?> results = findByClusterHostAndState(clusterId, hostId, RepositoryVersionState.CURRENT);
+ if (results.isEmpty()) {
+ return null;
+ } else {
+ if (results.size() == 1) {
+ return (HostVersionEntity) results.get(0);
+ }
+ }
+ throw new NonUniqueResultException();
+ } catch (NoResultException ignored) {
+ return null;
+ }
+ }
+
+ /**
* Retrieve the single host version for the given cluster, stack name, stack
- * version, and host name.
+ * version, and host name. <br/>
+ * This query is slow and not suitable for frequent use. <br/>
+ * Please, use {@link HostVersionDAO#findByClusterStackVersionAndHost(long, org.apache.ambari.server.state.StackId, java.lang.String, long)} <br/>
+ * It is ~50 times faster
*
* @param clusterName
* Cluster name
@@ -203,6 +252,29 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
return daoUtils.selectSingle(query);
}
+ /**
+ * Optimized version of {@link HostVersionDAO#findByClusterStackVersionAndHost(java.lang.String, org.apache.ambari.server.state.StackId, java.lang.String, java.lang.String)}
+ * @param clusterId Id of cluster
+ * @param stackId Stack ID (e.g., HDP-2.2)
+ * @param version Stack version (e.g., 2.2.0.1-995)
+ * @param hostId Host Id
+ * @return Returns the single host version that matches the criteria.
+ */
+ @RequiresSession
+ public HostVersionEntity findByClusterStackVersionAndHost(long clusterId, StackId stackId, String version,
+ long hostId) {
+ TypedQuery<HostVersionEntity> query = entityManagerProvider.get()
+ .createNamedQuery("hostVersionByClusterStackVersionAndHostId", HostVersionEntity.class);
+
+ query.setParameter("clusterId", clusterId);
+ query.setParameter("stackName", stackId.getStackName());
+ query.setParameter("stackVersion", stackId.getStackVersion());
+ query.setParameter("version", version);
+ query.setParameter("hostId", hostId);
+
+ return daoUtils.selectSingle(query);
+ }
+
@Transactional
public void removeByHostName(String hostName) {
Collection<HostVersionEntity> hostVersions = this.findByHost(hostName);
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
index b69518b..6be4b50 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
@@ -62,6 +62,15 @@ import org.apache.ambari.server.state.RepositoryVersionState;
"SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
"WHERE clusters.clusterName=:clusterName AND hostVersion.repositoryVersion.stack.stackName=:stackName AND hostVersion.repositoryVersion.stack.stackVersion=:stackVersion AND hostVersion.repositoryVersion.version=:version AND " +
"hostVersion.hostEntity.hostName=:hostName"),
+
+ @NamedQuery(name = "hostVersionByClusterHostIdAndState", query =
+ "SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
+ "WHERE clusters.clusterId=:clusterId AND hostVersion.hostId=:hostId AND hostVersion.state=:state"),
+
+ @NamedQuery(name = "hostVersionByClusterStackVersionAndHostId", query =
+ "SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
+ "WHERE hostVersion.hostId=:hostId AND clusters.clusterId=:clusterId AND hostVersion.repositoryVersion.stack.stackName=:stackName " +
+ "AND hostVersion.repositoryVersion.stack.stackVersion=:stackVersion AND hostVersion.repositoryVersion.version=:version")
})
public class HostVersionEntity {
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 4212975..c6d01e8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -1560,8 +1560,8 @@ public class ClusterImpl implements Cluster {
StackId repoVersionStackId = new StackId(repoVersionStackEntity);
HostVersionEntity hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(
- getClusterName(), repoVersionStackId, repositoryVersion.getVersion(),
- host.getHostName());
+ getClusterId(), repoVersionStackId, repositoryVersion.getVersion(),
+ host.getHostId());
hostTransitionStateWriteLock.lock();
try {
@@ -1576,7 +1576,7 @@ public class ClusterImpl implements Cluster {
hostVersionDAO.create(hostVersionEntity);
}
- HostVersionEntity currentVersionEntity = hostVersionDAO.findByHostAndStateCurrent(getClusterName(), host.getHostName());
+ HostVersionEntity currentVersionEntity = hostVersionDAO.findByHostAndStateCurrent(getClusterId(), host.getHostId());
boolean isCurrentPresent = (currentVersionEntity != null);
final ServiceComponentHostSummary hostSummary = new ServiceComponentHostSummary(ambariMetaInfo, host, stack);
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index bfb6214..1bd60a8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -1319,43 +1319,48 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public ServiceComponentHostResponse convertToResponse() {
- readLock.lock();
+ clusterGlobalLock.readLock().lock();
try {
- HostComponentStateEntity hostComponentStateEntity = getStateEntity();
- if (null == hostComponentStateEntity) {
- LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted.");
- return null;
- }
+ readLock.lock();
+ try {
+ HostComponentStateEntity hostComponentStateEntity = getStateEntity();
+ if (null == hostComponentStateEntity) {
+ LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted.");
+ return null;
+ }
- String clusterName = serviceComponent.getClusterName();
- String serviceName = serviceComponent.getServiceName();
- String serviceComponentName = serviceComponent.getName();
- String hostName = getHostName();
- String state = getState().toString();
- String stackId = getStackVersion().getStackId();
- String desiredState = getDesiredState().toString();
- String desiredStackId = getDesiredStackVersion().getStackId();
- HostComponentAdminState componentAdminState = getComponentAdminState();
- UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
-
- ServiceComponentHostResponse r = new ServiceComponentHostResponse(
- clusterName, serviceName,
- serviceComponentName, hostName, state,
- stackId, desiredState,
- desiredStackId, componentAdminState);
-
- r.setActualConfigs(actualConfigs);
- r.setUpgradeState(upgradeState);
+ String clusterName = serviceComponent.getClusterName();
+ String serviceName = serviceComponent.getServiceName();
+ String serviceComponentName = serviceComponent.getName();
+ String hostName = getHostName();
+ String state = getState().toString();
+ String stackId = getStackVersion().getStackId();
+ String desiredState = getDesiredState().toString();
+ String desiredStackId = getDesiredStackVersion().getStackId();
+ HostComponentAdminState componentAdminState = getComponentAdminState();
+ UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
+
+ ServiceComponentHostResponse r = new ServiceComponentHostResponse(
+ clusterName, serviceName,
+ serviceComponentName, hostName, state,
+ stackId, desiredState,
+ desiredStackId, componentAdminState);
+
+ r.setActualConfigs(actualConfigs);
+ r.setUpgradeState(upgradeState);
- try {
- r.setStaleConfig(helper.isStaleConfigs(this));
- } catch (Exception e) {
- LOG.error("Could not determine stale config", e);
- }
+ try {
+ r.setStaleConfig(helper.isStaleConfigs(this));
+ } catch (Exception e) {
+ LOG.error("Could not determine stale config", e);
+ }
- return r;
+ return r;
+ } finally {
+ readLock.unlock();
+ }
} finally {
- readLock.unlock();
+ clusterGlobalLock.readLock().unlock();
}
}
@@ -1797,6 +1802,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
}
final String hostName = getHostName();
+ final long hostId = getHost().getHostId();
final Set<Cluster> clustersForHost = clusters.getClustersForHost(hostName);
if (clustersForHost.size() != 1) {
throw new AmbariException("Host " + hostName + " should be assigned only to one cluster");
@@ -1815,7 +1821,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
repositoryVersion = createRepositoryVersion(version, stackId, stackInfo);
}
- final HostEntity host = hostDAO.findByName(hostName);
+ final HostEntity host = hostDAO.findById(hostId);
cluster.transitionHostVersionState(host, repositoryVersion, stackId);
} finally {
writeLock.unlock();