You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2018/08/21 17:38:20 UTC
[ambari] branch branch-2.7 updated: AMBARI-24518. Requests STOMP
topic sent updates for host check request. (#2142)
This is an automated email from the ASF dual-hosted git repository.
jluniya pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new ec968ed AMBARI-24518. Requests STOMP topic sent updates for host check request. (#2142)
ec968ed is described below
commit ec968edf78a11921d0555e3e481c0ce9763d80f1
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Tue Aug 21 20:38:18 2018 +0300
AMBARI-24518. Requests STOMP topic sent updates for host check request. (#2142)
* AMBARI-24518. Requests STOMP topic sent updates for host check request. (mpapirkovskyy)
* AMBARI-24518. Requests STOMP topic sent updates for host check request. (mpapirkovskyy)
---
.../server/actionmanager/ActionDBAccessorImpl.java | 14 ++++++-
.../events/listeners/tasks/TaskStatusListener.java | 46 ++++++++++++++++------
2 files changed, 47 insertions(+), 13 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 3543486..1a055b3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -436,8 +436,18 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
taskEventPublisher.publish(taskCreateEvent);
List<HostRoleCommandEntity> hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(requestEntity.getRequestId());
- STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
- hostRoleCommandDAO, topologyManager, clusterName, hostRoleCommandEntities));
+
+ // The "requests" STOMP topic is used for cluster-related requests only.
+ // Requests without a cluster (like host checks) should be posted to a separate topic.
+ if (clusterName != null) {
+ STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
+ hostRoleCommandDAO, topologyManager, clusterName, hostRoleCommandEntities));
+ } else {
+ LOG.debug("No STOMP request update event was fired for new request due no cluster related, " +
+ "request id: {}, command name: {}",
+ requestEntity.getRequestId(),
+ requestEntity.getCommandName());
+ }
}
@Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 0570fdf..b188729 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ambari.server.ClusterNotFoundException;
import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
@@ -123,7 +122,7 @@ public class TaskStatusListener {
* @param event Consumes {@link TaskUpdateEvent}.
*/
@Subscribe
- public void onTaskUpdateEvent(TaskUpdateEvent event) throws ClusterNotFoundException {
+ public void onTaskUpdateEvent(TaskUpdateEvent event) {
LOG.debug("Received task update event {}", event);
List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
List<HostRoleCommand> hostRoleCommandWithReceivedStatus = new ArrayList<>();
@@ -145,13 +144,27 @@ public class TaskStatusListener {
requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) {
- Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>();
- hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
- hostRoleCommand.getRequestId(),
- hostRoleCommand.getStatus(),
- hostRoleCommand.getHostName()));
- requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(),
- activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands));
+ // Ignore requests not related to any cluster. "requests" topic is used for cluster requests only.
+ Long clusterId = activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId();
+ if (clusterId != null && clusterId != -1) {
+ Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>();
+ hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
+ hostRoleCommand.getRequestId(),
+ hostRoleCommand.getStatus(),
+ hostRoleCommand.getHostName()));
+ requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(),
+ activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands));
+ } else {
+ LOG.debug("No STOMP request update event was fired for host component status change due no cluster related, " +
+ "request id: {}, role: {}, role command: {}, host: {}, task id: {}, old state: {}, new state: {}",
+ hostRoleCommand.getRequestId(),
+ hostRoleCommand.getRole(),
+ hostRoleCommand.getRoleCommand(),
+ hostRoleCommand.getHostName(),
+ hostRoleCommand.getTaskId(),
+ activeTasksMap.get(reportedTaskId).getStatus(),
+ hostRoleCommand.getStatus());
+ }
}
}
}
@@ -264,7 +277,8 @@ public class TaskStatusListener {
// Request entity of the hostrolecommand should be persisted before publishing task create event
assert requestEntity != null;
Set<StageEntityPK> stageEntityPKs = Sets.newHashSet(stageEntityPK);
- ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), stageEntityPKs);
+ ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(),
+ stageEntityPKs, requestEntity.getClusterId());
activeRequestMap.put(requestId, request);
}
}
@@ -524,11 +538,14 @@ public class TaskStatusListener {
private HostRoleStatus status;
private HostRoleStatus displayStatus;
private Set <StageEntityPK> stageEntityPks;
+ private Long clusterId;
- public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
+ public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks,
+ Long clusterId) {
this.status = status;
this.displayStatus = displayStatus;
this.stageEntityPks = stageEntityPks;
+ this.clusterId = clusterId;
}
public HostRoleStatus getStatus() {
@@ -559,6 +576,13 @@ public class TaskStatusListener {
stageEntityPks.add(stageEntityPK);
}
+ public Long getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(Long clusterId) {
+ this.clusterId = clusterId;
+ }
}
/**