You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2018/08/21 17:38:20 UTC

[ambari] branch branch-2.7 updated: AMBARI-24518. Requests STOMP topic sent updates for host check request. (#2142)

This is an automated email from the ASF dual-hosted git repository.

jluniya pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new ec968ed  AMBARI-24518. Requests STOMP topic sent updates for host check request. (#2142)
ec968ed is described below

commit ec968edf78a11921d0555e3e481c0ce9763d80f1
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Tue Aug 21 20:38:18 2018 +0300

    AMBARI-24518. Requests STOMP topic sent updates for host check request. (#2142)
    
    * AMBARI-24518. Requests STOMP topic sent updates for host check request. (mpapirkovskyy)
    
    * AMBARI-24518. Requests STOMP topic sent updates for host check request. (mpapirkovskyy)
---
 .../server/actionmanager/ActionDBAccessorImpl.java | 14 ++++++-
 .../events/listeners/tasks/TaskStatusListener.java | 46 ++++++++++++++++------
 2 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 3543486..1a055b3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -436,8 +436,18 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
     taskEventPublisher.publish(taskCreateEvent);
     List<HostRoleCommandEntity> hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(requestEntity.getRequestId());
-    STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
-        hostRoleCommandDAO, topologyManager, clusterName, hostRoleCommandEntities));
+
+    // The "requests" STOMP topic is used for cluster-related requests only.
+    // Requests without a cluster (like host checks) should be posted to a separate topic.
+    if (clusterName != null) {
+      STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
+          hostRoleCommandDAO, topologyManager, clusterName, hostRoleCommandEntities));
+    } else {
+      LOG.debug("No STOMP request update event was fired for new request due no cluster related, " +
+              "request id: {}, command name: {}",
+          requestEntity.getRequestId(),
+          requestEntity.getCommandName());
+    }
   }
 
   @Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 0570fdf..b188729 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.ambari.server.ClusterNotFoundException;
 import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
@@ -123,7 +122,7 @@ public class TaskStatusListener {
    * @param event Consumes {@link TaskUpdateEvent}.
    */
   @Subscribe
-  public void onTaskUpdateEvent(TaskUpdateEvent event) throws ClusterNotFoundException {
+  public void onTaskUpdateEvent(TaskUpdateEvent event) {
     LOG.debug("Received task update event {}", event);
     List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
     List<HostRoleCommand>  hostRoleCommandWithReceivedStatus =  new ArrayList<>();
@@ -145,13 +144,27 @@ public class TaskStatusListener {
         requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
 
         if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) {
-          Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>();
-          hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
-              hostRoleCommand.getRequestId(),
-              hostRoleCommand.getStatus(),
-              hostRoleCommand.getHostName()));
-          requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(),
-              activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands));
+          // Ignore requests not related to any cluster. The "requests" topic is used for cluster requests only.
+          Long clusterId = activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId();
+          if (clusterId != null && clusterId != -1) {
+            Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>();
+            hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
+                hostRoleCommand.getRequestId(),
+                hostRoleCommand.getStatus(),
+                hostRoleCommand.getHostName()));
+            requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(),
+                activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands));
+          } else {
+            LOG.debug("No STOMP request update event was fired for host component status change due no cluster related, " +
+                    "request id: {}, role: {}, role command: {}, host: {}, task id: {}, old state: {}, new state: {}",
+                hostRoleCommand.getRequestId(),
+                hostRoleCommand.getRole(),
+                hostRoleCommand.getRoleCommand(),
+                hostRoleCommand.getHostName(),
+                hostRoleCommand.getTaskId(),
+                activeTasksMap.get(reportedTaskId).getStatus(),
+                hostRoleCommand.getStatus());
+          }
         }
       }
     }
@@ -264,7 +277,8 @@ public class TaskStatusListener {
       // Request entity of the hostrolecommand should be persisted before publishing task create event
       assert requestEntity != null;
       Set<StageEntityPK> stageEntityPKs =  Sets.newHashSet(stageEntityPK);
-      ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), stageEntityPKs);
+      ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(),
+          stageEntityPKs, requestEntity.getClusterId());
       activeRequestMap.put(requestId, request);
     }
   }
@@ -524,11 +538,14 @@ public class TaskStatusListener {
     private HostRoleStatus status;
     private HostRoleStatus displayStatus;
     private Set <StageEntityPK> stageEntityPks;
+    private Long clusterId;
 
-    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
+    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks,
+                         Long clusterId) {
       this.status = status;
       this.displayStatus = displayStatus;
       this.stageEntityPks = stageEntityPks;
+      this.clusterId = clusterId;
     }
 
     public HostRoleStatus getStatus() {
@@ -559,6 +576,13 @@ public class TaskStatusListener {
       stageEntityPks.add(stageEntityPK);
     }
 
+    public Long getClusterId() {
+      return clusterId;
+    }
+
+    public void setClusterId(Long clusterId) {
+      this.clusterId = clusterId;
+    }
   }
 
   /**