You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2019/09/26 13:14:24 UTC
[ambari] branch branch-2.7 updated: AMBARI-25380. UI does not
reflect/update task logs. (#3086)
This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 16a1ca4 AMBARI-25380. UI does not reflect/update task logs. (#3086)
16a1ca4 is described below
commit 16a1ca4363e4237a087172b0bf52c00269c5ba18
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Thu Sep 26 16:14:17 2019 +0300
AMBARI-25380. UI does not reflect/update task logs. (#3086)
* AMBARI-25380. UI does not reflect/update task logs. (AMBARI-24974. Sometimes Task Log is not refreshed in UI after operation completes.) (mpapirkovskyy) (#2745)
* AMBARI-25380. UI does not reflect/update task logs. (AMBARI-24974. Sometimes Task Log is not refreshed in UI after operation completes.) (mpapirkovskyy) (#2747)
---
.../api/stomp/NamedTasksSubscribeListener.java | 75 +++++++++
.../server/api/stomp/NamedTasksSubscriptions.java | 166 +++++++++++++++++++
.../configuration/spring/AgentStompConfig.java | 3 +-
.../configuration/spring/ApiStompConfig.java | 6 +
.../server/events/DefaultMessageEmitter.java | 4 +-
.../ambari/server/events/NamedTaskUpdateEvent.java | 176 +++++++++++++++++++++
.../apache/ambari/server/events/STOMPEvent.java | 5 +
.../events/listeners/tasks/TaskStatusListener.java | 23 ++-
.../api/stomp/NamedTasksSubscriptionsTest.java | 150 ++++++++++++++++++
.../listeners/tasks/TaskStatusListenerTest.java | 92 ++++++++++-
10 files changed, 695 insertions(+), 5 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java
new file mode 100644
index 0000000..6882236
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.stomp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.messaging.SessionDisconnectEvent;
+import org.springframework.web.socket.messaging.SessionSubscribeEvent;
+import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
+
+/**
+ * Spring event listener that forwards STOMP session lifecycle events
+ * (subscribe, unsubscribe, disconnect) to {@link NamedTasksSubscriptions},
+ * so the registry of per-task subscriptions follows what clients have open.
+ */
+@Component
+public class NamedTasksSubscribeListener {
+  private static final Logger LOG = LoggerFactory.getLogger(NamedTasksSubscribeListener.class);
+
+  // Header keys read from the incoming STOMP message.
+  private static final String SESSION_ID_HEADER = "simpSessionId";
+  private static final String DESTINATION_HEADER = "simpDestination";
+  private static final String SUBSCRIPTION_ID_HEADER = "simpSubscriptionId";
+
+  @Autowired
+  private NamedTasksSubscriptions namedTasksSubscriptions;
+
+  /**
+   * Registers the subscription when session id, destination and subscription id
+   * are all present; the registry itself decides whether the destination is a task topic.
+   *
+   * @param sse subscribe event carrying the STOMP message headers
+   */
+  @EventListener
+  public void subscribe(SessionSubscribeEvent sse) {
+    MessageHeaders msgHeaders = sse.getMessage().getHeaders();
+    String sessionId = (String) msgHeaders.get(SESSION_ID_HEADER);
+    String destination = (String) msgHeaders.get(DESTINATION_HEADER);
+    String id = (String) msgHeaders.get(SUBSCRIPTION_ID_HEADER);
+    if (sessionId != null && destination != null && id != null) {
+      namedTasksSubscriptions.addDestination(sessionId, destination, id);
+    }
+    // The event is logged even when a header is missing and nothing was registered.
+    LOG.info("API subscribe was arrived with sessionId = {}, destination = {} and id = {}",
+        sessionId, destination, id);
+  }
+
+  /**
+   * Drops the subscription identified by session id and subscription id, when both are present.
+   *
+   * @param suse unsubscribe event carrying the STOMP message headers
+   */
+  @EventListener
+  public void unsubscribe(SessionUnsubscribeEvent suse) {
+    MessageHeaders msgHeaders = suse.getMessage().getHeaders();
+    String sessionId = (String) msgHeaders.get(SESSION_ID_HEADER);
+    String id = (String) msgHeaders.get(SUBSCRIPTION_ID_HEADER);
+    if (sessionId != null && id != null) {
+      namedTasksSubscriptions.removeId(sessionId, id);
+    }
+    LOG.info("API unsubscribe was arrived with sessionId = {} and id = {}", sessionId, id);
+  }
+
+  /**
+   * Drops every subscription that belongs to the disconnected session.
+   *
+   * @param sde disconnect event carrying the STOMP message headers
+   */
+  @EventListener
+  public void disconnect(SessionDisconnectEvent sde) {
+    MessageHeaders msgHeaders = sde.getMessage().getHeaders();
+    String sessionId = (String) msgHeaders.get(SESSION_ID_HEADER);
+    if (sessionId != null) {
+      namedTasksSubscriptions.removeSession(sessionId);
+    }
+    LOG.info("API disconnect was arrived with sessionId = {}", sessionId);
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java
new file mode 100644
index 0000000..09acdf3
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.stomp;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * Registry of client subscriptions to per-task topics ({@code /events/tasks/<taskId>}).
+ * For each STOMP session it keeps the list of (task id, subscription id) pairs, so that
+ * task updates are only published for tasks somebody is actually subscribed to.
+ *
+ * <p>The per-session lists are plain {@link ArrayList}s; every read and write of them
+ * goes through {@link #taskIdsLock}.</p>
+ */
+@Singleton
+public class NamedTasksSubscriptions {
+  private static final Logger LOG = LoggerFactory.getLogger(NamedTasksSubscriptions.class);
+
+  /** Session id mapped to the subscriptions opened by that session. */
+  private final ConcurrentHashMap<String, List<SubscriptionId>> taskIds = new ConcurrentHashMap<>();
+  private final String subscriptionPrefix = "/events/tasks/";
+  private final Lock taskIdsLock = new ReentrantLock();
+
+  private Provider<TaskStatusListener> taskStatusListenerProvider;
+
+  @Inject
+  public NamedTasksSubscriptions(Provider<TaskStatusListener> taskStatusListenerProvider) {
+    this.taskStatusListenerProvider = taskStatusListenerProvider;
+  }
+
+  /**
+   * Registers a subscription of the session to the task, unless the task is known to the
+   * task status listener and is already in a completed state.
+   *
+   * @param sessionId STOMP session id
+   * @param taskId    id of the task being subscribed to
+   * @param id        STOMP subscription id chosen by the client
+   */
+  public void addTaskId(String sessionId, Long taskId, String id) {
+    // Acquire before entering try: if lock() itself fails, finally must not call unlock().
+    taskIdsLock.lock();
+    try {
+      if (isTaskCompleted(taskId)) {
+        LOG.info("Task subscription was skipped for sessionId = {}, taskId = {}, id = {}: task is already completed",
+            sessionId, taskId, id);
+        return;
+      }
+      taskIds.computeIfAbsent(sessionId, sid -> new ArrayList<>()).add(new SubscriptionId(taskId, id));
+      LOG.info("Task subscription was added for sessionId = {}, taskId = {}, id = {}",
+          sessionId, taskId, id);
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  /**
+   * Checks the active tasks map of the task status listener for the task's state.
+   *
+   * @return {@code true} only when the task is present there and reports a completed state
+   */
+  private boolean isTaskCompleted(Long taskId) {
+    AtomicBoolean completed = new AtomicBoolean(false);
+    taskStatusListenerProvider.get().getActiveTasksMap().computeIfPresent(taskId, (tid, task) -> {
+      if (task.getStatus().isCompletedState()) {
+        completed.set(true);
+      }
+      return task;
+    });
+    return completed.get();
+  }
+
+  /**
+   * Removes the subscription(s) with the given subscription id from the session.
+   *
+   * @param sessionId STOMP session id
+   * @param id        STOMP subscription id
+   */
+  public void removeId(String sessionId, String id) {
+    taskIdsLock.lock();
+    try {
+      List<SubscriptionId> tasks = taskIds.get(sessionId);
+      if (tasks == null) {
+        return;
+      }
+      Iterator<SubscriptionId> iterator = tasks.iterator();
+      while (iterator.hasNext()) {
+        if (iterator.next().getId().equals(id)) {
+          iterator.remove();
+          LOG.info("Task subscription was removed for sessionId = {}, id = {}", sessionId, id);
+        }
+      }
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  /**
+   * Removes every subscription to the given task, across all sessions.
+   *
+   * @param taskId id of the task
+   */
+  public void removeTaskId(Long taskId) {
+    taskIdsLock.lock();
+    try {
+      taskIds.forEach((sessionId, tasks) -> {
+        Iterator<SubscriptionId> iterator = tasks.iterator();
+        while (iterator.hasNext()) {
+          if (iterator.next().getTaskId().equals(taskId)) {
+            iterator.remove();
+            LOG.info("Task subscription was removed for sessionId = {} and taskId = {}",
+                sessionId, taskId);
+          }
+        }
+      });
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  /**
+   * Forgets all subscriptions of the session.
+   *
+   * @param sessionId STOMP session id
+   */
+  public void removeSession(String sessionId) {
+    taskIdsLock.lock();
+    try {
+      taskIds.remove(sessionId);
+      LOG.info("Task subscriptions were removed for sessionId = {}", sessionId);
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  /**
+   * Extracts the task id from a destination of the form {@code .../events/tasks/<digits>}.
+   *
+   * @param destination STOMP destination supplied by the client; may be {@code null}
+   * @return the task id, or empty when the destination is not a task topic or the
+   *         numeric part does not fit into a {@code long}
+   */
+  public Optional<Long> matchDestination(String destination) {
+    String taskIdPart = StringUtils.substringAfter(destination, subscriptionPrefix);
+    if (StringUtils.isEmpty(taskIdPart) || !StringUtils.isNumeric(taskIdPart)) {
+      return Optional.empty();
+    }
+    try {
+      return Optional.of(Long.parseLong(taskIdPart));
+    } catch (NumberFormatException e) {
+      // All digits, but outside the long range: treat it as "not a task destination"
+      // instead of letting client input blow up the subscribe handler.
+      LOG.warn("Task id in destination {} is out of range and was ignored", destination);
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Registers the subscription when the destination is a task topic; ignores it otherwise.
+   *
+   * @param sessionId   STOMP session id
+   * @param destination STOMP destination the client subscribed to
+   * @param id          STOMP subscription id
+   */
+  public void addDestination(String sessionId, String destination, String id) {
+    Optional<Long> taskIdOpt = matchDestination(destination);
+    if (taskIdOpt.isPresent()) {
+      addTaskId(sessionId, taskIdOpt.get(), id);
+    }
+  }
+
+  /**
+   * Tells whether at least one session is currently subscribed to the task.
+   *
+   * @param taskId id of the task
+   * @return {@code true} when a subscription to the task exists
+   */
+  public boolean checkTaskId(Long taskId) {
+    // Same lock as the mutators: the lists are not safe to iterate while being modified.
+    taskIdsLock.lock();
+    try {
+      for (List<SubscriptionId> ids : taskIds.values()) {
+        for (SubscriptionId subscriptionId : ids) {
+          if (subscriptionId.getTaskId().equals(taskId)) {
+            return true;
+          }
+        }
+      }
+      return false;
+    } finally {
+      taskIdsLock.unlock();
+    }
+  }
+
+  /**
+   * Immutable pair of a task id and the client's STOMP subscription id.
+   */
+  public class SubscriptionId {
+    private final Long taskId;
+    private final String id;
+
+    public SubscriptionId(Long taskId, String id) {
+      this.taskId = taskId;
+      this.id = id;
+    }
+
+    public Long getTaskId() {
+      return taskId;
+    }
+
+    public String getId() {
+      return id;
+    }
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
index e1251d9..87c36c9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
@@ -20,7 +20,6 @@ package org.apache.ambari.server.configuration.spring;
import javax.servlet.ServletContext;
import org.apache.ambari.server.agent.stomp.HeartbeatController;
-import org.apache.ambari.server.api.stomp.TestController;
import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
@@ -41,7 +40,7 @@ import com.google.inject.Injector;
@Configuration
@EnableWebSocketMessageBroker
-@ComponentScan(basePackageClasses = {TestController.class, HeartbeatController.class})
+@ComponentScan(basePackageClasses = {HeartbeatController.class})
@Import({RootStompConfig.class,GuiceBeansConfig.class})
public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
private org.apache.ambari.server.configuration.Configuration configuration;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
index 44479ae..e147200 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.ambari.server.configuration.spring;
+import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions;
import org.apache.ambari.server.api.stomp.TestController;
import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
@@ -50,6 +51,11 @@ public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES);
}
+ @Bean // makes the Guice-managed NamedTasksSubscriptions available for injection in this Spring context
+ public NamedTasksSubscriptions namedTasksSubscribtions(Injector injector) {
+ return injector.getInstance(NamedTasksSubscriptions.class); // looked up in Guice rather than constructed here
+ }
+
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/v1")
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index e9f5c93..48d4fbc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -43,6 +43,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
put(STOMPEvent.Type.AGENT_CONFIGS, "/configs");
put(STOMPEvent.Type.CONFIGS, "/events/configs");
put(STOMPEvent.Type.HOSTCOMPONENT, "/events/hostcomponents");
+ put(STOMPEvent.Type.NAMEDTASK, "/events/tasks");
put(STOMPEvent.Type.REQUEST, "/events/requests");
put(STOMPEvent.Type.SERVICE, "/events/services");
put(STOMPEvent.Type.HOST, "/events/hosts");
@@ -70,6 +71,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
STOMPEvent.Type.UI_TOPOLOGY,
STOMPEvent.Type.CONFIGS,
STOMPEvent.Type.HOSTCOMPONENT,
+ STOMPEvent.Type.NAMEDTASK,
STOMPEvent.Type.REQUEST,
STOMPEvent.Type.SERVICE,
STOMPEvent.Type.HOST,
@@ -101,6 +103,6 @@ public class DefaultMessageEmitter extends MessageEmitter {
@Override
protected String getDestination(STOMPEvent stompEvent) {
- return DEFAULT_DESTINATIONS.get(stompEvent.getType());
+ return stompEvent.completeDestination(DEFAULT_DESTINATIONS.get(stompEvent.getType()));
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java
new file mode 100644
index 0000000..04ee04e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.Objects;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Update describing a single host role command (task). It is published to the
+ * task's own destination and so reaches every recipient subscribed to that task.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class NamedTaskUpdateEvent extends STOMPEvent {
+
+  private Long id;
+  private Long requestId;
+  private String hostName;
+  private Long endTime;
+  private HostRoleStatus status;
+  private String errorLog;
+  private String outLog;
+  private String stderr;
+  private String stdout;
+
+  @JsonProperty("structured_out")
+  private String structuredOut;
+
+  /**
+   * Creates the event from explicit field values.
+   */
+  public NamedTaskUpdateEvent(Long id, Long requestId, String hostName, Long endTime, HostRoleStatus status,
+                              String errorLog, String outLog, String stderr, String stdout, String structuredOut) {
+    super(Type.NAMEDTASK);
+    this.id = id;
+    this.requestId = requestId;
+    this.hostName = hostName;
+    this.endTime = endTime;
+    this.status = status;
+    this.errorLog = errorLog;
+    this.outLog = outLog;
+    this.stderr = stderr;
+    this.stdout = stdout;
+    this.structuredOut = structuredOut;
+  }
+
+  /**
+   * Creates the event as a snapshot of the given host role command.
+   *
+   * @param hostRoleCommand command whose current values are copied into the event
+   */
+  public NamedTaskUpdateEvent(HostRoleCommand hostRoleCommand) {
+    this(
+        hostRoleCommand.getTaskId(),
+        hostRoleCommand.getRequestId(),
+        hostRoleCommand.getHostName(),
+        hostRoleCommand.getEndTime(),
+        hostRoleCommand.getStatus(),
+        hostRoleCommand.getErrorLog(),
+        hostRoleCommand.getOutputLog(),
+        hostRoleCommand.getStderr(),
+        hostRoleCommand.getStdout(),
+        hostRoleCommand.getStructuredOut());
+  }
+
+  /** @return id of the task this update describes */
+  public Long getId() {
+    return id;
+  }
+
+  public void setId(Long id) {
+    this.id = id;
+  }
+
+  /** @return id of the request the task belongs to */
+  public Long getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(Long requestId) {
+    this.requestId = requestId;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  public Long getEndTime() {
+    return endTime;
+  }
+
+  public void setEndTime(Long endTime) {
+    this.endTime = endTime;
+  }
+
+  public HostRoleStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
+
+  public String getErrorLog() {
+    return errorLog;
+  }
+
+  public void setErrorLog(String errorLog) {
+    this.errorLog = errorLog;
+  }
+
+  public String getOutLog() {
+    return outLog;
+  }
+
+  public void setOutLog(String outLog) {
+    this.outLog = outLog;
+  }
+
+  public String getStderr() {
+    return stderr;
+  }
+
+  public void setStderr(String stderr) {
+    this.stderr = stderr;
+  }
+
+  public String getStdout() {
+    return stdout;
+  }
+
+  public void setStdout(String stdout) {
+    this.stdout = stdout;
+  }
+
+  public String getStructuredOut() {
+    return structuredOut;
+  }
+
+  public void setStructuredOut(String structuredOut) {
+    this.structuredOut = structuredOut;
+  }
+
+  /**
+   * Appends the task id to the base destination, giving each task its own topic.
+   *
+   * @param destination base destination configured for this event type
+   * @return {@code destination + "/" + id}
+   */
+  @Override
+  public String completeDestination(String destination) {
+    return destination + "/" + id;
+  }
+
+  /**
+   * Two events are equal when they are of the same class and all fields match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    NamedTaskUpdateEvent other = (NamedTaskUpdateEvent) o;
+    return Objects.equals(id, other.id)
+        && Objects.equals(requestId, other.requestId)
+        && Objects.equals(hostName, other.hostName)
+        && Objects.equals(endTime, other.endTime)
+        && Objects.equals(status, other.status)
+        && Objects.equals(errorLog, other.errorLog)
+        && Objects.equals(outLog, other.outLog)
+        && Objects.equals(stderr, other.stderr)
+        && Objects.equals(stdout, other.stdout)
+        && Objects.equals(structuredOut, other.structuredOut);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, requestId, hostName, endTime, status, errorLog, outLog, stderr, stdout,
+        structuredOut);
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
index 15c3b1e..f3e119f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
@@ -54,6 +54,7 @@ public abstract class STOMPEvent {
CONFIGS("events.configs"),
HOSTCOMPONENT("events.hostcomponents"),
NAMEDHOSTCOMPONENT("events.hostrolecommands.named"),
+ NAMEDTASK("events.tasks.named"),
REQUEST("events.requests"),
SERVICE("events.services"),
HOST("events.hosts"),
@@ -76,4 +77,8 @@ public abstract class STOMPEvent {
return metricName;
}
}
+
+ public String completeDestination(String destination) { // hook: subclasses may refine the destination they are published to
+ return destination; // default: the base destination is used unchanged
+ }
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index b188729..73aa533 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -34,7 +34,9 @@ import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.actionmanager.Request;
import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.NamedTaskUpdateEvent;
import org.apache.ambari.server.events.RequestUpdateEvent;
import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.TaskUpdateEvent;
@@ -95,12 +97,15 @@ public class TaskStatusListener {
private STOMPUpdatePublisher STOMPUpdatePublisher;
+ private NamedTasksSubscriptions namedTasksSubscriptions;
+
@Inject
public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO,
- STOMPUpdatePublisher STOMPUpdatePublisher) {
+ STOMPUpdatePublisher STOMPUpdatePublisher, NamedTasksSubscriptions namedTasksSubscriptions) {
this.stageDAO = stageDAO;
this.requestDAO = requestDAO;
this.STOMPUpdatePublisher = STOMPUpdatePublisher;
+ this.namedTasksSubscriptions = namedTasksSubscriptions;
taskEventPublisher.register(this);
}
@@ -129,6 +134,7 @@ public class TaskStatusListener {
Set<StageEntityPK> stagesWithReceivedTaskStatus = new HashSet<>();
Set<Long> requestIdsWithReceivedTaskStatus = new HashSet<>();
Set<RequestUpdateEvent> requestsToPublish = new HashSet<>();
+ Set<NamedTaskUpdateEvent> namedTasksToPublish = new HashSet<>();
for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
Long reportedTaskId = hostRoleCommand.getTaskId();
@@ -143,6 +149,17 @@ public class TaskStatusListener {
stagesWithReceivedTaskStatus.add(stageEntityPK);
requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
+ NamedTaskUpdateEvent namedTaskUpdateEvent = new NamedTaskUpdateEvent(hostRoleCommand);
+ if (namedTasksSubscriptions.checkTaskId(reportedTaskId)
+ && !namedTaskUpdateEvent.equals(new NamedTaskUpdateEvent(activeTasksMap.get(reportedTaskId)))) {
+ namedTasksToPublish.add(namedTaskUpdateEvent);
+ }
+
+ // unsubscribe on completion (no further updates will be sent anyway)
+ if (hostRoleCommand.getStatus().equals(HostRoleStatus.COMPLETED)) {
+ namedTasksSubscriptions.removeTaskId(reportedTaskId);
+ }
+
if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) {
// Ignore requests not related to any cluster. "requests" topic is used for cluster requests only.
Long clusterId = activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId();
@@ -178,6 +195,10 @@ public class TaskStatusListener {
for (RequestUpdateEvent requestToPublish : requestsToPublish) {
STOMPUpdatePublisher.publish(requestToPublish);
}
+ for (NamedTaskUpdateEvent namedTaskUpdateEvent : namedTasksToPublish) {
+ LOG.info(String.format("NamedTaskUpdateEvent with id %s will be send", namedTaskUpdateEvent.getId()));
+ STOMPUpdatePublisher.publish(namedTaskUpdateEvent);
+ }
}
/**
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java
new file mode 100644
index 0000000..2107c41
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.stomp;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Provider;
+
+/**
+ * Unit tests for {@link NamedTasksSubscriptions}. The fixture registers two sessions:
+ * the first subscribed to tasks 1 and 5, the second to tasks 1 and 4; all three tasks
+ * are reported as in progress by the mocked task status listener.
+ */
+public class NamedTasksSubscriptionsTest {
+  private static final String SESSION_ID_1 = "fdsg3";
+  private static final String SESSION_ID_2 = "idfg6";
+
+  private NamedTasksSubscriptions tasksSubscriptions;
+  private Provider<TaskStatusListener> taskStatusListenerProvider;
+  private TaskStatusListener taskStatusListener;
+
+  @Before
+  public void setupTest() {
+    taskStatusListenerProvider = createMock(Provider.class);
+    taskStatusListener = createMock(TaskStatusListener.class);
+
+    HostRoleCommand commandForTask1 = createMock(HostRoleCommand.class);
+    HostRoleCommand commandForTask4 = createMock(HostRoleCommand.class);
+    HostRoleCommand commandForTask5 = createMock(HostRoleCommand.class);
+
+    expect(commandForTask1.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes();
+    expect(commandForTask4.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes();
+    expect(commandForTask5.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes();
+
+    Map<Long, HostRoleCommand> activeTasks = new HashMap<>();
+    activeTasks.put(1L, commandForTask1);
+    activeTasks.put(4L, commandForTask4);
+    activeTasks.put(5L, commandForTask5);
+
+    expect(taskStatusListener.getActiveTasksMap()).andReturn(activeTasks).anyTimes();
+    expect(taskStatusListenerProvider.get()).andReturn(taskStatusListener).anyTimes();
+
+    replay(taskStatusListenerProvider, taskStatusListener, commandForTask1, commandForTask4, commandForTask5);
+
+    tasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider);
+    tasksSubscriptions.addTaskId(SESSION_ID_1, 1L, "sub-1");
+    tasksSubscriptions.addTaskId(SESSION_ID_1, 5L, "sub-5");
+    tasksSubscriptions.addTaskId(SESSION_ID_2, 1L, "sub-1");
+    tasksSubscriptions.addTaskId(SESSION_ID_2, 4L, "sub-4");
+  }
+
+  /**
+   * Asserts, in one step, whether tasks 1, 4 and 5 currently have a subscriber.
+   */
+  private void assertSubscribed(boolean task1, boolean task4, boolean task5) {
+    assertEquals(task1, tasksSubscriptions.checkTaskId(1L));
+    assertEquals(task4, tasksSubscriptions.checkTaskId(4L));
+    assertEquals(task5, tasksSubscriptions.checkTaskId(5L));
+  }
+
+  @Test
+  public void testMatching() {
+    Optional<Long> matched = tasksSubscriptions.matchDestination("/events/tasks/1");
+    assertTrue(matched.isPresent());
+    assertEquals(1L, matched.get().longValue());
+
+    Optional<Long> unrelated = tasksSubscriptions.matchDestination("/events/topologies");
+    assertFalse(unrelated.isPresent());
+  }
+
+  @Test
+  public void testCheckId() {
+    assertSubscribed(true, true, true);
+    assertFalse(tasksSubscriptions.checkTaskId(2L));
+  }
+
+  @Test
+  public void testRemoveBySessionId() {
+    // task 1 is still held by the second session
+    tasksSubscriptions.removeSession(SESSION_ID_1);
+    assertSubscribed(true, true, false);
+
+    tasksSubscriptions.removeSession(SESSION_ID_2);
+    assertSubscribed(false, false, false);
+  }
+
+  @Test
+  public void testRemoveById() {
+    // task 1 is still held by the second session
+    tasksSubscriptions.removeId(SESSION_ID_1, "sub-1");
+    assertSubscribed(true, true, true);
+
+    tasksSubscriptions.removeId(SESSION_ID_1, "sub-5");
+    assertSubscribed(true, true, false);
+
+    tasksSubscriptions.removeId(SESSION_ID_2, "sub-1");
+    assertSubscribed(false, true, false);
+
+    tasksSubscriptions.removeId(SESSION_ID_2, "sub-4");
+    assertSubscribed(false, false, false);
+  }
+
+  @Test
+  public void testAddDestination() {
+    // start from an empty registry instead of the pre-populated fixture
+    tasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider);
+
+    tasksSubscriptions.addDestination(SESSION_ID_1, "/events/tasks/1", "sub-1");
+    assertSubscribed(true, false, false);
+
+    tasksSubscriptions.addDestination(SESSION_ID_1, "/events/tasks/5", "sub-5");
+    assertSubscribed(true, false, true);
+
+    tasksSubscriptions.addDestination(SESSION_ID_2, "/events/tasks/1", "sub-1");
+    assertSubscribed(true, false, true);
+
+    tasksSubscriptions.addDestination(SESSION_ID_2, "/events/tasks/4", "sub-4");
+    assertSubscribed(true, true, true);
+  }
+}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
index 6e62ef4..03e0655 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
@@ -20,7 +20,10 @@ package org.apache.ambari.server.events.listeners.tasks;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,6 +35,8 @@ import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.ExecutionCommandWrapperFactory;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions;
+import org.apache.ambari.server.events.NamedTaskUpdateEvent;
import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.TaskUpdateEvent;
import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
@@ -44,12 +49,14 @@ import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.orm.entities.StageEntityPK;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Test;
import com.google.inject.Inject;
+import com.google.inject.Provider;
public class TaskStatusListenerTest extends EasyMockSupport {
@@ -93,6 +100,7 @@ public class TaskStatusListenerTest extends EasyMockSupport {
StageEntity stageEntity = createNiceMock(StageEntity.class);
RequestEntity requestEntity = createNiceMock(RequestEntity.class);
STOMPUpdatePublisher statePublisher = createNiceMock(STOMPUpdatePublisher.class);
+ NamedTasksSubscriptions namedTasksSubscriptions = createNiceMock(NamedTasksSubscriptions.class);
EasyMock.expect(stageEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();;
EasyMock.expect(stageEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes();
EasyMock.expect(stageEntity.isSkippable()).andReturn(Boolean.FALSE).anyTimes();;
@@ -110,9 +118,11 @@ public class TaskStatusListenerTest extends EasyMockSupport {
EasyMock.replay(stageDAO);
EasyMock.replay(requestDAO);
EasyMock.replay(statePublisher);
+ EasyMock.replay(namedTasksSubscriptions);
TaskCreateEvent event = new TaskCreateEvent(hostRoleCommands);
- TaskStatusListener listener = new TaskStatusListener(publisher,stageDAO,requestDAO,statePublisher);
+ TaskStatusListener listener = new TaskStatusListener(publisher, stageDAO, requestDAO, statePublisher,
+ namedTasksSubscriptions);
Assert.assertTrue(listener.getActiveTasksMap().isEmpty());
Assert.assertTrue(listener.getActiveStageMap().isEmpty());
@@ -165,4 +175,84 @@ public class TaskStatusListenerTest extends EasyMockSupport {
verifyAll();
}
+ /**
+  * Checks that a task update for a task with a named subscription makes the listener
+  * publish exactly one {@link NamedTaskUpdateEvent} carrying the task's id, request id,
+  * status and all four log fields of the updated command.
+  */
+ @Test
+ public void testNamedTasksEnabled() {
+   final Long subscribedTaskId = 1L;
+   final Long expectedRequestId = 2L;
+   final HostRoleStatus expectedStatus = HostRoleStatus.COMPLETED;
+   final String expectedStderr = "gW$%SGFbhzsdfHBzdffdfd";
+   final String expectedStdout = "gW$%gTESJ KHBjzdkfjbgv";
+   final String expectedErrorLog = " wTHT J YHKtjgsjgbvklfj";
+   final String expectedOutputLog = "546ky3kt%V$WYk4tgs5xzs";
+
+   // the provider is handed to the subscriptions registry now; its single expected
+   // get() call is recorded further down, once the listener instance exists
+   Provider<TaskStatusListener> listenerProvider = createMock(Provider.class);
+   NamedTasksSubscriptions subscriptions = new NamedTasksSubscriptions(listenerProvider);
+
+   // strict publisher: one publish call is expected and its argument is captured
+   Capture<NamedTaskUpdateEvent> publishedEvent = Capture.newInstance();
+   STOMPUpdatePublisher stompPublisher = createStrictMock(STOMPUpdatePublisher.class);
+   stompPublisher.publish(capture(publishedEvent));
+   expectLastCall();
+
+   // collaborators of HostRoleCommand, switched to replay state before any command is built
+   ServiceComponentHostEvent hostEvent = createNiceMock(ServiceComponentHostEvent.class);
+   HostDAO hostDao = createNiceMock(HostDAO.class);
+   EasyMock.replay(hostDao, hostEvent);
+
+   // the command delivered by the update event, filled with the values asserted below
+   HostRoleCommand updatedCommand = new HostRoleCommand("hostName", Role.DATANODE, hostEvent,
+       RoleCommand.EXECUTE, hostDao, executionCommandDAO, ecwFactory);
+   updatedCommand.setStatus(expectedStatus);
+   updatedCommand.setRequestId(expectedRequestId);
+   updatedCommand.setStageId(3L);
+   updatedCommand.setTaskId(subscribedTaskId);
+   updatedCommand.setStderr(expectedStderr);
+   updatedCommand.setStdout(expectedStdout);
+   updatedCommand.setErrorLog(expectedErrorLog);
+   updatedCommand.setOutputLog(expectedOutputLog);
+
+   List<HostRoleCommand> updatedCommands = new ArrayList<>();
+   updatedCommands.add(updatedCommand);
+
+   StageDAO stageDao = createNiceMock(StageDAO.class);
+   RequestDAO requestDao = createNiceMock(RequestDAO.class);
+   EasyMock.replay(stageDao, requestDao, stompPublisher);
+
+   TaskStatusListener taskStatusListener = new TaskStatusListener(publisher, stageDao, requestDao,
+       stompPublisher, subscriptions);
+
+   // the provider hands out the listener built above
+   expect(listenerProvider.get()).andReturn(taskStatusListener);
+   EasyMock.replay(listenerProvider);
+
+   // subscribe for task
+   subscriptions.addTaskId("", subscribedTaskId, "sub-1");
+
+   // register a dummy host role command as active for the same task;
+   // its status matches the update to avoid request update event firing
+   HostRoleCommand activeCommand = new HostRoleCommand("hostName", Role.DATANODE, hostEvent,
+       RoleCommand.EXECUTE, hostDao, executionCommandDAO, ecwFactory);
+   activeCommand.setStatus(expectedStatus);
+   taskStatusListener.getActiveTasksMap().put(subscribedTaskId, activeCommand);
+
+   taskStatusListener.onTaskUpdateEvent(new TaskUpdateEvent(updatedCommands));
+
+   // exactly one named event must have reached the publisher
+   List<NamedTaskUpdateEvent> capturedEvents = publishedEvent.getValues();
+   Assert.assertNotNull(capturedEvents);
+   Assert.assertEquals(1L, capturedEvents.size());
+
+   NamedTaskUpdateEvent namedEvent = publishedEvent.getValue();
+
+   Assert.assertEquals(subscribedTaskId, namedEvent.getId());
+   Assert.assertEquals(expectedRequestId, namedEvent.getRequestId());
+   Assert.assertEquals(expectedStatus, namedEvent.getStatus());
+   Assert.assertEquals(expectedStderr, namedEvent.getStderr());
+   Assert.assertEquals(expectedStdout, namedEvent.getStdout());
+   Assert.assertEquals(expectedErrorLog, namedEvent.getErrorLog());
+   Assert.assertEquals(expectedOutputLog, namedEvent.getOutLog());
+
+   verifyAll();
+ }
+
}