You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2019/09/26 13:14:24 UTC

[ambari] branch branch-2.7 updated: AMBARI-25380. UI does not reflect/update task logs. (#3086)

This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 16a1ca4  AMBARI-25380. UI does not reflect/update task logs. (#3086)
16a1ca4 is described below

commit 16a1ca4363e4237a087172b0bf52c00269c5ba18
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Thu Sep 26 16:14:17 2019 +0300

    AMBARI-25380. UI does not reflect/update task logs. (#3086)
    
    * AMBARI-25380. UI does not reflect/update task logs. (AMBARI-24974. Sometimes Task Log is not refreshed in UI after operation completes.) (mpapirkovskyy) (#2745)
    
    * AMBARI-25380. UI does not reflect/update task logs. (AMBARI-24974. Sometimes Task Log is not refreshed in UI after operation completes.) (mpapirkovskyy) (#2747)
---
 .../api/stomp/NamedTasksSubscribeListener.java     |  75 +++++++++
 .../server/api/stomp/NamedTasksSubscriptions.java  | 166 +++++++++++++++++++
 .../configuration/spring/AgentStompConfig.java     |   3 +-
 .../configuration/spring/ApiStompConfig.java       |   6 +
 .../server/events/DefaultMessageEmitter.java       |   4 +-
 .../ambari/server/events/NamedTaskUpdateEvent.java | 176 +++++++++++++++++++++
 .../apache/ambari/server/events/STOMPEvent.java    |   5 +
 .../events/listeners/tasks/TaskStatusListener.java |  23 ++-
 .../api/stomp/NamedTasksSubscriptionsTest.java     | 150 ++++++++++++++++++
 .../listeners/tasks/TaskStatusListenerTest.java    |  92 ++++++++++-
 10 files changed, 695 insertions(+), 5 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java
new file mode 100644
index 0000000..6882236
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.stomp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.messaging.SessionDisconnectEvent;
+import org.springframework.web.socket.messaging.SessionSubscribeEvent;
+import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
+
+@Component
+public class NamedTasksSubscribeListener {
+  private static Logger LOG = LoggerFactory.getLogger(NamedTasksSubscribeListener.class);
+
+  @Autowired
+  private NamedTasksSubscriptions namedTasksSubscriptions;
+
+  @EventListener
+  public void subscribe(SessionSubscribeEvent sse)
+  {
+    MessageHeaders msgHeaders = sse.getMessage().getHeaders();
+    String sessionId  = (String) msgHeaders.get("simpSessionId");
+    String destination  = (String) msgHeaders.get("simpDestination");
+    String id  = (String) msgHeaders.get("simpSubscriptionId");
+    if (sessionId != null && destination != null && id != null) {
+      namedTasksSubscriptions.addDestination(sessionId, destination, id);
+    }
+    LOG.info(String.format("API subscribe was arrived with sessionId = %s, destination = %s and id = %s",
+        sessionId, destination, id));
+  }
+
+  @EventListener
+  public void unsubscribe(SessionUnsubscribeEvent suse)
+  {
+    MessageHeaders msgHeaders = suse.getMessage().getHeaders();
+    String sessionId  = (String) msgHeaders.get("simpSessionId");
+    String id  = (String) msgHeaders.get("simpSubscriptionId");
+    if (sessionId != null && id != null) {
+      namedTasksSubscriptions.removeId(sessionId, id);
+    }
+    LOG.info(String.format("API unsubscribe was arrived with sessionId = %s and id = %s",
+        sessionId, id));
+  }
+
+  @EventListener
+  public void disconnect(SessionDisconnectEvent sde)
+  {
+    MessageHeaders msgHeaders = sde.getMessage().getHeaders();
+    String sessionId  = (String) msgHeaders.get("simpSessionId");
+    if (sessionId != null) {
+      namedTasksSubscriptions.removeSession(sessionId);
+    }
+    LOG.info(String.format("API disconnect was arrived with sessionId = %s",
+        sessionId));
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java
new file mode 100644
index 0000000..09acdf3
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.stomp;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+@Singleton
+public class NamedTasksSubscriptions {
+  private static Logger LOG = LoggerFactory.getLogger(NamedTasksSubscriptions.class);
+
+  private ConcurrentHashMap<String, List<SubscriptionId>> taskIds = new ConcurrentHashMap<>();
+  private final String subscriptionPrefix = "/events/tasks/";
+  private final Lock taskIdsLock = new ReentrantLock();
+
+  private Provider<TaskStatusListener> taskStatusListenerProvider;
+
+  @Inject
+  public NamedTasksSubscriptions(Provider<TaskStatusListener> taskStatusListenerProvider) {
+    this.taskStatusListenerProvider = taskStatusListenerProvider;
+  }
+
+  public void addTaskId(String sessionId, Long taskId, String id) {
+    try {
+      taskIdsLock.lock();
+      taskIds.compute(sessionId, (sid, ids) -> {
+        if (ids == null) {
+          ids = new ArrayList<>();
+        }
+        AtomicBoolean completed = new AtomicBoolean(false);
+        taskStatusListenerProvider.get().getActiveTasksMap().computeIfPresent(taskId, (tid, task) -> {
+          if (task.getStatus().isCompletedState()) {
+            completed.set(true);
+          }
+          return task;
+        });
+        if (!completed.get()) {
+          ids.add(new SubscriptionId(taskId, id));
+        }
+        return ids;
+      });
+      LOG.info(String.format("Task subscription was added for sessionId = %s, taskId = %s, id = %s",
+          sessionId, taskId, id));
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  public void removeId(String sessionId, String id) {
+    taskIds.computeIfPresent(sessionId, (sid, tasks) -> {
+      Iterator<SubscriptionId> iterator = tasks.iterator();
+      while (iterator.hasNext()) {
+        if (iterator.next().getId().equals(id)) {
+          iterator.remove();
+          LOG.info(String.format("Task subscription was removed for sessionId = %s, id = %s", sessionId, id));
+        }
+      }
+      return tasks;
+    });
+  }
+
+  public void removeTaskId(Long taskId) {
+    try {
+      taskIdsLock.lock();
+      for (String sessionId : taskIds.keySet()) {
+        taskIds.computeIfPresent(sessionId, (id, tasks) -> {
+          Iterator<SubscriptionId> iterator = tasks.iterator();
+          while (iterator.hasNext()) {
+            if (iterator.next().getTaskId().equals(taskId)) {
+              iterator.remove();
+              LOG.info(String.format("Task subscription was removed for sessionId = %s and taskId = %s",
+                  sessionId, taskId));
+            }
+          }
+          return tasks;
+        });
+      }
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  public void removeSession(String sessionId) {
+    try {
+      taskIdsLock.lock();
+      taskIds.remove(sessionId);
+      LOG.info(String.format("Task subscriptions were removed for sessionId = %s", sessionId));
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  public Optional<Long> matchDestination(String destination) {
+    Optional<Long> taskIdOpt = Optional.of(StringUtils.substringAfter(destination, subscriptionPrefix))
+        .filter(StringUtils::isNotEmpty)
+        .filter(StringUtils::isNumeric)
+        .map(Long::parseLong);
+    return taskIdOpt;
+  }
+
+  public void addDestination(String sessionId, String destination, String id) {
+    Optional<Long> taskIdOpt = matchDestination(destination);
+    if (taskIdOpt.isPresent()) {
+      addTaskId(sessionId, taskIdOpt.get(), id);
+    }
+  }
+
+  public boolean checkTaskId(Long taskId) {
+    for (List<SubscriptionId> ids: taskIds.values()) {
+      for (SubscriptionId subscriptionId : ids) {
+        if (subscriptionId.getTaskId().equals(taskId)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public class SubscriptionId {
+    private final Long taskId;
+    private final String id;
+
+    public SubscriptionId(Long taskId, String id) {
+      this.taskId = taskId;
+      this.id = id;
+    }
+
+    public Long getTaskId() {
+      return taskId;
+    }
+
+    public String getId() {
+      return id;
+    }
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
index e1251d9..87c36c9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
@@ -20,7 +20,6 @@ package org.apache.ambari.server.configuration.spring;
 import javax.servlet.ServletContext;
 
 import org.apache.ambari.server.agent.stomp.HeartbeatController;
-import org.apache.ambari.server.api.stomp.TestController;
 import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
 import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
@@ -41,7 +40,7 @@ import com.google.inject.Injector;
 
 @Configuration
 @EnableWebSocketMessageBroker
-@ComponentScan(basePackageClasses = {TestController.class, HeartbeatController.class})
+@ComponentScan(basePackageClasses = {HeartbeatController.class})
 @Import({RootStompConfig.class,GuiceBeansConfig.class})
 public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
   private org.apache.ambari.server.configuration.Configuration configuration;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
index 44479ae..e147200 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.configuration.spring;
 
+import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions;
 import org.apache.ambari.server.api.stomp.TestController;
 import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
@@ -50,6 +51,11 @@ public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
     return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES);
   }
 
+  @Bean
+  public NamedTasksSubscriptions namedTasksSubscribtions(Injector injector) {
+    return injector.getInstance(NamedTasksSubscriptions.class);
+  }
+
   @Override
   public void registerStompEndpoints(StompEndpointRegistry registry) {
     registry.addEndpoint("/v1")
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index e9f5c93..48d4fbc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -43,6 +43,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
         put(STOMPEvent.Type.AGENT_CONFIGS, "/configs");
         put(STOMPEvent.Type.CONFIGS, "/events/configs");
         put(STOMPEvent.Type.HOSTCOMPONENT, "/events/hostcomponents");
+        put(STOMPEvent.Type.NAMEDTASK, "/events/tasks");
         put(STOMPEvent.Type.REQUEST, "/events/requests");
         put(STOMPEvent.Type.SERVICE, "/events/services");
         put(STOMPEvent.Type.HOST, "/events/hosts");
@@ -70,6 +71,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
         STOMPEvent.Type.UI_TOPOLOGY,
         STOMPEvent.Type.CONFIGS,
         STOMPEvent.Type.HOSTCOMPONENT,
+        STOMPEvent.Type.NAMEDTASK,
         STOMPEvent.Type.REQUEST,
         STOMPEvent.Type.SERVICE,
         STOMPEvent.Type.HOST,
@@ -101,6 +103,6 @@ public class DefaultMessageEmitter extends MessageEmitter {
 
   @Override
   protected String getDestination(STOMPEvent stompEvent) {
-    return DEFAULT_DESTINATIONS.get(stompEvent.getType());
+    return stompEvent.completeDestination(DEFAULT_DESTINATIONS.get(stompEvent.getType()));
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java
new file mode 100644
index 0000000..04ee04e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.Objects;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Single host role command update info. This update will be sent to all subscribed recipients.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class NamedTaskUpdateEvent extends STOMPEvent {
+
+  private Long id;
+  private Long requestId;
+  private String hostName;
+  private Long endTime;
+  private HostRoleStatus status;
+  private String errorLog;
+  private String outLog;
+  private String stderr;
+  private String stdout;
+
+  @JsonProperty("structured_out")
+  private String structuredOut;
+
+  public NamedTaskUpdateEvent(Long id, Long requestId, String hostName, Long endTime, HostRoleStatus status,
+                              String errorLog, String outLog, String stderr, String stdout, String structuredOut) {
+    super(Type.NAMEDTASK);
+    this.id = id;
+    this.requestId = requestId;
+    this.hostName = hostName;
+    this.endTime = endTime;
+    this.status = status;
+    this.errorLog = errorLog;
+    this.outLog = outLog;
+    this.stderr = stderr;
+    this.stdout = stdout;
+    this.structuredOut = structuredOut;
+  }
+
+  public NamedTaskUpdateEvent(HostRoleCommand hostRoleCommand) {
+    this(hostRoleCommand.getTaskId(), hostRoleCommand.getRequestId(), hostRoleCommand.getHostName(),
+        hostRoleCommand.getEndTime(), hostRoleCommand.getStatus(), hostRoleCommand.getErrorLog(),
+        hostRoleCommand.getOutputLog(), hostRoleCommand.getStderr(), hostRoleCommand.getStdout(),
+        hostRoleCommand.getStructuredOut());
+  }
+
+  public Long getId() {
+    return id;
+  }
+
+  public void setId(Long id) {
+    this.id = id;
+  }
+
+  public Long getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(Long requestId) {
+    this.requestId = requestId;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  public Long getEndTime() {
+    return endTime;
+  }
+
+  public void setEndTime(Long endTime) {
+    this.endTime = endTime;
+  }
+
+  public HostRoleStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
+
+  public String getErrorLog() {
+    return errorLog;
+  }
+
+  public void setErrorLog(String errorLog) {
+    this.errorLog = errorLog;
+  }
+
+  public String getOutLog() {
+    return outLog;
+  }
+
+  public void setOutLog(String outLog) {
+    this.outLog = outLog;
+  }
+
+  public String getStderr() {
+    return stderr;
+  }
+
+  public void setStderr(String stderr) {
+    this.stderr = stderr;
+  }
+
+  public String getStdout() {
+    return stdout;
+  }
+
+  public void setStdout(String stdout) {
+    this.stdout = stdout;
+  }
+
+  public String getStructuredOut() {
+    return structuredOut;
+  }
+
+  public void setStructuredOut(String structuredOut) {
+    this.structuredOut = structuredOut;
+  }
+
+  @Override
+  public String completeDestination(String destination) {
+    return destination + "/" + id;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    NamedTaskUpdateEvent that = (NamedTaskUpdateEvent) o;
+    return Objects.equals(id, that.id) &&
+        Objects.equals(requestId, that.requestId) &&
+        Objects.equals(hostName, that.hostName) &&
+        Objects.equals(endTime, that.endTime) &&
+        status == that.status &&
+        Objects.equals(errorLog, that.errorLog) &&
+        Objects.equals(outLog, that.outLog) &&
+        Objects.equals(stderr, that.stderr) &&
+        Objects.equals(stdout, that.stdout) &&
+        Objects.equals(structuredOut, that.structuredOut);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, requestId, hostName, endTime, status, errorLog, outLog, stderr, stdout, structuredOut);
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
index 15c3b1e..f3e119f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
@@ -54,6 +54,7 @@ public abstract class STOMPEvent {
     CONFIGS("events.configs"),
     HOSTCOMPONENT("events.hostcomponents"),
     NAMEDHOSTCOMPONENT("events.hostrolecommands.named"),
+    NAMEDTASK("events.tasks.named"),
     REQUEST("events.requests"),
     SERVICE("events.services"),
     HOST("events.hosts"),
@@ -76,4 +77,8 @@ public abstract class STOMPEvent {
       return metricName;
     }
   }
+
+  public String completeDestination(String destination) {
+    return destination;
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index b188729..73aa533 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -34,7 +34,9 @@ import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.NamedTaskUpdateEvent;
 import org.apache.ambari.server.events.RequestUpdateEvent;
 import org.apache.ambari.server.events.TaskCreateEvent;
 import org.apache.ambari.server.events.TaskUpdateEvent;
@@ -95,12 +97,15 @@ public class TaskStatusListener {
 
   private STOMPUpdatePublisher STOMPUpdatePublisher;
 
+  private NamedTasksSubscriptions namedTasksSubscriptions;
+
   @Inject
   public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO,
-                            STOMPUpdatePublisher STOMPUpdatePublisher) {
+                            STOMPUpdatePublisher STOMPUpdatePublisher, NamedTasksSubscriptions namedTasksSubscriptions) {
     this.stageDAO = stageDAO;
     this.requestDAO = requestDAO;
     this.STOMPUpdatePublisher = STOMPUpdatePublisher;
+    this.namedTasksSubscriptions = namedTasksSubscriptions;
     taskEventPublisher.register(this);
   }
 
@@ -129,6 +134,7 @@ public class TaskStatusListener {
     Set<StageEntityPK> stagesWithReceivedTaskStatus = new HashSet<>();
     Set<Long> requestIdsWithReceivedTaskStatus =  new HashSet<>();
     Set<RequestUpdateEvent> requestsToPublish = new HashSet<>();
+    Set<NamedTaskUpdateEvent> namedTasksToPublish = new HashSet<>();
 
     for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
       Long reportedTaskId = hostRoleCommand.getTaskId();
@@ -143,6 +149,17 @@ public class TaskStatusListener {
         stagesWithReceivedTaskStatus.add(stageEntityPK);
         requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
 
+        NamedTaskUpdateEvent namedTaskUpdateEvent = new NamedTaskUpdateEvent(hostRoleCommand);
+        if (namedTasksSubscriptions.checkTaskId(reportedTaskId)
+            && !namedTaskUpdateEvent.equals(new NamedTaskUpdateEvent(activeTasksMap.get(reportedTaskId)))) {
+          namedTasksToPublish.add(namedTaskUpdateEvent);
+        }
+
+        // unsubscribe on complete (no any update will be sent anyway)
+        if (hostRoleCommand.getStatus().equals(HostRoleStatus.COMPLETED)) {
+          namedTasksSubscriptions.removeTaskId(reportedTaskId);
+        }
+
         if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) {
           // Ignore requests not related to any cluster. "requests" topic is used for cluster requests only.
           Long clusterId = activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId();
@@ -178,6 +195,10 @@ public class TaskStatusListener {
     for (RequestUpdateEvent requestToPublish : requestsToPublish) {
       STOMPUpdatePublisher.publish(requestToPublish);
     }
+    for (NamedTaskUpdateEvent namedTaskUpdateEvent : namedTasksToPublish) {
+      LOG.info(String.format("NamedTaskUpdateEvent with id %s will be send", namedTaskUpdateEvent.getId()));
+      STOMPUpdatePublisher.publish(namedTaskUpdateEvent);
+    }
   }
 
   /**
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java
new file mode 100644
index 0000000..2107c41
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.stomp;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Provider;
+
+public class NamedTasksSubscriptionsTest {
+  private static final String SESSION_ID_1 = "fdsg3";
+  private static final String SESSION_ID_2 = "idfg6";
+
+  private NamedTasksSubscriptions tasksSubscriptions;
+  private Provider<TaskStatusListener> taskStatusListenerProvider;
+  private TaskStatusListener taskStatusListener;
+
+  @Before
+  public void setupTest() {
+    taskStatusListenerProvider = createMock(Provider.class);
+    taskStatusListener = createMock(TaskStatusListener.class);
+
+    Map<Long, HostRoleCommand> hostRoleCommands = new HashMap<>();
+    HostRoleCommand hostRoleCommand1 = createMock(HostRoleCommand.class);
+    HostRoleCommand hostRoleCommand4 = createMock(HostRoleCommand.class);
+    HostRoleCommand hostRoleCommand5 = createMock(HostRoleCommand.class);
+
+    expect(hostRoleCommand1.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes();
+    expect(hostRoleCommand4.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes();
+    expect(hostRoleCommand5.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes();
+
+    hostRoleCommands.put(1L, hostRoleCommand1);
+    hostRoleCommands.put(4L, hostRoleCommand4);
+    hostRoleCommands.put(5L, hostRoleCommand5);
+    expect(taskStatusListener.getActiveTasksMap()).andReturn(hostRoleCommands).anyTimes();
+    expect(taskStatusListenerProvider.get()).andReturn(taskStatusListener).anyTimes();
+
+    replay(taskStatusListenerProvider, taskStatusListener, hostRoleCommand1, hostRoleCommand4, hostRoleCommand5);
+    tasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider);
+    tasksSubscriptions.addTaskId(SESSION_ID_1, 1L, "sub-1");
+    tasksSubscriptions.addTaskId(SESSION_ID_1, 5L, "sub-5");
+    tasksSubscriptions.addTaskId(SESSION_ID_2, 1L, "sub-1");
+    tasksSubscriptions.addTaskId(SESSION_ID_2, 4L, "sub-4");
+  }
+
+  @Test
+  public void testMatching() {
+    Optional<Long> taskIdOpt = tasksSubscriptions.matchDestination("/events/tasks/1");
+    assertTrue(taskIdOpt.isPresent());
+    assertEquals(1L, taskIdOpt.get().longValue());
+    assertFalse(tasksSubscriptions.matchDestination("/events/topologies").isPresent());
+  }
+
+  @Test
+  public void testCheckId() {
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertTrue(tasksSubscriptions.checkTaskId(4L));
+    assertTrue(tasksSubscriptions.checkTaskId(5L));
+    assertFalse(tasksSubscriptions.checkTaskId(2L));
+  }
+
+  @Test
+  public void testRemoveBySessionId() {
+    tasksSubscriptions.removeSession(SESSION_ID_1);
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertTrue(tasksSubscriptions.checkTaskId(4L));
+    assertFalse(tasksSubscriptions.checkTaskId(5L));
+
+    tasksSubscriptions.removeSession(SESSION_ID_2);
+    assertFalse(tasksSubscriptions.checkTaskId(1L));
+    assertFalse(tasksSubscriptions.checkTaskId(4L));
+    assertFalse(tasksSubscriptions.checkTaskId(5L));
+  }
+
+  @Test
+  public void testRemoveById() {
+    tasksSubscriptions.removeId(SESSION_ID_1, "sub-1");
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertTrue(tasksSubscriptions.checkTaskId(4L));
+    assertTrue(tasksSubscriptions.checkTaskId(5L));
+
+    tasksSubscriptions.removeId(SESSION_ID_1, "sub-5");
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertTrue(tasksSubscriptions.checkTaskId(4L));
+    assertFalse(tasksSubscriptions.checkTaskId(5L));
+
+    tasksSubscriptions.removeId(SESSION_ID_2, "sub-1");
+    assertFalse(tasksSubscriptions.checkTaskId(1L));
+    assertTrue(tasksSubscriptions.checkTaskId(4L));
+    assertFalse(tasksSubscriptions.checkTaskId(5L));
+
+    tasksSubscriptions.removeId(SESSION_ID_2, "sub-4");
+    assertFalse(tasksSubscriptions.checkTaskId(1L));
+    assertFalse(tasksSubscriptions.checkTaskId(4L));
+    assertFalse(tasksSubscriptions.checkTaskId(5L));
+  }
+
+  @Test
+  public void testAddDestination() {
+    tasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider);
+    tasksSubscriptions.addDestination(SESSION_ID_1, "/events/tasks/1", "sub-1");
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertFalse(tasksSubscriptions.checkTaskId(4L));
+    assertFalse(tasksSubscriptions.checkTaskId(5L));
+
+    tasksSubscriptions.addDestination(SESSION_ID_1, "/events/tasks/5", "sub-5");
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertFalse(tasksSubscriptions.checkTaskId(4L));
+    assertTrue(tasksSubscriptions.checkTaskId(5L));
+
+    tasksSubscriptions.addDestination(SESSION_ID_2, "/events/tasks/1", "sub-1");
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertFalse(tasksSubscriptions.checkTaskId(4L));
+    assertTrue(tasksSubscriptions.checkTaskId(5L));
+
+    tasksSubscriptions.addDestination(SESSION_ID_2, "/events/tasks/4", "sub-4");
+    assertTrue(tasksSubscriptions.checkTaskId(1L));
+    assertTrue(tasksSubscriptions.checkTaskId(4L));
+    assertTrue(tasksSubscriptions.checkTaskId(5L));
+  }
+}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
index 6e62ef4..03e0655 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
@@ -20,7 +20,10 @@ package org.apache.ambari.server.events.listeners.tasks;
 
 import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -32,6 +35,8 @@ import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ExecutionCommandWrapperFactory;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions;
+import org.apache.ambari.server.events.NamedTaskUpdateEvent;
 import org.apache.ambari.server.events.TaskCreateEvent;
 import org.apache.ambari.server.events.TaskUpdateEvent;
 import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
@@ -44,12 +49,14 @@ import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.orm.entities.StageEntityPK;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 
 
 public class TaskStatusListenerTest extends EasyMockSupport {
@@ -93,6 +100,7 @@ public class TaskStatusListenerTest extends EasyMockSupport {
     StageEntity stageEntity = createNiceMock(StageEntity.class);
     RequestEntity requestEntity = createNiceMock(RequestEntity.class);
     STOMPUpdatePublisher statePublisher = createNiceMock(STOMPUpdatePublisher.class);
+    NamedTasksSubscriptions namedTasksSubscriptions = createNiceMock(NamedTasksSubscriptions.class);
     EasyMock.expect(stageEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();;
     EasyMock.expect(stageEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes();
     EasyMock.expect(stageEntity.isSkippable()).andReturn(Boolean.FALSE).anyTimes();;
@@ -110,9 +118,11 @@ public class TaskStatusListenerTest extends EasyMockSupport {
     EasyMock.replay(stageDAO);
     EasyMock.replay(requestDAO);
     EasyMock.replay(statePublisher);
+    EasyMock.replay(namedTasksSubscriptions);
 
     TaskCreateEvent event = new TaskCreateEvent(hostRoleCommands);
-    TaskStatusListener listener = new TaskStatusListener(publisher,stageDAO,requestDAO,statePublisher);
+    TaskStatusListener listener = new TaskStatusListener(publisher, stageDAO, requestDAO, statePublisher,
+        namedTasksSubscriptions);
 
     Assert.assertTrue(listener.getActiveTasksMap().isEmpty());
     Assert.assertTrue(listener.getActiveStageMap().isEmpty());
@@ -165,4 +175,84 @@ public class TaskStatusListenerTest extends EasyMockSupport {
     verifyAll();
   }
 
+  @Test
+  public void testNamedTasksEnabled() {
+    final Long taskId = 1L;
+    final Long requestId = 2L;
+    final HostRoleStatus status = HostRoleStatus.COMPLETED;
+    final String stderr = "gW$%SGFbhzsdfHBzdffdfd";
+    final String stdout = "gW$%gTESJ KHBjzdkfjbgv";
+    final String errorLog = " wTHT J YHKtjgsjgbvklfj";
+    final String outputLog = "546ky3kt%V$WYk4tgs5xzs";
+
+    Provider<TaskStatusListener> taskStatusListenerProvider = createMock(Provider.class);
+
+    NamedTasksSubscriptions namedTasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider);
+
+    Capture<NamedTaskUpdateEvent> namedTaskUpdateEventCapture = Capture.newInstance();
+    STOMPUpdatePublisher stompUpdatePublisher = createStrictMock(STOMPUpdatePublisher.class);
+    stompUpdatePublisher.publish(capture(namedTaskUpdateEventCapture));
+    expectLastCall();
+
+    ServiceComponentHostEvent serviceComponentHostEvent = createNiceMock(ServiceComponentHostEvent.class);
+    HostDAO hostDAO = createNiceMock(HostDAO.class);
+
+    EasyMock.replay(hostDAO);
+    EasyMock.replay(serviceComponentHostEvent);
+
+    List<HostRoleCommand> updateHostRolesCommands = new ArrayList<>();
+    HostRoleCommand updateHostRoleCommand = new HostRoleCommand("hostName", Role.DATANODE,
+        serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+    updateHostRoleCommand.setStatus(status);
+    updateHostRoleCommand.setRequestId(requestId);
+    updateHostRoleCommand.setStageId(3L);
+    updateHostRoleCommand.setTaskId(taskId);
+    updateHostRoleCommand.setStderr(stderr);
+    updateHostRoleCommand.setStdout(stdout);
+    updateHostRoleCommand.setErrorLog(errorLog);
+    updateHostRoleCommand.setOutputLog(outputLog);
+    updateHostRolesCommands.add(updateHostRoleCommand);
+
+    StageDAO stageDAO = createNiceMock(StageDAO.class);
+    RequestDAO requestDAO = createNiceMock(RequestDAO.class);
+
+    EasyMock.replay(stageDAO);
+    EasyMock.replay(requestDAO);
+    EasyMock.replay(stompUpdatePublisher);
+
+    TaskStatusListener listener = new TaskStatusListener(publisher, stageDAO, requestDAO, stompUpdatePublisher,
+        namedTasksSubscriptions);
+
+    expect(taskStatusListenerProvider.get()).andReturn(listener);
+
+    EasyMock.replay(taskStatusListenerProvider);
+
+    // subscribe for task
+    namedTasksSubscriptions.addTaskId("", taskId, "sub-1");
+
+    // add dummy host role command as active
+    // status should be the same to avoid request update event firing
+    HostRoleCommand activeHostRoleCommand = new HostRoleCommand("hostName", Role.DATANODE,
+        serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+    activeHostRoleCommand.setStatus(status);
+    listener.getActiveTasksMap().put(taskId, activeHostRoleCommand);
+
+    listener.onTaskUpdateEvent(new TaskUpdateEvent(updateHostRolesCommands));
+
+    Assert.assertNotNull(namedTaskUpdateEventCapture.getValues());
+    Assert.assertEquals(1L, namedTaskUpdateEventCapture.getValues().size());
+
+    NamedTaskUpdateEvent capturedEvent = namedTaskUpdateEventCapture.getValue();
+
+    Assert.assertEquals(taskId, capturedEvent.getId());
+    Assert.assertEquals(requestId, capturedEvent.getRequestId());
+    Assert.assertEquals(status, capturedEvent.getStatus());
+    Assert.assertEquals(stderr, capturedEvent.getStderr());
+    Assert.assertEquals(stdout, capturedEvent.getStdout());
+    Assert.assertEquals(errorLog, capturedEvent.getErrorLog());
+    Assert.assertEquals(outputLog, capturedEvent.getOutLog());
+
+    verifyAll();
+  }
+
 }