You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2016/02/23 22:21:55 UTC
[10/11] ambari git commit: AMBARI-15141. Start all services request aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)
AMBARI-15141. Start all services request aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/083ac6da
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/083ac6da
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/083ac6da
Branch: refs/heads/branch-dev-patch-upgrade
Commit: 083ac6dab5cf59c01da054eb656507c089a54620
Parents: 9d7ff5f
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Feb 23 13:01:13 2016 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Tue Feb 23 21:04:22 2016 +0200
----------------------------------------------------------------------
.../ambari/server/agent/HeartBeatHandler.java | 582 +------
.../ambari/server/agent/HeartbeatMonitor.java | 6 +
.../ambari/server/agent/HeartbeatProcessor.java | 773 +++++++++
.../ambari/server/orm/dao/HostVersionDAO.java | 78 +-
.../server/orm/entities/HostVersionEntity.java | 9 +
.../server/state/cluster/ClusterImpl.java | 6 +-
.../svccomphost/ServiceComponentHostImpl.java | 72 +-
.../server/agent/HeartbeatProcessorTest.java | 1290 +++++++++++++++
.../server/agent/HeartbeatTestHelper.java | 229 +++
.../server/agent/TestHeartbeatHandler.java | 1489 ++----------------
10 files changed, 2559 insertions(+), 1975 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 248ce4b..ba14446 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -126,6 +126,7 @@ public class HeartBeatHandler {
private final ActionQueue actionQueue;
private final ActionManager actionManager;
private HeartbeatMonitor heartbeatMonitor;
+ private HeartbeatProcessor heartbeatProcessor;
@Inject
private Injector injector;
@@ -137,38 +138,11 @@ public class HeartBeatHandler {
private AmbariMetaInfo ambariMetaInfo;
@Inject
- private ActionMetadata actionMetadata;
-
- @Inject
- private Gson gson;
-
- @Inject
private ConfigHelper configHelper;
@Inject
- private HostDAO hostDAO;
-
- @Inject
private AlertDefinitionHash alertDefinitionHash;
- /**
- * Publishes {@link AlertEvent} instances.
- */
- @Inject
- private AlertEventPublisher alertEventPublisher;
-
- @Inject
- private AmbariEventPublisher ambariEventPublisher;
-
- @Inject
- private VersionEventPublisher versionEventPublisher;
-
-
- /**
- * KerberosPrincipalHostDAO used to set and get Kerberos principal details
- */
- @Inject
- private KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
/**
* KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances
@@ -187,10 +161,12 @@ public class HeartBeatHandler {
actionQueue = aq;
actionManager = am;
heartbeatMonitor = new HeartbeatMonitor(fsm, aq, am, 60000, injector);
+ heartbeatProcessor = new HeartbeatProcessor(fsm, am, heartbeatMonitor, injector); //TODO modify to match pattern
injector.injectMembers(this);
}
public void start() {
+ heartbeatProcessor.startAsync();
heartbeatMonitor.start();
}
@@ -198,6 +174,14 @@ public class HeartBeatHandler {
this.heartbeatMonitor = heartbeatMonitor;
}
+ public void setHeartbeatProcessor(HeartbeatProcessor heartbeatProcessor) {
+ this.heartbeatProcessor = heartbeatProcessor;
+ }
+
+ public HeartbeatProcessor getHeartbeatProcessor() {
+ return heartbeatProcessor;
+ }
+
public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
throws AmbariException {
long now = System.currentTimeMillis();
@@ -283,18 +267,7 @@ public class HeartBeatHandler {
return createRegisterCommand();
}
- // Examine heartbeat for command reports
- processCommandReports(heartbeat, hostname, clusterFsm, now);
-
- // Examine heartbeat for component live status reports
- processStatusReports(heartbeat, hostname, clusterFsm);
-
- // Calculate host status
- // NOTE: This step must be after processing command/status reports
- processHostStatus(heartbeat, hostname);
-
- // Example heartbeat for alerts from the host or its components
- processAlerts(heartbeat, hostname);
+ heartbeatProcessor.addHeartbeat(heartbeat);
// Send commands if node is active
if (hostObject.getState().equals(HostState.HEALTHY)) {
@@ -305,33 +278,7 @@ public class HeartBeatHandler {
return response;
}
- /**
- * Extracts all of the {@link Alert}s from the heartbeat and fires
- * {@link AlertEvent}s for each one. If there is a problem looking up the
- * cluster, then alerts will not be processed.
- *
- * @param heartbeat
- * the heartbeat to process.
- * @param hostname
- * the host that the heartbeat is for.
- */
- protected void processAlerts(HeartBeat heartbeat, String hostname) {
-
- if (null == hostname || null == heartbeat) {
- return;
- }
- if (null != heartbeat.getAlerts()) {
- AlertEvent event = new AlertReceivedEvent(heartbeat.getAlerts());
- for (Alert alert : event.getAlerts()) {
- if (alert.getHostName() == null) {
- alert.setHostName(hostname);
- }
- }
- alertEventPublisher.publish(event);
-
- }
- }
protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
LOG.debug("Received recovery report: " + recoveryReport.toString());
@@ -339,480 +286,6 @@ public class HeartBeatHandler {
host.setRecoveryReport(recoveryReport);
}
- protected void processHostStatus(HeartBeat heartbeat, String hostname) throws AmbariException {
-
- Host host = clusterFsm.getHost(hostname);
- HealthStatus healthStatus = host.getHealthStatus().getHealthStatus();
-
- if (!healthStatus.equals(HostHealthStatus.HealthStatus.UNKNOWN)) {
-
- List<ComponentStatus> componentStatuses = heartbeat.getComponentStatus();
- //Host status info could be calculated only if agent returned statuses in heartbeat
- //Or, if a command is executed that can change component status
- boolean calculateHostStatus = false;
- String clusterName = null;
- if (componentStatuses.size() > 0) {
- calculateHostStatus = true;
- for (ComponentStatus componentStatus : componentStatuses) {
- clusterName = componentStatus.getClusterName();
- break;
- }
- }
-
- if (!calculateHostStatus) {
- List<CommandReport> reports = heartbeat.getReports();
- for (CommandReport report : reports) {
- if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
- continue;
- }
-
- String service = report.getServiceName();
- if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
- continue;
- }
- if (report.getStatus().equals("COMPLETED")) {
- calculateHostStatus = true;
- clusterName = report.getClusterName();
- break;
- }
- }
- }
-
- if (calculateHostStatus) {
- //Use actual component status to compute the host status
- int masterCount = 0;
- int mastersRunning = 0;
- int slaveCount = 0;
- int slavesRunning = 0;
-
- StackId stackId;
- Cluster cluster = clusterFsm.getCluster(clusterName);
- stackId = cluster.getDesiredStackVersion();
-
- MaintenanceStateHelper psh = injector.getInstance(MaintenanceStateHelper.class);
-
- List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(heartbeat.getHostname());
- for (ServiceComponentHost scHost : scHosts) {
- ComponentInfo componentInfo =
- ambariMetaInfo.getComponent(stackId.getStackName(),
- stackId.getStackVersion(), scHost.getServiceName(),
- scHost.getServiceComponentName());
-
- String status = scHost.getState().name();
-
- String category = componentInfo.getCategory();
-
- if (MaintenanceState.OFF == psh.getEffectiveState(scHost, host)) {
- if (category.equals("MASTER")) {
- ++masterCount;
- if (status.equals("STARTED")) {
- ++mastersRunning;
- }
- } else if (category.equals("SLAVE")) {
- ++slaveCount;
- if (status.equals("STARTED")) {
- ++slavesRunning;
- }
- }
- }
- }
-
- if (masterCount == mastersRunning && slaveCount == slavesRunning) {
- healthStatus = HealthStatus.HEALTHY;
- } else if (masterCount > 0 && mastersRunning < masterCount) {
- healthStatus = HealthStatus.UNHEALTHY;
- } else {
- healthStatus = HealthStatus.ALERT;
- }
-
- host.setStatus(healthStatus.name());
- host.persist();
- }
-
- //If host doesn't belong to any cluster
- if ((clusterFsm.getClustersForHost(host.getHostName())).size() == 0) {
- healthStatus = HealthStatus.HEALTHY;
- host.setStatus(healthStatus.name());
- host.persist();
- }
- }
- }
-
- protected void processCommandReports(
- HeartBeat heartbeat, String hostname, Clusters clusterFsm, long now)
- throws AmbariException {
- List<CommandReport> reports = heartbeat.getReports();
-
- // Cache HostRoleCommand entities because we will need them few times
- List<Long> taskIds = new ArrayList<Long>();
- for (CommandReport report : reports) {
- taskIds.add(report.getTaskId());
- }
- Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
-
- Iterator<HostRoleCommand> hostRoleCommandIterator = commands.iterator();
- for (CommandReport report : reports) {
-
- Long clusterId = null;
- if (report.getClusterName() != null) {
- try {
- Cluster cluster = clusterFsm.getCluster(report.getClusterName());
- clusterId = Long.valueOf(cluster.getClusterId());
- } catch (AmbariException e) {
- }
- }
-
- LOG.debug("Received command report: " + report);
- // Fetch HostRoleCommand that corresponds to a given task ID
- HostRoleCommand hostRoleCommand = hostRoleCommandIterator.next();
- HostEntity hostEntity = hostDAO.findByName(hostname);
- if (hostEntity == null) {
- LOG.error("Received a command report and was unable to retrieve HostEntity for hostname = " + hostname);
- continue;
- }
-
- // Send event for final command reports for actions
- if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
- HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
- ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
- clusterId, hostname, report, false);
- ambariEventPublisher.publish(event);
- }
-
- // Skip sending events for command reports for ABORTed commands
- if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
- continue;
- }
- if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
- report.getStatus().equals("IN_PROGRESS")) {
- hostRoleCommand.setStartTime(now);
- }
-
- // If the report indicates the keytab file was successfully transferred to a host or removed
- // from a host, record this for future reference
- if (Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
- Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
- RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
- RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus())) {
-
- String customCommand = report.getCustomCommand();
-
- boolean adding = "SET_KEYTAB".equalsIgnoreCase(customCommand);
- if (adding || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
- WriteKeytabsStructuredOut writeKeytabsStructuredOut;
- try {
- writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
- } catch (JsonSyntaxException ex) {
- //Json structure was incorrect do nothing, pass this data further for processing
- writeKeytabsStructuredOut = null;
- }
-
- if (writeKeytabsStructuredOut != null) {
- Map<String, String> keytabs = writeKeytabsStructuredOut.getKeytabs();
- if (keytabs != null) {
- for (Map.Entry<String, String> entry : keytabs.entrySet()) {
- String principal = entry.getKey();
- if (!kerberosPrincipalHostDAO.exists(principal, hostEntity.getHostId())) {
- if (adding) {
- kerberosPrincipalHostDAO.create(principal, hostEntity.getHostId());
- } else if ("_REMOVED_".equalsIgnoreCase(entry.getValue())) {
- kerberosPrincipalHostDAO.remove(principal, hostEntity.getHostId());
- }
- }
- }
- }
- }
- }
- }
-
- //pass custom START, STOP and RESTART
- if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- !("RESTART".equals(report.getCustomCommand()) ||
- "START".equals(report.getCustomCommand()) ||
- "STOP".equals(report.getCustomCommand())))) {
- continue;
- }
-
- Cluster cl = clusterFsm.getCluster(report.getClusterName());
- String service = report.getServiceName();
- if (service == null || service.isEmpty()) {
- throw new AmbariException("Invalid command report, service: " + service);
- }
- if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
- LOG.debug(report.getRole() + " is an action - skip component lookup");
- } else {
- try {
- Service svc = cl.getService(service);
- ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
- ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
- String schName = scHost.getServiceComponentName();
-
- if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {
-
- // Reading component version if it is present
- if (StringUtils.isNotBlank(report.getStructuredOut())) {
- ComponentVersionStructuredOut structuredOutput = null;
- try {
- structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
- } catch (JsonSyntaxException ex) {
- //Json structure for component version was incorrect
- //do nothing, pass this data further for processing
- }
-
- String newVersion = structuredOutput == null ? null : structuredOutput.version;
-
- // Pass true to always publish a version event. It is safer to recalculate the version even if we don't
- // detect a difference in the value. This is useful in case that a manual database edit is done while
- // ambari-server is stopped.
- handleComponentVersionReceived(cl, scHost, newVersion, true);
- }
-
- // Updating stack version, if needed (this is not actually for express/rolling upgrades!)
- if (scHost.getState().equals(State.UPGRADING)) {
- scHost.setStackVersion(scHost.getDesiredStackVersion());
- } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
- (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
- ("START".equals(report.getCustomCommand()) ||
- "RESTART".equals(report.getCustomCommand()))))
- && null != report.getConfigurationTags()
- && !report.getConfigurationTags().isEmpty()) {
- LOG.info("Updating applied config on service " + scHost.getServiceName() +
- ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
- scHost.updateActualConfigs(report.getConfigurationTags());
- scHost.setRestartRequired(false);
- }
- // Necessary for resetting clients stale configs after starting service
- if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()){
- scHost.updateActualConfigs(report.getConfigurationTags());
- scHost.setRestartRequired(false);
- }
- if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- !("START".equals(report.getCustomCommand()) ||
- "STOP".equals(report.getCustomCommand()))) {
- //do not affect states for custom commands except START and STOP
- //lets status commands to be responsible for this
- continue;
- }
-
- if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- "START".equals(report.getCustomCommand()))) {
- scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
- hostname, now));
- scHost.setRestartRequired(false);
- } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
- (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
- "STOP".equals(report.getCustomCommand()))) {
- scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
- hostname, now));
- } else {
- scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
- hostname, now));
- }
- } else if (report.getStatus().equals("FAILED")) {
-
- if (StringUtils.isNotBlank(report.getStructuredOut())) {
- try {
- ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
-
- if (null != structuredOutput.upgradeDirection && structuredOutput.upgradeDirection.isUpgrade()) {
- scHost.setUpgradeState(UpgradeState.FAILED);
- }
- } catch (JsonSyntaxException ex) {
- LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
- }
- }
-
- LOG.warn("Operation failed - may be retried. Service component host: "
- + schName + ", host: " + hostname + " Action id" + report.getActionId());
- if (actionManager.isInProgressCommand(report)) {
- scHost.handleEvent(new ServiceComponentHostOpFailedEvent
- (schName, hostname, now));
- } else {
- LOG.info("Received report for a command that is no longer active. " + report);
- }
- } else if (report.getStatus().equals("IN_PROGRESS")) {
- scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
- hostname, now));
- }
- } catch (ServiceComponentNotFoundException scnex) {
- LOG.warn("Service component not found ", scnex);
- } catch (InvalidStateTransitionException ex) {
- if (LOG.isDebugEnabled()) {
- LOG.warn("State machine exception.", ex);
- } else {
- LOG.warn("State machine exception. " + ex.getMessage());
- }
- }
- }
- }
-
- //Update state machines from reports
- actionManager.processTaskResponse(hostname, reports, commands);
- }
-
- protected void processStatusReports(HeartBeat heartbeat,
- String hostname,
- Clusters clusterFsm)
- throws AmbariException {
- Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
- for (Cluster cl : clusters) {
- for (ComponentStatus status : heartbeat.componentStatus) {
- if (status.getClusterName().equals(cl.getClusterName())) {
- try {
- Service svc = cl.getService(status.getServiceName());
-
- String componentName = status.getComponentName();
- if (svc.getServiceComponents().containsKey(componentName)) {
- ServiceComponent svcComp = svc.getServiceComponent(
- componentName);
- ServiceComponentHost scHost = svcComp.getServiceComponentHost(
- hostname);
- State prevState = scHost.getState();
- State liveState = State.valueOf(State.class, status.getStatus());
- if (prevState.equals(State.INSTALLED)
- || prevState.equals(State.STARTED)
- || prevState.equals(State.STARTING)
- || prevState.equals(State.STOPPING)
- || prevState.equals(State.UNKNOWN)) {
- scHost.setState(liveState); //TODO direct status set breaks state machine sometimes !!!
- if (!prevState.equals(liveState)) {
- LOG.info("State of service component " + componentName
- + " of service " + status.getServiceName()
- + " of cluster " + status.getClusterName()
- + " has changed from " + prevState + " to " + liveState
- + " at host " + hostname);
- }
- }
-
- SecurityState prevSecurityState = scHost.getSecurityState();
- SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState());
- if((prevSecurityState != currentSecurityState)) {
- if(prevSecurityState.isEndpoint()) {
- scHost.setSecurityState(currentSecurityState);
- LOG.info(String.format("Security of service component %s of service %s of cluster %s " +
- "has changed from %s to %s on host %s",
- componentName, status.getServiceName(), status.getClusterName(), prevSecurityState,
- currentSecurityState, hostname));
- }
- else {
- LOG.debug(String.format("Security of service component %s of service %s of cluster %s " +
- "has changed from %s to %s on host %s but will be ignored since %s is a " +
- "transitional state",
- componentName, status.getServiceName(), status.getClusterName(),
- prevSecurityState, currentSecurityState, hostname, prevSecurityState));
- }
- }
-
- if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) {
- scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
- }
-
- if (null != status.getConfigTags()) {
- scHost.updateActualConfigs(status.getConfigTags());
- }
-
- Map<String, Object> extra = status.getExtra();
- if (null != extra && !extra.isEmpty()) {
- try {
- if (extra.containsKey("processes")) {
- @SuppressWarnings("unchecked")
- List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
- scHost.setProcesses(list);
- }
- if (extra.containsKey("version")) {
- String version = extra.get("version").toString();
-
- handleComponentVersionReceived(cl, scHost, version, false);
- }
-
- } catch (Exception e) {
- LOG.error("Could not access extra JSON for " +
- scHost.getServiceComponentName() + " from " +
- scHost.getHostName() + ": " + status.getExtra() +
- " (" + e.getMessage() + ")");
- }
- }
-
- this.heartbeatMonitor.getAgentRequests()
- .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
- } else {
- // TODO: What should be done otherwise?
- }
- } catch (ServiceNotFoundException e) {
- LOG.warn("Received a live status update for a non-initialized"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName());
- // FIXME ignore invalid live update and continue for now?
- continue;
- } catch (ServiceComponentNotFoundException e) {
- LOG.warn("Received a live status update for a non-initialized"
- + " servicecomponent"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName());
- // FIXME ignore invalid live update and continue for now?
- continue;
- } catch (ServiceComponentHostNotFoundException e) {
- LOG.warn("Received a live status update for a non-initialized"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName()
- + ", hostname=" + hostname);
- // FIXME ignore invalid live update and continue for now?
- continue;
- } catch (RuntimeException e) {
- LOG.warn("Received a live status with invalid payload"
- + " service"
- + ", clusterName=" + status.getClusterName()
- + ", serviceName=" + status.getServiceName()
- + ", componentName=" + status.getComponentName()
- + ", hostname=" + hostname
- + ", error=" + e.getMessage());
- continue;
- }
- }
- }
- }
- }
-
- /**
- * Updates the version of the given service component, sets the upgrade state (if needed)
- * and publishes a version event through the version event publisher.
- *
- * @param cluster the cluster
- * @param scHost service component host
- * @param newVersion new version of service component
- * @param alwaysPublish if true, always publish a version event; if false,
- * only publish if the component version was updated
- */
- private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
- String newVersion, boolean alwaysPublish) {
-
- boolean updated = false;
-
- if (StringUtils.isNotBlank(newVersion)) {
- final String previousVersion = scHost.getVersion();
- if (!StringUtils.equals(previousVersion, newVersion)) {
- scHost.setVersion(newVersion);
- scHost.setStackVersion(cluster.getDesiredStackVersion());
- if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
- scHost.setUpgradeState(UpgradeState.COMPLETE);
- }
- updated = true;
- }
- }
-
- if (updated || alwaysPublish) {
- HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
- versionEventPublisher.publish(event);
- }
- }
/**
* Adds commands from action queue to a heartbeat response.
@@ -1229,35 +702,4 @@ public class HeartBeatHandler {
}
}
- /**
- * This class is used for mapping json of structured output for component START action.
- */
- private static class ComponentVersionStructuredOut {
- @SerializedName("version")
- private String version;
-
- @SerializedName("upgrade_type")
- private UpgradeType upgradeType = null;
-
- @SerializedName("direction")
- private Direction upgradeDirection = null;
-
- }
-
- /**
- * This class is used for mapping json of structured output for keytab distribution actions.
- */
- private static class WriteKeytabsStructuredOut {
- @SerializedName("keytabs")
- private Map<String,String> keytabs;
-
- public Map<String, String> getKeytabs() {
- return keytabs;
- }
-
- public void setKeytabs(Map<String, String> keytabs) {
- this.keytabs = keytabs;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index efc717d..378e123 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -140,6 +140,10 @@ public class HeartbeatMonitor implements Runnable {
List<Host> allHosts = clusters.getHosts();
long now = System.currentTimeMillis();
for (Host hostObj : allHosts) {
+ if (hostObj.getState() == HostState.HEARTBEAT_LOST) {
+ //do not check if host is already known to be lost
+ continue;
+ }
String host = hostObj.getHostName();
HostState hostState = hostObj.getState();
String hostname = hostObj.getHostName();
@@ -212,6 +216,8 @@ public class HeartbeatMonitor implements Runnable {
switch (sch.getState()) {
case INIT:
case INSTALLING:
+ case STARTING:
+ case STOPPING:
//don't send commands until component is installed at least
continue;
default:
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
new file mode 100644
index 0000000..2188a77
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.annotations.SerializedName;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.ServiceComponentHostNotFoundException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
+import org.apache.ambari.server.ServiceNotFoundException;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
+import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostHealthStatus;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.SecurityState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.UpgradeState;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.stack.upgrade.Direction;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpSucceededEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartedEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEvent;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Processes data received from agents in heartbeats in the background.
+ * <p>
+ * Heartbeats are enqueued with {@link #addHeartbeat(HeartBeat)}. Once the service is started,
+ * {@code poolSize} {@link HeartbeatProcessingTask}s are scheduled at a fixed rate on a dedicated
+ * executor; each run drains the queue and hands every heartbeat to
+ * {@link #processHeartbeat(HeartBeat)}.
+ */
+public class HeartbeatProcessor extends AbstractService {
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatProcessor.class);
+
+  private final ScheduledExecutorService executor;
+
+  private final ConcurrentLinkedQueue<HeartBeat> heartBeatsQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * Cleared by {@link #doStop()} so that a running processing task leaves its drain loop.
+   */
+  private volatile boolean shouldRun = true;
+
+  //TODO rewrite to correlate with heartbeat frequency, hardcoded in agent as of now
+  /** Initial delay, in milliseconds, before the first processing run. */
+  private long delay = 5000;
+  /** Fixed-rate period, in milliseconds, of the processing runs. */
+  private long period = 1000;
+
+  /** Number of executor threads and of scheduled processing tasks. */
+  private int poolSize = 1;
+
+  private final Clusters clusterFsm;
+  private final HeartbeatMonitor heartbeatMonitor;
+  private final Injector injector;
+  private final ActionManager actionManager;
+
+  /**
+   * Publishes {@link AlertEvent} instances.
+   */
+  @Inject
+  AlertEventPublisher alertEventPublisher;
+
+  @Inject
+  AmbariEventPublisher ambariEventPublisher;
+
+  @Inject
+  VersionEventPublisher versionEventPublisher;
+
+  @Inject
+  ActionMetadata actionMetadata;
+
+  @Inject
+  MaintenanceStateHelper maintenanceStateHelper;
+
+  @Inject
+  AmbariMetaInfo ambariMetaInfo;
+
+  @Inject
+  KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
+
+  @Inject
+  Gson gson;
+
+  /**
+   * Creates the processor and its executor. Processing does not begin until the service is started.
+   *
+   * @param clusterFsm       clusters holder used to resolve hosts, clusters and components
+   * @param am               action manager that supplies and updates host role commands
+   * @param heartbeatMonitor monitor whose agent requests are updated from status reports
+   * @param injector         injector used to populate the {@code @Inject} fields of this instance
+   */
+  @Inject
+  public HeartbeatProcessor(Clusters clusterFsm, ActionManager am, HeartbeatMonitor heartbeatMonitor,
+                            Injector injector) {
+    injector.injectMembers(this);
+
+    this.injector = injector;
+    this.heartbeatMonitor = heartbeatMonitor;
+    this.clusterFsm = clusterFsm;
+    actionManager = am;
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("ambari-heartbeat-processor-%d").build();
+    executor = Executors.newScheduledThreadPool(poolSize, threadFactory);
+  }
+
+  /**
+   * Schedules the processing tasks and reports the service as running.
+   * <p>
+   * {@link AbstractService} requires {@code doStart()} to end in {@link #notifyStarted()} or
+   * {@link #notifyFailed(Throwable)}. Otherwise the service stays in the STARTING state.
+   */
+  @Override
+  protected void doStart() {
+    LOG.info("**** Starting heartbeats processing threads ****");
+    try {
+      for (int i = 0; i < poolSize; i++) {
+        executor.scheduleAtFixedRate(new HeartbeatProcessingTask(), delay, period, TimeUnit.MILLISECONDS);
+      }
+      notifyStarted();
+    } catch (RuntimeException e) {
+      notifyFailed(e);
+    }
+  }
+
+  /**
+   * Signals running tasks to stop draining, shuts the executor down without waiting, and reports
+   * the service as stopped, as {@link AbstractService} requires.
+   */
+  @Override
+  protected void doStop() {
+    LOG.info("**** Stopping heartbeats processing threads ****");
+    shouldRun = false;
+    executor.shutdown();
+    notifyStopped();
+  }
+
+  /**
+   * Queues a heartbeat for background processing.
+   *
+   * @param heartBeat heartbeat received from an agent
+   */
+  public void addHeartbeat(HeartBeat heartBeat) {
+    heartBeatsQueue.add(heartBeat);
+  }
+
+  /**
+   * @return the next queued heartbeat, or {@code null} if the queue is empty
+   */
+  private HeartBeat pollHeartbeat() {
+    return heartBeatsQueue.poll();
+  }
+
+  /**
+   * Processing task to be scheduled for execution. Each run drains the queue until it is empty
+   * or the service is stopping.
+   */
+  private class HeartbeatProcessingTask implements Runnable {
+
+    @Override
+    public void run() {
+      while (shouldRun) {
+        try {
+          HeartBeat heartbeat = pollHeartbeat();
+          if (heartbeat == null) {
+            break;
+          }
+          processHeartbeat(heartbeat);
+        } catch (Exception e) {
+          LOG.error("Exception received while processing heartbeat", e);
+        } catch (Throwable throwable) {
+          //catch everything to prevent task suppression: a periodic task that throws is not rescheduled
+          LOG.error("ERROR: ", throwable);
+        }
+      }
+    }
+  }
+
+  /**
+   * Encapsulates logic for processing data from agent heartbeat.
+   *
+   * @param heartbeat Agent heartbeat object
+   * @throws AmbariException if command report, status report or host status processing fails
+   */
+  public void processHeartbeat(HeartBeat heartbeat) throws AmbariException {
+    long now = System.currentTimeMillis();
+
+    processAlerts(heartbeat);
+
+    processCommandReports(heartbeat, now);
+    processStatusReports(heartbeat);
+    //host status calculation are based on task and status reports, should be performed last
+    processHostStatus(heartbeat);
+  }
+
+  /**
+   * Extracts all of the {@link Alert}s from the heartbeat and fires
+   * {@link AlertEvent}s for each one. Alerts without a host name are
+   * attributed to the heartbeat's host.
+   *
+   * @param heartbeat
+   *          the heartbeat to process; {@code null} is ignored.
+   */
+  protected void processAlerts(HeartBeat heartbeat) {
+    if (heartbeat == null) {
+      return;
+    }
+
+    String hostname = heartbeat.getHostname();
+
+    if (null != heartbeat.getAlerts()) {
+      AlertEvent event = new AlertReceivedEvent(heartbeat.getAlerts());
+      for (Alert alert : event.getAlerts()) {
+        if (alert.getHostName() == null) {
+          alert.setHostName(hostname);
+        }
+      }
+      alertEventPublisher.publish(event);
+    }
+  }
+
+  /**
+   * Updates the host health status based on the states of its components.
+   * <p>
+   * Nothing is done when the current health status is UNKNOWN. The status is recalculated only
+   * when the heartbeat carries component statuses or a COMPLETED command report that can change
+   * a component state. A host that belongs to no cluster is always marked HEALTHY.
+   *
+   * @param heartbeat heartbeat to process
+   * @throws AmbariException if the host or cluster cannot be resolved
+   */
+  protected void processHostStatus(HeartBeat heartbeat) throws AmbariException {
+
+    String hostname = heartbeat.getHostname();
+    Host host = clusterFsm.getHost(hostname);
+    HostHealthStatus.HealthStatus healthStatus = host.getHealthStatus().getHealthStatus();
+
+    if (healthStatus == HostHealthStatus.HealthStatus.UNKNOWN) {
+      return;
+    }
+
+    List<ComponentStatus> componentStatuses = heartbeat.getComponentStatus();
+    //Host status info could be calculated only if agent returned statuses in heartbeat
+    //Or, if a command is executed that can change component status
+    boolean calculateHostStatus = false;
+    String clusterName = null;
+    if (!componentStatuses.isEmpty()) {
+      calculateHostStatus = true;
+      clusterName = componentStatuses.get(0).getClusterName();
+    }
+
+    if (!calculateHostStatus) {
+      for (CommandReport report : heartbeat.getReports()) {
+        if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
+          continue;
+        }
+
+        String service = report.getServiceName();
+        // A report without a service cannot be matched to a component; skip it instead of
+        // failing with a NullPointerException on the action lookup below.
+        if (service == null) {
+          continue;
+        }
+        if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
+          continue;
+        }
+        if (report.getStatus().equals("COMPLETED")) {
+          calculateHostStatus = true;
+          clusterName = report.getClusterName();
+          break;
+        }
+      }
+    }
+
+    if (calculateHostStatus) {
+      //Use actual component status to compute the host status
+      int masterCount = 0;
+      int mastersRunning = 0;
+      int slaveCount = 0;
+      int slavesRunning = 0;
+
+      Cluster cluster = clusterFsm.getCluster(clusterName);
+      StackId stackId = cluster.getDesiredStackVersion();
+
+      List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(heartbeat.getHostname());
+      for (ServiceComponentHost scHost : scHosts) {
+        ComponentInfo componentInfo =
+            ambariMetaInfo.getComponent(stackId.getStackName(),
+                stackId.getStackVersion(), scHost.getServiceName(),
+                scHost.getServiceComponentName());
+
+        // The simple name State would resolve to Guava's Service.State inherited from
+        // AbstractService, hence the fully qualified name.
+        boolean started = scHost.getState() == org.apache.ambari.server.state.State.STARTED;
+
+        String category = componentInfo.getCategory();
+
+        // Components in maintenance mode do not affect the host health.
+        if (MaintenanceState.OFF == maintenanceStateHelper.getEffectiveState(scHost, host)) {
+          if ("MASTER".equals(category)) {
+            ++masterCount;
+            if (started) {
+              ++mastersRunning;
+            }
+          } else if ("SLAVE".equals(category)) {
+            ++slaveCount;
+            if (started) {
+              ++slavesRunning;
+            }
+          }
+        }
+      }
+
+      if (masterCount == mastersRunning && slaveCount == slavesRunning) {
+        healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
+      } else if (masterCount > 0 && mastersRunning < masterCount) {
+        healthStatus = HostHealthStatus.HealthStatus.UNHEALTHY;
+      } else {
+        healthStatus = HostHealthStatus.HealthStatus.ALERT;
+      }
+
+      host.setStatus(healthStatus.name());
+      host.persist();
+    }
+
+    //If host doesn't belong to any cluster
+    if ((clusterFsm.getClustersForHost(host.getHostName())).size() == 0) {
+      healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
+      host.setStatus(healthStatus.name());
+      host.persist();
+    }
+  }
+
+  /**
+   * Process reports of tasks executed on agents.
+   * <p>
+   * Publishes final-report events for actions, records Kerberos keytab distribution, updates
+   * component versions, configs and state machines, and finally hands all reports to the
+   * action manager.
+   *
+   * @param heartbeat heartbeat to process
+   * @param now cached current time
+   * @throws AmbariException if a report has no service name or refers to an unknown cluster
+   */
+  protected void processCommandReports(
+      HeartBeat heartbeat, long now)
+      throws AmbariException {
+    String hostname = heartbeat.getHostname();
+    List<CommandReport> reports = heartbeat.getReports();
+
+    // Cache HostRoleCommand entities because we will need them few times
+    List<Long> taskIds = new ArrayList<Long>();
+    for (CommandReport report : reports) {
+      taskIds.add(report.getTaskId());
+    }
+    Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
+
+    // Match commands to reports by task ID rather than by position. Nothing here guarantees that
+    // the fetched collection follows the report order or holds exactly one entry per report
+    // (duplicate or unknown task IDs).
+    Map<Long, HostRoleCommand> commandsByTaskId = new HashMap<>();
+    for (HostRoleCommand command : commands) {
+      commandsByTaskId.put(command.getTaskId(), command);
+    }
+
+    for (CommandReport report : reports) {
+
+      Long clusterId = null;
+      if (report.getClusterName() != null) {
+        try {
+          Cluster cluster = clusterFsm.getCluster(report.getClusterName());
+          clusterId = cluster.getClusterId();
+        } catch (AmbariException e) {
+          // Best effort: the action event below is still published, with a null cluster ID.
+          LOG.debug("Unable to resolve cluster {} for command report", report.getClusterName(), e);
+        }
+      }
+
+      LOG.debug("Received command report: {}", report);
+      Host host = clusterFsm.getHost(hostname);
+      if (host == null) {
+        LOG.error("Received a command report and was unable to retrieve Host for hostname = " + hostname);
+        continue;
+      }
+
+      // Send event for final command reports for actions
+      if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
+          HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
+        ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
+            clusterId, hostname, report, false);
+        ambariEventPublisher.publish(event);
+      }
+
+      // Fetch HostRoleCommand that corresponds to a given task ID
+      HostRoleCommand hostRoleCommand = commandsByTaskId.get(report.getTaskId());
+      if (hostRoleCommand == null) {
+        LOG.warn("Can't fetch HostRoleCommand with taskId = " + report.getTaskId());
+        continue;
+      }
+
+      // Skip sending events for command reports for ABORTed commands
+      if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
+        continue;
+      }
+      if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
+          report.getStatus().equals("IN_PROGRESS")) {
+        hostRoleCommand.setStartTime(now);
+      }
+
+      // If the report indicates the keytab file was successfully transferred to a host or removed
+      // from a host, record this for future reference
+      if (Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
+          Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
+          RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
+          RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus())) {
+        updateKerberosPrincipalHosts(report, host);
+      }
+
+      //pass custom START, STOP and RESTART
+      if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
+          (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+          !("RESTART".equals(report.getCustomCommand()) ||
+          "START".equals(report.getCustomCommand()) ||
+          "STOP".equals(report.getCustomCommand())))) {
+        continue;
+      }
+
+      Cluster cl = clusterFsm.getCluster(report.getClusterName());
+      String service = report.getServiceName();
+      if (service == null || service.isEmpty()) {
+        throw new AmbariException("Invalid command report, service: " + service);
+      }
+      if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
+        LOG.debug("{} is an action - skip component lookup", report.getRole());
+      } else {
+        try {
+          Service svc = cl.getService(service);
+          ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
+          ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
+          String schName = scHost.getServiceComponentName();
+
+          if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {
+
+            // Reading component version if it is present
+            if (StringUtils.isNotBlank(report.getStructuredOut())) {
+              ComponentVersionStructuredOut structuredOutput = null;
+              try {
+                structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
+              } catch (JsonSyntaxException ex) {
+                //Json structure for component version was incorrect
+                //do nothing, pass this data further for processing
+              }
+
+              String newVersion = structuredOutput == null ? null : structuredOutput.version;
+
+              // Pass true to always publish a version event. It is safer to recalculate the version even if we don't
+              // detect a difference in the value. This is useful in case that a manual database edit is done while
+              // ambari-server is stopped.
+              handleComponentVersionReceived(cl, scHost, newVersion, true);
+            }
+
+            // Updating stack version, if needed (this is not actually for express/rolling upgrades!)
+            if (scHost.getState().equals(org.apache.ambari.server.state.State.UPGRADING)) {
+              scHost.setStackVersion(scHost.getDesiredStackVersion());
+            } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
+                (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
+                ("START".equals(report.getCustomCommand()) ||
+                "RESTART".equals(report.getCustomCommand()))))
+                && null != report.getConfigurationTags()
+                && !report.getConfigurationTags().isEmpty()) {
+              LOG.info("Updating applied config on service " + scHost.getServiceName() +
+                  ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
+              scHost.updateActualConfigs(report.getConfigurationTags());
+              scHost.setRestartRequired(false);
+            }
+            // Necessary for resetting clients stale configs after starting service
+            if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()){
+              scHost.updateActualConfigs(report.getConfigurationTags());
+              scHost.setRestartRequired(false);
+            }
+            if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                !("START".equals(report.getCustomCommand()) ||
+                "STOP".equals(report.getCustomCommand()))) {
+              //do not affect states for custom commands except START and STOP
+              //lets status commands to be responsible for this
+              continue;
+            }
+
+            if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                "START".equals(report.getCustomCommand()))) {
+              scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
+                  hostname, now));
+              scHost.setRestartRequired(false);
+            } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                "STOP".equals(report.getCustomCommand()))) {
+              scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
+                  hostname, now));
+            } else {
+              scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
+                  hostname, now));
+            }
+          } else if (report.getStatus().equals("FAILED")) {
+
+            if (StringUtils.isNotBlank(report.getStructuredOut())) {
+              try {
+                ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
+
+                // fromJson may yield null (e.g. a JSON "null" document); an NPE here would escape
+                // this method and skip processTaskResponse for the whole heartbeat.
+                if (null != structuredOutput && null != structuredOutput.upgradeDirection
+                    && structuredOutput.upgradeDirection.isUpgrade()) {
+                  scHost.setUpgradeState(UpgradeState.FAILED);
+                }
+              } catch (JsonSyntaxException ex) {
+                LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
+              }
+            }
+
+            LOG.warn("Operation failed - may be retried. Service component host: "
+                + schName + ", host: " + hostname + " Action id" + report.getActionId());
+            if (actionManager.isInProgressCommand(report)) {
+              scHost.handleEvent(new ServiceComponentHostOpFailedEvent
+                  (schName, hostname, now));
+            } else {
+              LOG.info("Received report for a command that is no longer active. " + report);
+            }
+          } else if (report.getStatus().equals("IN_PROGRESS")) {
+            scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
+                hostname, now));
+          }
+        } catch (ServiceComponentNotFoundException scnex) {
+          LOG.warn("Service component not found ", scnex);
+        } catch (InvalidStateTransitionException ex) {
+          if (LOG.isDebugEnabled()) {
+            LOG.warn("State machine exception.", ex);
+          } else {
+            LOG.warn("State machine exception. " + ex.getMessage());
+          }
+        }
+      }
+    }
+
+    //Update state machines from reports
+    actionManager.processTaskResponse(hostname, reports, commands);
+  }
+
+  /**
+   * Records (SET_KEYTAB) or forgets (REMOVE_KEYTAB) the principal-to-host mappings listed in the
+   * structured output of a completed KERBEROS_CLIENT custom command. Reports with any other
+   * custom command, or with unparseable/empty structured output, are ignored.
+   *
+   * @param report completed KERBEROS_CLIENT custom command report
+   * @param host   host the report was received from
+   */
+  private void updateKerberosPrincipalHosts(CommandReport report, Host host) {
+    String customCommand = report.getCustomCommand();
+
+    boolean adding = "SET_KEYTAB".equalsIgnoreCase(customCommand);
+    if (!adding && !"REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
+      return;
+    }
+
+    WriteKeytabsStructuredOut writeKeytabsStructuredOut;
+    try {
+      writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
+    } catch (JsonSyntaxException ex) {
+      //Json structure was incorrect do nothing, pass this data further for processing
+      writeKeytabsStructuredOut = null;
+    }
+
+    if (writeKeytabsStructuredOut == null) {
+      return;
+    }
+    Map<String, String> keytabs = writeKeytabsStructuredOut.getKeytabs();
+    if (keytabs == null) {
+      return;
+    }
+
+    for (Map.Entry<String, String> entry : keytabs.entrySet()) {
+      String principal = entry.getKey();
+      boolean exists = kerberosPrincipalHostDAO.exists(principal, host.getHostId());
+      if (adding) {
+        if (!exists) {
+          kerberosPrincipalHostDAO.create(principal, host.getHostId());
+        }
+      } else if (exists && "_REMOVED_".equalsIgnoreCase(entry.getValue())) {
+        // Only a mapping that is actually recorded can be removed.
+        kerberosPrincipalHostDAO.remove(principal, host.getHostId());
+      }
+    }
+  }
+
+  /**
+   * Process reports of status commands: syncs live component state, security state, stack
+   * version, actual configs, processes and version of each reported component on this host.
+   * Statuses for unknown services/components or with invalid payloads are logged and skipped.
+   *
+   * @param heartbeat heartbeat to process
+   * @throws AmbariException if the clusters of the host cannot be resolved
+   */
+  protected void processStatusReports(HeartBeat heartbeat) throws AmbariException {
+    String hostname = heartbeat.getHostname();
+    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
+    for (Cluster cl : clusters) {
+      for (ComponentStatus status : heartbeat.getComponentStatus()) {
+        if (status.getClusterName().equals(cl.getClusterName())) {
+          try {
+            Service svc = cl.getService(status.getServiceName());
+
+            String componentName = status.getComponentName();
+            if (svc.getServiceComponents().containsKey(componentName)) {
+              ServiceComponent svcComp = svc.getServiceComponent(componentName);
+              ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
+              org.apache.ambari.server.state.State prevState = scHost.getState();
+              org.apache.ambari.server.state.State liveState =
+                  org.apache.ambari.server.state.State.valueOf(org.apache.ambari.server.state.State.class,
+                      status.getStatus());
+              if (prevState.equals(org.apache.ambari.server.state.State.INSTALLED)
+                  || prevState.equals(org.apache.ambari.server.state.State.STARTED)
+                  || prevState.equals(org.apache.ambari.server.state.State.STARTING)
+                  || prevState.equals(org.apache.ambari.server.state.State.STOPPING)
+                  || prevState.equals(org.apache.ambari.server.state.State.UNKNOWN)) {
+                scHost.setState(liveState); //TODO direct status set breaks state machine sometimes !!!
+                if (!prevState.equals(liveState)) {
+                  LOG.info("State of service component " + componentName
+                      + " of service " + status.getServiceName()
+                      + " of cluster " + status.getClusterName()
+                      + " has changed from " + prevState + " to " + liveState
+                      + " at host " + hostname);
+                }
+              }
+
+              SecurityState prevSecurityState = scHost.getSecurityState();
+              SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState());
+              if (prevSecurityState != currentSecurityState) {
+                if (prevSecurityState.isEndpoint()) {
+                  scHost.setSecurityState(currentSecurityState);
+                  LOG.info(String.format("Security of service component %s of service %s of cluster %s " +
+                          "has changed from %s to %s on host %s",
+                      componentName, status.getServiceName(), status.getClusterName(), prevSecurityState,
+                      currentSecurityState, hostname));
+                } else if (LOG.isDebugEnabled()) {
+                  LOG.debug(String.format("Security of service component %s of service %s of cluster %s " +
+                          "has changed from %s to %s on host %s but will be ignored since %s is a " +
+                          "transitional state",
+                      componentName, status.getServiceName(), status.getClusterName(),
+                      prevSecurityState, currentSecurityState, hostname, prevSecurityState));
+                }
+              }
+
+              if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) {
+                scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
+              }
+
+              if (null != status.getConfigTags()) {
+                scHost.updateActualConfigs(status.getConfigTags());
+              }
+
+              Map<String, Object> extra = status.getExtra();
+              if (null != extra && !extra.isEmpty()) {
+                try {
+                  if (extra.containsKey("processes")) {
+                    @SuppressWarnings("unchecked")
+                    List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
+                    scHost.setProcesses(list);
+                  }
+                  if (extra.containsKey("version")) {
+                    String version = extra.get("version").toString();
+
+                    handleComponentVersionReceived(cl, scHost, version, false);
+                  }
+
+                } catch (Exception e) {
+                  LOG.error("Could not access extra JSON for " +
+                      scHost.getServiceComponentName() + " from " +
+                      scHost.getHostName() + ": " + status.getExtra() +
+                      " (" + e.getMessage() + ")");
+                }
+              }
+
+              this.heartbeatMonitor.getAgentRequests()
+                  .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
+            } else {
+              // TODO: What should be done otherwise?
+            }
+          } catch (ServiceNotFoundException e) {
+            LOG.warn("Received a live status update for a non-initialized"
+                + " service"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName());
+            // FIXME ignore invalid live update and continue for now?
+          } catch (ServiceComponentNotFoundException e) {
+            LOG.warn("Received a live status update for a non-initialized"
+                + " servicecomponent"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName()
+                + ", componentName=" + status.getComponentName());
+            // FIXME ignore invalid live update and continue for now?
+          } catch (ServiceComponentHostNotFoundException e) {
+            LOG.warn("Received a live status update for a non-initialized"
+                + " service"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName()
+                + ", componentName=" + status.getComponentName()
+                + ", hostname=" + hostname);
+            // FIXME ignore invalid live update and continue for now?
+          } catch (RuntimeException e) {
+            LOG.warn("Received a live status with invalid payload"
+                + " service"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName()
+                + ", componentName=" + status.getComponentName()
+                + ", hostname=" + hostname
+                + ", error=" + e.getMessage());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates the version of the given service component, sets the upgrade state (if needed)
+   * and publishes a version event through the version event publisher.
+   *
+   * @param cluster the cluster
+   * @param scHost service component host
+   * @param newVersion new version of service component
+   * @param alwaysPublish if true, always publish a version event; if false,
+   *                      only publish if the component version was updated
+   */
+  private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
+                                              String newVersion, boolean alwaysPublish) {
+
+    boolean updated = false;
+
+    if (StringUtils.isNotBlank(newVersion)) {
+      final String previousVersion = scHost.getVersion();
+      if (!StringUtils.equals(previousVersion, newVersion)) {
+        scHost.setVersion(newVersion);
+        scHost.setStackVersion(cluster.getDesiredStackVersion());
+        if (previousVersion != null && !previousVersion.equalsIgnoreCase(
+            org.apache.ambari.server.state.State.UNKNOWN.toString())) {
+          scHost.setUpgradeState(UpgradeState.COMPLETE);
+        }
+        updated = true;
+      }
+    }
+
+    if (updated || alwaysPublish) {
+      HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
+      versionEventPublisher.publish(event);
+    }
+  }
+
+  /**
+   * This class is used for mapping json of structured output for keytab distribution actions.
+   */
+  private static class WriteKeytabsStructuredOut {
+    @SerializedName("keytabs")
+    private Map<String,String> keytabs;
+
+    public Map<String, String> getKeytabs() {
+      return keytabs;
+    }
+
+    public void setKeytabs(Map<String, String> keytabs) {
+      this.keytabs = keytabs;
+    }
+  }
+
+  /**
+   * This class is used for mapping json of structured output for component START action.
+   */
+  private static class ComponentVersionStructuredOut {
+    @SerializedName("version")
+    private String version;
+
+    @SerializedName("upgrade_type")
+    private UpgradeType upgradeType = null;
+
+    @SerializedName("direction")
+    private Direction upgradeDirection = null;
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
index 040876a..e28f9ef 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
@@ -131,8 +131,8 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
}
/**
- * Retrieve all of the host versions for the given cluster name, host name, and state.
- *
+ * Retrieve all of the host versions for the given cluster name, host name, and state. <br/>
+ * Consider using faster method: {@link HostVersionDAO#findByClusterHostAndState(long, long, org.apache.ambari.server.state.RepositoryVersionState)}
* @param clusterName Cluster name
* @param hostName FQDN of host
* @param state repository version state
@@ -150,8 +150,29 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
}
/**
+ * Faster version of {@link HostVersionDAO#findByClusterHostAndState(java.lang.String, java.lang.String, org.apache.ambari.server.state.RepositoryVersionState)}
+ *
+ * @param clusterId Cluster ID
+ * @param hostId Host ID
+ * @param state repository version state
+ * @return Return all of the host versions that match the criteria.
+ */
+  @RequiresSession
+  public List<HostVersionEntity> findByClusterHostAndState(long clusterId, long hostId, RepositoryVersionState state) {
+    // ID-keyed named query; the String overload of this method looks up by cluster and host name.
+    TypedQuery<HostVersionEntity> namedQuery = entityManagerProvider.get()
+        .createNamedQuery("hostVersionByClusterHostIdAndState", HostVersionEntity.class)
+        .setParameter("clusterId", clusterId)
+        .setParameter("hostId", hostId)
+        .setParameter("state", state);
+    return daoUtils.selectList(namedQuery);
+  }
+
+ /**
* Retrieve the single host version whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, of which there should be exactly one at all times
* for the given host.
+ * Consider using faster method {@link HostVersionDAO#findByHostAndStateCurrent(long, long)}
*
* @param clusterName Cluster name
* @param hostName Host name
@@ -175,8 +196,36 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
}
/**
+ * Retrieve the single host version whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, of which there should be exactly one at all times
+ * for the given host.
+ * Faster version of {@link HostVersionDAO#findByHostAndStateCurrent(java.lang.String, java.lang.String)}
+ * @param clusterId Cluster ID
+ * @param hostId host ID
+ * @return Returns the single host version for this host whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, or {@code null} otherwise.
+ */
+  @RequiresSession
+  public HostVersionEntity findByHostAndStateCurrent(long clusterId, long hostId) {
+    try {
+      List<HostVersionEntity> results =
+          findByClusterHostAndState(clusterId, hostId, RepositoryVersionState.CURRENT);
+      if (results.isEmpty()) {
+        return null;
+      }
+      if (results.size() == 1) {
+        return results.get(0);
+      }
+      // At most one host version per host may be CURRENT; report the violation with context.
+      throw new NonUniqueResultException("Found " + results.size()
+          + " CURRENT host versions for clusterId=" + clusterId + ", hostId=" + hostId);
+    } catch (NoResultException ignored) {
+      // Defensive: treat "no result" the same as an empty result list.
+      return null;
+    }
+  }
+
+ /**
* Retrieve the single host version for the given cluster, stack name, stack
- * version, and host name.
+ * version, and host name. <br/>
+ * This query is slow and not suitable for frequent use. <br/>
+ * Please, use {@link HostVersionDAO#findByClusterStackVersionAndHost(long, org.apache.ambari.server.state.StackId, java.lang.String, long)} <br/>
+ * It is ~50 times faster
*
* @param clusterName
* Cluster name
@@ -203,6 +252,29 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
return daoUtils.selectSingle(query);
}
+  /**
+   * Looks up the host version for the given cluster, stack, repository version and host, where the
+   * cluster and host are identified by their numeric IDs. This is the optimized counterpart of
+   * {@link HostVersionDAO#findByClusterStackVersionAndHost(java.lang.String, org.apache.ambari.server.state.StackId, java.lang.String, java.lang.String)}.
+   *
+   * @param clusterId Id of cluster
+   * @param stackId Stack ID (e.g., HDP-2.2); supplies both the stack name and the stack version
+   * @param version Repository version string (e.g., 2.2.0.1-995)
+   * @param hostId Host Id
+   * @return Returns the single host version that matches the criteria.
+   */
+  @RequiresSession
+  public HostVersionEntity findByClusterStackVersionAndHost(long clusterId, StackId stackId, String version,
+      long hostId) {
+    TypedQuery<HostVersionEntity> namedQuery = entityManagerProvider.get().createNamedQuery(
+        "hostVersionByClusterStackVersionAndHostId", HostVersionEntity.class);
+
+    // Bind the identity parameters first, then the stack/repository-version filters.
+    namedQuery.setParameter("hostId", hostId);
+    namedQuery.setParameter("clusterId", clusterId);
+    namedQuery.setParameter("stackName", stackId.getStackName());
+    namedQuery.setParameter("stackVersion", stackId.getStackVersion());
+    namedQuery.setParameter("version", version);
+
+    return daoUtils.selectSingle(namedQuery);
+  }
+
@Transactional
public void removeByHostName(String hostName) {
Collection<HostVersionEntity> hostVersions = this.findByHost(hostName);
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
index b69518b..6be4b50 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
@@ -62,6 +62,15 @@ import org.apache.ambari.server.state.RepositoryVersionState;
"SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
"WHERE clusters.clusterName=:clusterName AND hostVersion.repositoryVersion.stack.stackName=:stackName AND hostVersion.repositoryVersion.stack.stackVersion=:stackVersion AND hostVersion.repositoryVersion.version=:version AND " +
"hostVersion.hostEntity.hostName=:hostName"),
+
+ @NamedQuery(name = "hostVersionByClusterHostIdAndState", query =
+ "SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
+ "WHERE clusters.clusterId=:clusterId AND hostVersion.hostId=:hostId AND hostVersion.state=:state"),
+
+ @NamedQuery(name = "hostVersionByClusterStackVersionAndHostId", query =
+ "SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
+ "WHERE hostVersion.hostId=:hostId AND clusters.clusterId=:clusterId AND hostVersion.repositoryVersion.stack.stackName=:stackName " +
+ "AND hostVersion.repositoryVersion.stack.stackVersion=:stackVersion AND hostVersion.repositoryVersion.version=:version")
})
public class HostVersionEntity {
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 4212975..c6d01e8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -1560,8 +1560,8 @@ public class ClusterImpl implements Cluster {
StackId repoVersionStackId = new StackId(repoVersionStackEntity);
HostVersionEntity hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(
- getClusterName(), repoVersionStackId, repositoryVersion.getVersion(),
- host.getHostName());
+ getClusterId(), repoVersionStackId, repositoryVersion.getVersion(),
+ host.getHostId());
hostTransitionStateWriteLock.lock();
try {
@@ -1576,7 +1576,7 @@ public class ClusterImpl implements Cluster {
hostVersionDAO.create(hostVersionEntity);
}
- HostVersionEntity currentVersionEntity = hostVersionDAO.findByHostAndStateCurrent(getClusterName(), host.getHostName());
+ HostVersionEntity currentVersionEntity = hostVersionDAO.findByHostAndStateCurrent(getClusterId(), host.getHostId());
boolean isCurrentPresent = (currentVersionEntity != null);
final ServiceComponentHostSummary hostSummary = new ServiceComponentHostSummary(ambariMetaInfo, host, stack);
http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index bfb6214..1bd60a8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -1319,43 +1319,48 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
+ /**
+ * Builds a {@link ServiceComponentHostResponse} describing this host component: cluster, service,
+ * component and host names, current and desired state, current and desired stack IDs, admin state,
+ * upgrade state, actual configs and the stale-config flag.
+ *
+ * @return the response, or {@code null} if this component's state entity could not be found
+ *         (possibly because the host was deleted)
+ */
 @Override
 public ServiceComponentHostResponse convertToResponse() {
- readLock.lock();
+ // Acquire the cluster-wide read lock before this component's read lock; the nested finally
+ // blocks below release them in the reverse order.
+ clusterGlobalLock.readLock().lock();
 try {
- HostComponentStateEntity hostComponentStateEntity = getStateEntity();
- if (null == hostComponentStateEntity) {
- LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted.");
- return null;
- }
+ readLock.lock();
+ try {
+ HostComponentStateEntity hostComponentStateEntity = getStateEntity();
+ // Without a state entity there is nothing to report; warn and return null.
+ if (null == hostComponentStateEntity) {
+ LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted.");
+ return null;
+ }
- String clusterName = serviceComponent.getClusterName();
- String serviceName = serviceComponent.getServiceName();
- String serviceComponentName = serviceComponent.getName();
- String hostName = getHostName();
- String state = getState().toString();
- String stackId = getStackVersion().getStackId();
- String desiredState = getDesiredState().toString();
- String desiredStackId = getDesiredStackVersion().getStackId();
- HostComponentAdminState componentAdminState = getComponentAdminState();
- UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
-
- ServiceComponentHostResponse r = new ServiceComponentHostResponse(
- clusterName, serviceName,
- serviceComponentName, hostName, state,
- stackId, desiredState,
- desiredStackId, componentAdminState);
-
- r.setActualConfigs(actualConfigs);
- r.setUpgradeState(upgradeState);
+ String clusterName = serviceComponent.getClusterName();
+ String serviceName = serviceComponent.getServiceName();
+ String serviceComponentName = serviceComponent.getName();
+ String hostName = getHostName();
+ String state = getState().toString();
+ String stackId = getStackVersion().getStackId();
+ String desiredState = getDesiredState().toString();
+ String desiredStackId = getDesiredStackVersion().getStackId();
+ HostComponentAdminState componentAdminState = getComponentAdminState();
+ UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
+
+ ServiceComponentHostResponse r = new ServiceComponentHostResponse(
+ clusterName, serviceName,
+ serviceComponentName, hostName, state,
+ stackId, desiredState,
+ desiredStackId, componentAdminState);
+
+ r.setActualConfigs(actualConfigs);
+ r.setUpgradeState(upgradeState);
- try {
- r.setStaleConfig(helper.isStaleConfigs(this));
- } catch (Exception e) {
- LOG.error("Could not determine stale config", e);
- }
+ // Stale-config detection is best-effort: a failure is logged and the response is still returned.
+ try {
+ r.setStaleConfig(helper.isStaleConfigs(this));
+ } catch (Exception e) {
+ LOG.error("Could not determine stale config", e);
+ }
- return r;
+ return r;
+ } finally {
+ readLock.unlock();
+ }
 } finally {
- readLock.unlock();
+ clusterGlobalLock.readLock().unlock();
 }
 }
@@ -1797,6 +1802,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
}
final String hostName = getHostName();
+ final long hostId = getHost().getHostId();
final Set<Cluster> clustersForHost = clusters.getClustersForHost(hostName);
if (clustersForHost.size() != 1) {
throw new AmbariException("Host " + hostName + " should be assigned only to one cluster");
@@ -1815,7 +1821,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
repositoryVersion = createRepositoryVersion(version, stackId, stackInfo);
}
- final HostEntity host = hostDAO.findByName(hostName);
+ final HostEntity host = hostDAO.findById(hostId);
cluster.transitionHostVersionState(host, repositoryVersion, stackId);
} finally {
writeLock.unlock();