You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/02/17 22:06:44 UTC
[45/50] [abbrv] ambari git commit: AMBARI-18868. Stage and Request
status should be persisted in the database. (jaimin)
AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0fc7a667
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0fc7a667
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0fc7a667
Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 0fc7a6671feb10dc0e8475dc4878942cf19f46cc
Parents: dd174f4
Author: Jaimin Jetly <ja...@hortonworks.com>
Authored: Fri Feb 17 09:31:10 2017 -0800
Committer: Jaimin Jetly <ja...@hortonworks.com>
Committed: Fri Feb 17 09:31:10 2017 -0800
----------------------------------------------------------------------
.../actionmanager/ActionDBAccessorImpl.java | 108 ++--
.../server/actionmanager/ActionScheduler.java | 31 +
.../ambari/server/actionmanager/Request.java | 8 +-
.../ambari/server/actionmanager/Stage.java | 25 +
.../controller/internal/CalculatedStatus.java | 390 +++++++++++-
.../ambari/server/events/TaskCreateEvent.java | 48 ++
.../apache/ambari/server/events/TaskEvent.java | 66 ++
.../ambari/server/events/TaskUpdateEvent.java | 35 ++
.../listeners/tasks/TaskStatusListener.java | 609 +++++++++++++++++++
.../events/publishers/TaskEventPublisher.java | 62 ++
.../server/orm/dao/HostRoleCommandDAO.java | 67 +-
.../ambari/server/orm/dao/RequestDAO.java | 8 +
.../apache/ambari/server/orm/dao/StageDAO.java | 32 +-
.../orm/entities/HostRoleCommandEntity.java | 4 +-
.../server/orm/entities/RequestEntity.java | 49 +-
.../ambari/server/orm/entities/StageEntity.java | 70 ++-
.../server/orm/entities/StageEntityPK.java | 12 +
.../server/upgrade/UpgradeCatalog300.java | 70 +++
.../main/resources/Ambari-DDL-Derby-CREATE.sql | 7 +-
.../main/resources/Ambari-DDL-MySQL-CREATE.sql | 7 +-
.../main/resources/Ambari-DDL-Oracle-CREATE.sql | 7 +-
.../resources/Ambari-DDL-Postgres-CREATE.sql | 7 +-
.../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 7 +-
.../resources/Ambari-DDL-SQLServer-CREATE.sql | 7 +-
.../actionmanager/TestActionDBAccessorImpl.java | 3 +-
.../actionmanager/TestActionScheduler.java | 71 ++-
.../alerts/AmbariPerformanceRunnableTest.java | 7 +-
.../internal/UpgradeResourceProviderTest.java | 1 -
.../UpgradeSummaryResourceProviderTest.java | 1 -
.../listeners/tasks/TaskStatusListenerTest.java | 164 +++++
.../ambari/server/state/ConfigHelperTest.java | 2 +
.../cluster/ClusterEffectiveVersionTest.java | 5 +-
.../services/RetryUpgradeActionServiceTest.java | 1 -
.../server/upgrade/UpgradeCatalog300Test.java | 20 +
34 files changed, 1892 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 7881a4b..b813fe6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -45,7 +45,9 @@ import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.events.HostsRemovedEvent;
import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -130,6 +132,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
AmbariEventPublisher ambariEventPublisher;
@Inject
+ TaskEventPublisher taskEventPublisher;
+
+ @Inject
AuditLogger auditLogger;
/**
@@ -205,8 +210,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
public Collection<HostRoleCommandEntity> abortOperation(long requestId) {
long now = System.currentTimeMillis();
- endRequest(requestId);
-
// only request commands which actually need to be aborted; requesting all
// commands here can cause OOM problems during large requests like upgrades
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
@@ -228,7 +231,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (!commands.isEmpty()) {
return hostRoleCommandDAO.mergeAll(commands);
}
-
+ endRequest(requestId);
return Collections.emptyList();
}
@@ -283,7 +286,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
@Override
@Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
public List<Stage> getStagesInProgress() {
- List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
+ List<StageEntity> stageEntities = stageDAO.findByStatuses(
HostRoleStatus.IN_PROGRESS_STATUSES);
return getStagesForEntities(stageEntities);
}
@@ -343,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
RequestEntity requestEntity = request.constructNewPersistenceEntity();
Long clusterId = -1L;
+ Long requestId = requestEntity.getRequestId();
ClusterEntity clusterEntity = clusterDAO.findById(request.getClusterId());
if (clusterEntity != null) {
clusterId = clusterEntity.getClusterId();
@@ -356,8 +360,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
addRequestToAuditlogCache(request);
+ List<HostRoleCommand> hostRoleCommands = new ArrayList<>();
+
for (Stage stage : request.getStages()) {
StageEntity stageEntity = stage.constructNewPersistenceEntity();
+ Long stageId = stageEntity.getStageId();
stageEntities.add(stageEntity);
stageEntity.setClusterId(clusterId);
stageEntity.setRequest(requestEntity);
@@ -366,6 +373,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
List<HostRoleCommand> orderedHostRoleCommands = stage.getOrderedHostRoleCommands();
for (HostRoleCommand hostRoleCommand : orderedHostRoleCommands) {
+ hostRoleCommand.setRequestId(requestId);
+ hostRoleCommand.setStageId(stageId);
HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
hostRoleCommandEntity.setStage(stageEntity);
hostRoleCommandDAO.create(hostRoleCommandEntity);
@@ -415,11 +424,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
executionCommandDAO.create(hostRoleCommandEntity.getExecutionCommand());
- hostRoleCommandEntity = hostRoleCommandDAO.merge(hostRoleCommandEntity);
+ hostRoleCommandEntity = hostRoleCommandDAO.mergeWithoutPublishEvent(hostRoleCommandEntity);
if (null != hostEntity) {
hostEntity = hostDAO.merge(hostEntity);
}
+ hostRoleCommands.add(hostRoleCommand);
}
for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
@@ -431,6 +441,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
requestEntity.setStages(stageEntities);
requestDAO.merge(requestEntity);
+
+ TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
+ taskEventPublisher.publish(taskCreateEvent);
}
@Override
@@ -497,66 +510,55 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
long now = System.currentTimeMillis();
List<Long> requestsToCheck = new ArrayList<Long>();
- List<Long> abortedCommandUpdates = new ArrayList<Long>();
List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
+ List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<HostRoleCommandEntity>();
for (HostRoleCommandEntity commandEntity : commandEntities) {
CommandReport report = taskReports.get(commandEntity.getTaskId());
-
- boolean statusChanged = false;
-
- switch (commandEntity.getStatus()) {
- case ABORTED:
- // We don't want to overwrite statuses for ABORTED tasks with
- // statuses that have been received from the agent after aborting task
- abortedCommandUpdates.add(commandEntity.getTaskId());
- break;
- default:
- HostRoleStatus status = HostRoleStatus.valueOf(report.getStatus());
- // if FAILED and marked for holding then set status = HOLDING_FAILED
- if (status == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
- status = HostRoleStatus.HOLDING_FAILED;
-
- // tasks can be marked as skipped when they fail
- if (commandEntity.isFailureAutoSkipped()) {
- status = HostRoleStatus.SKIPPED_FAILED;
- }
+ HostRoleStatus existingTaskStatus = commandEntity.getStatus();
+ HostRoleStatus reportedTaskStatus = HostRoleStatus.valueOf(report.getStatus());
+ if (!existingTaskStatus.isCompletedState() || existingTaskStatus == HostRoleStatus.ABORTED) {
+ // if FAILED and marked for holding then set reportedTaskStatus = HOLDING_FAILED
+ if (reportedTaskStatus == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
+ reportedTaskStatus = HostRoleStatus.HOLDING_FAILED;
+
+ // tasks can be marked as skipped when they fail
+ if (commandEntity.isFailureAutoSkipped()) {
+ reportedTaskStatus = HostRoleStatus.SKIPPED_FAILED;
}
-
- commandEntity.setStatus(status);
- statusChanged = true;
- break;
- }
-
- commandEntity.setStdOut(report.getStdOut().getBytes());
- commandEntity.setStdError(report.getStdErr().getBytes());
- commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
- report.getStructuredOut().getBytes());
- commandEntity.setExitcode(report.getExitCode());
-
- if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
- commandEntity.setEndTime(now);
-
- String actionId = report.getActionId();
- long[] requestStageIds = StageUtils.getRequestStage(actionId);
- long requestId = requestStageIds[0];
- long stageId = requestStageIds[1];
- if(statusChanged) {
- auditLog(commandEntity, requestId);
}
- if (requestDAO.getLastStageId(requestId).equals(stageId)) {
- requestsToCheck.add(requestId);
+ if (!existingTaskStatus.isCompletedState()) {
+ commandEntity.setStatus(reportedTaskStatus);
}
+ commandEntity.setStdOut(report.getStdOut().getBytes());
+ commandEntity.setStdError(report.getStdErr().getBytes());
+ commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
+ report.getStructuredOut().getBytes());
+ commandEntity.setExitcode(report.getExitCode());
+ if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
+ commandEntity.setEndTime(now);
+
+ String actionId = report.getActionId();
+ long[] requestStageIds = StageUtils.getRequestStage(actionId);
+ long requestId = requestStageIds[0];
+ long stageId = requestStageIds[1];
+ auditLog(commandEntity, requestId);
+ if (requestDAO.getLastStageId(requestId).equals(stageId)) {
+ requestsToCheck.add(requestId);
+ }
+ }
+ commandEntitiesToMerge.add(commandEntity);
+ } else {
+ LOG.warn(String.format("Request for invalid transition of host role command status received for task id %d from " +
+ "agent: %s -> %s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus));
}
}
// no need to merge if there's nothing to merge
- if (!commandEntities.isEmpty()) {
- hostRoleCommandDAO.mergeAll(commandEntities);
+ if (!commandEntitiesToMerge.isEmpty()) {
+ hostRoleCommandDAO.mergeAll(commandEntitiesToMerge);
}
- // Invalidate cache because of updates to ABORTED commands
- hostRoleCommandCache.invalidateAll(abortedCommandUpdates);
for (Long requestId : requestsToCheck) {
endRequestIfCompleted(requestId);
@@ -923,7 +925,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
return HostRoleStatus.QUEUED;
}
Collection<HostRoleStatus> taskStatuses = details.getTaskStatuses();
- return CalculatedStatus.calculateSummaryStatusOfStage(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
+ return CalculatedStatus.calculateSummaryStatus(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 680c0a6..a92c03c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.persistence.EntityManager;
@@ -49,6 +50,7 @@ import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.JPAEventPublisher;
import org.apache.ambari.server.metadata.RoleCommandOrder;
@@ -75,10 +77,13 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.eventbus.Subscribe;
import com.google.common.reflect.TypeToken;
@@ -179,6 +184,9 @@ class ActionScheduler implements Runnable {
* we receive awake() request during running a scheduler iteration.
*/
private boolean activeAwakeRequest = false;
+
+ private AtomicBoolean taskStatusLoaded = new AtomicBoolean();
+
//Cache for clusterHostinfo, key - stageId-requestId
private Cache<String, Map<String, Set<String>>> clusterHostInfoCache;
private Cache<String, Map<String, String>> commandParamsStageCache;
@@ -353,6 +361,8 @@ class ActionScheduler implements Runnable {
LOG.debug("Processing {} in progress stages ", stages.size());
}
+ publishInProgressTasks(stages);
+
if (stages.isEmpty()) {
// Nothing to do
if (LOG.isDebugEnabled()) {
@@ -532,6 +542,27 @@ class ActionScheduler implements Runnable {
}
}
+ /**
+ * Publishes a task-create event for all tasks of the requests that own the given stages, in order to load
+ * {@link TaskStatusListener#activeTasksMap}, {@link TaskStatusListener#activeStageMap} and {@link TaskStatusListener#activeRequestMap}.
+ * Only the first invocation can publish (guarded by {@code taskStatusLoaded}); this is required as some tasks may have been in progress when the server was last stopped.
+ * @param stages list of stages; nothing is published when it is empty
+ */
+ private void publishInProgressTasks(List<Stage> stages) {
+ if (taskStatusLoaded.compareAndSet(false, true)) {
+ if (!stages.isEmpty()) {
+ Function<Stage, Long> transform = new Function<Stage, Long>() {
+ @Override
+ public Long apply(Stage stage) {
+ return stage.getRequestId();
+ }
+ };
+ Set<Long> runningRequestID = ImmutableSet.copyOf(Lists.transform(stages, transform));
+ List<HostRoleCommand> hostRoleCommands = db.getAllTasksByRequestIds(runningRequestID);
+ hostRoleCommandDAO.publishTaskCreateEvent(hostRoleCommands);
+ }
+ }
+ }
/**
* Returns the list of hosts that have a task assigned
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
index 31e11c1..502c016 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -71,7 +71,8 @@ public class Request {
* As of now, this field is not used. Request status is
* calculated at RequestResourceProvider on the fly.
*/
- private HostRoleStatus status; // not persisted yet
+ private HostRoleStatus status = HostRoleStatus.PENDING;
+ private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
private String inputs;
private List<RequestResourceFilter> resourceFilters;
private RequestOperationLevel operationLevel;
@@ -186,6 +187,7 @@ public class Request {
this.requestType = entity.getRequestType();
this.commandName = entity.getCommandName();
this.status = entity.getStatus();
+ this.displayStatus = entity.getDisplayStatus();
if (entity.getRequestScheduleEntity() != null) {
this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId();
}
@@ -241,6 +243,8 @@ public class Request {
requestEntity.setInputs(inputs);
requestEntity.setRequestType(requestType);
requestEntity.setRequestScheduleId(requestScheduleId);
+ requestEntity.setStatus(status);
+ requestEntity.setDisplayStatus(displayStatus);
//TODO set all fields
if (resourceFilters != null) {
@@ -381,6 +385,8 @@ public class Request {
", startTime=" + startTime +
", endTime=" + endTime +
", inputs='" + inputs + '\'' +
+ ", status='" + status + '\'' +
+ ", displayStatus='" + displayStatus + '\'' +
", resourceFilters='" + resourceFilters + '\'' +
", operationLevel='" + operationLevel + '\'' +
", requestType=" + requestType +
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 4a05b32..f7ceca2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -74,6 +74,8 @@ public class Stage {
private long stageId = -1;
private final String logDir;
private final String requestContext;
+ private HostRoleStatus status = HostRoleStatus.PENDING;
+ private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
private String clusterHostInfo;
private String commandParamsStage;
private String hostParamsStage;
@@ -157,6 +159,8 @@ public class Stage {
commandParamsStage = stageEntity.getCommandParamsStage();
hostParamsStage = stageEntity.getHostParamsStage();
commandExecutionType = stageEntity.getCommandExecutionType();
+ status = stageEntity.getStatus();
+ displayStatus = stageEntity.getDisplayStatus();
List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(requestId, stageId);
Collection<HostRoleCommand> commands = dbAccessor.getTasks(taskIds);
@@ -197,6 +201,8 @@ public class Stage {
stageEntity.setCommandParamsStage(commandParamsStage);
stageEntity.setHostParamsStage(hostParamsStage);
stageEntity.setCommandExecutionType(commandExecutionType);
+ stageEntity.setStatus(status);
+ stageEntity.setDisplayStatus(displayStatus);
for (Role role : successFactors.keySet()) {
RoleSuccessCriteriaEntity roleSuccessCriteriaEntity = new RoleSuccessCriteriaEntity();
@@ -290,6 +296,23 @@ public class Stage {
this.commandExecutionType = commandExecutionType;
}
+ /**
+ * Gets the current status of the stage.
+ * @return the {@link HostRoleStatus} currently held by this stage
+ */
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * Sets the status of the stage.
+ * @param status the new {@link HostRoleStatus} of this stage
+ */
+ public void setStatus(HostRoleStatus status) {
+ this.status = status;
+ }
+
+
public synchronized void setStageId(long stageId) {
if (this.stageId != -1) {
throw new RuntimeException("Attempt to set stageId again! Not allowed.");
@@ -915,6 +938,8 @@ public class Stage {
builder.append("clusterHostInfo="+clusterHostInfo+"\n");
builder.append("commandParamsStage="+commandParamsStage+"\n");
builder.append("hostParamsStage="+hostParamsStage+"\n");
+ builder.append("status="+status+"\n");
+ builder.append("displayStatus="+displayStatus+"\n");
builder.append("Success Factors:\n");
for (Role r : successFactors.keySet()) {
builder.append(" role: "+r+", factor: "+successFactors.get(r)+"\n");
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
index 3c415df..32dd03d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
@@ -26,12 +26,20 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.ambari.server.Role;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
/**
* Status of a request resource, calculated from a set of tasks or stages.
@@ -142,7 +150,7 @@ public class CalculatedStatus {
Map<HostRoleStatus, Integer> taskStatusCounts = CalculatedStatus.calculateTaskEntityStatusCounts(tasks);
- HostRoleStatus status = calculateSummaryStatusOfStage(taskStatusCounts, size, skippable);
+ HostRoleStatus status = calculateSummaryStatus(taskStatusCounts, size, skippable);
double progressPercent = calculateProgressPercent(taskStatusCounts, size);
@@ -167,7 +175,7 @@ public class CalculatedStatus {
// calculate the stage status from the task status counts
HostRoleStatus stageStatus =
- calculateSummaryStatusOfStage(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+ calculateSummaryStatus(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
stageStatuses.add(stageStatus);
@@ -203,7 +211,7 @@ public class CalculatedStatus {
// calculate the stage status from the task status counts
HostRoleStatus stageStatus =
- calculateSummaryStatusOfStage(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+ calculateSummaryStatus(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
stageStatuses.add(stageStatus);
@@ -256,6 +264,126 @@ public class CalculatedStatus {
}
/**
+ * Returns counts of tasks that are in various states.
+ * A task in any completed state is also counted under COMPLETED; IN_PROGRESS is set to total - COMPLETED - QUEUED - PENDING.
+ * @param hostRoleCommands collection of beans {@link HostRoleCommand}
+ *
+ * @return a map of counts of tasks keyed by the task status, with an entry for every {@link HostRoleStatus} value
+ */
+ public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands) {
+ Map<HostRoleStatus, Integer> counters = new HashMap<>();
+ // initialize every status to zero so that later lookups never return null
+ for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+ counters.put(hostRoleStatus, 0);
+ }
+ // calculate counts
+ for (HostRoleCommand hrc : hostRoleCommands) {
+ // count tasks where isCompletedState() == true as COMPLETED
+ // but don't count tasks with COMPLETED status twice
+ if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+ // Increase total number of completed tasks
+ counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+ }
+ // Increment counter for particular status
+ counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+ }
+
+ // We overwrite the value to have the sum converged
+ counters.put(HostRoleStatus.IN_PROGRESS,
+ hostRoleCommands.size() -
+ counters.get(HostRoleStatus.COMPLETED) -
+ counters.get(HostRoleStatus.QUEUED) -
+ counters.get(HostRoleStatus.PENDING));
+
+ return counters;
+ }
+
+ /**
+ * Returns, for each {@link StatusType}, a map of counts of stages that are in various states.
+ * A stage in any completed state is also counted under COMPLETED; IN_PROGRESS is set to total - COMPLETED - QUEUED - PENDING.
+ * @param stages collection of beans {@link org.apache.ambari.server.events.listeners.tasks.TaskStatusListener.ActiveStage}
+ *
+ * @return a map keyed by status type whose values are counts of stages keyed by the stage status
+ */
+ public static Map<StatusType,Map<HostRoleStatus, Integer>> calculateStatusCountsForStage(Collection<TaskStatusListener.ActiveStage> stages) {
+
+ Map<StatusType,Map<HostRoleStatus, Integer>> counters = new HashMap<>();
+ for (StatusType statusType : StatusType.values()) {
+ Map <HostRoleStatus, Integer> statusMap = new HashMap<HostRoleStatus, Integer>();
+ counters.put(statusType,statusMap);
+ // initialize every status to zero so that later lookups never return null
+ for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+ statusMap.put(hostRoleStatus, 0);
+ }
+ for (TaskStatusListener.ActiveStage stage : stages) {
+ // count stages where isCompletedState() == true as COMPLETED
+ // but don't count stages with COMPLETED status twice
+ HostRoleStatus status;
+ if (statusType == StatusType.DISPLAY_STATUS) {
+ status = stage.getDisplayStatus();
+ } else {
+ status = stage.getStatus();
+ }
+ if (status.isCompletedState() && status != HostRoleStatus.COMPLETED) {
+ // Increase total number of completed stages
+ statusMap.put(HostRoleStatus.COMPLETED, statusMap.get(HostRoleStatus.COMPLETED) + 1);
+ }
+
+ // Increment counter for particular status
+ statusMap.put(status, statusMap.get(status) + 1);
+ }
+ statusMap.put(HostRoleStatus.IN_PROGRESS,
+ stages.size() -
+ statusMap.get(HostRoleStatus.COMPLETED) -
+ statusMap.get(HostRoleStatus.QUEUED) -
+ statusMap.get(HostRoleStatus.PENDING));
+ }
+ return counters;
+ }
+
+
+ /**
+ * Returns counts of the tasks of a single stage that are in various states.
+ * Only commands whose stage id and request id both match {@code stage} are counted; all others are ignored.
+ * @param hostRoleCommands collection of beans {@link HostRoleCommand}
+ * @param stage key (request id and stage id) of the stage whose tasks are counted
+ * @return a map of counts of the stage's tasks keyed by the task status
+ */
+ public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands, StageEntityPK stage) {
+ Map<HostRoleStatus, Integer> counters = new HashMap<>();
+ List<HostRoleCommand> hostRoleCommandsOfStage = new ArrayList<>();
+ // initialize every status to zero so that later lookups never return null
+ for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+ counters.put(hostRoleStatus, 0);
+ }
+ // calculate counts
+ for (HostRoleCommand hrc : hostRoleCommands) {
+ if (stage.getStageId() == hrc.getStageId() && stage.getRequestId() == hrc.getRequestId()) { // NOTE(review): == compares values only if one side is a primitive long - confirm getter types
+ // count tasks where isCompletedState() == true as COMPLETED
+ // but don't count tasks with COMPLETED status twice
+ if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+ // Increase total number of completed tasks
+ counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+ }
+
+ // Increment counter for particular status
+ counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+
+ hostRoleCommandsOfStage.add(hrc);
+ }
+ }
+
+ // We overwrite the value to have the sum converged (only the tasks of the matching stage count towards the total)
+ counters.put(HostRoleStatus.IN_PROGRESS,
+ hostRoleCommandsOfStage.size() -
+ counters.get(HostRoleStatus.COMPLETED) -
+ counters.get(HostRoleStatus.QUEUED) -
+ counters.get(HostRoleStatus.PENDING));
+
+ return counters;
+ }
+
+ /**
* Returns counts of task entities that are in various states.
*
* @param tasks the collection of task entities
@@ -329,7 +457,7 @@ public class CalculatedStatus {
int total = summary.getTaskTotal();
boolean skip = summary.isStageSkippable();
Map<HostRoleStatus, Integer> counts = calculateStatusCounts(summary.getTaskStatuses());
- HostRoleStatus stageStatus = calculateSummaryStatusOfStage(counts, total, skip);
+ HostRoleStatus stageStatus = calculateSummaryStatus(counts, total, skip);
HostRoleStatus stageDisplayStatus = calculateSummaryDisplayStatus(counts, total, skip);
stageStatuses.add(stageStatus);
@@ -392,7 +520,7 @@ public class CalculatedStatus {
*
* @return summary request status based on statuses of tasks in different states.
*/
- public static HostRoleStatus calculateSummaryStatusOfStage(Map<HostRoleStatus, Integer> counters,
+ public static HostRoleStatus calculateSummaryStatus(Map<HostRoleStatus, Integer> counters,
int total, boolean skippable) {
// when there are 0 tasks, return COMPLETED
@@ -435,6 +563,230 @@ public class CalculatedStatus {
}
/**
+ *
+ * @param counters counts of resources that are in various states
+ * @param skippable {Boolean} <code>TRUE</code> if the failure of any task should not fail the stage
+ * @return {@link HostRoleStatus}
+ */
+ public static HostRoleStatus calculateSummaryStatusFromPartialSet(Map<HostRoleStatus, Integer> counters,
+     boolean skippable) {
+
+   // Lowest-priority outcome first: a holding status if any task is holding,
+   // otherwise PENDING. Every later check overrides the value chosen here.
+   HostRoleStatus summary;
+   if (counters.get(HostRoleStatus.HOLDING) > 0) {
+     summary = HostRoleStatus.HOLDING;
+   } else if (counters.get(HostRoleStatus.HOLDING_FAILED) > 0) {
+     summary = HostRoleStatus.HOLDING_FAILED;
+   } else if (counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+     summary = HostRoleStatus.HOLDING_TIMEDOUT;
+   } else {
+     summary = HostRoleStatus.PENDING;
+   }
+
+   // A failed task only fails the summary when failures are not skippable.
+   if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+     summary = HostRoleStatus.FAILED;
+   }
+
+   // Same rule for timed out tasks; TIMEDOUT wins over FAILED.
+   if (counters.get(HostRoleStatus.TIMEDOUT) > 0 && !skippable) {
+     summary = HostRoleStatus.TIMEDOUT;
+   }
+
+   // Queued or running tasks take precedence over everything above.
+   int runningTasks = counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+   return runningTasks > 0 ? HostRoleStatus.IN_PROGRESS : summary;
+ }
+
+
+ /**
+ *
+ * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+ * @param counters counts of resources that are in various states
+ * @param successFactors Map of roles to their success factor for a stage
+ * @param skippable {Boolean} <code>TRUE</code> if the failure of any task should not fail the stage
+ * @return {@link HostRoleStatus} based on success factor
+ */
+ public static HostRoleStatus calculateStageStatus(List <HostRoleCommand> hostRoleCommands, Map<HostRoleStatus, Integer> counters, Map<Role, Float> successFactors,
+     boolean skippable) {
+
+   // A stage without tasks is reported as COMPLETED.
+   final int taskCount = hostRoleCommands.size();
+   if (taskCount == 0) {
+     return HostRoleStatus.COMPLETED;
+   }
+
+   // Every task is still pending, so the stage is pending too.
+   if (counters.get(HostRoleStatus.PENDING) == taskCount) {
+     return HostRoleStatus.PENDING;
+   }
+
+   // Holding states win next, in the order HOLDING, HOLDING_FAILED, HOLDING_TIMEDOUT.
+   if (counters.get(HostRoleStatus.HOLDING) > 0) {
+     return HostRoleStatus.HOLDING;
+   }
+   if (counters.get(HostRoleStatus.HOLDING_FAILED) > 0) {
+     return HostRoleStatus.HOLDING_FAILED;
+   }
+   if (counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+     return HostRoleStatus.HOLDING_TIMEDOUT;
+   }
+
+   // Failed tasks fail the stage only if not skippable and a role misses its success factor.
+   if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+     Set<Role> failedRoles = getRolesOfFailedTasks(hostRoleCommands);
+     if (didStageFailed(hostRoleCommands, failedRoles, successFactors)) {
+       return HostRoleStatus.FAILED;
+     }
+   }
+
+   // Same evaluation for timed out tasks.
+   if (counters.get(HostRoleStatus.TIMEDOUT) > 0 && !skippable) {
+     Set<Role> timedOutRoles = getRolesOfTimedOutTasks(hostRoleCommands);
+     if (didStageFailed(hostRoleCommands, timedOutRoles, successFactors)) {
+       return HostRoleStatus.TIMEDOUT;
+     }
+   }
+
+   int activeTaskCount = counters.get(HostRoleStatus.PENDING) + counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+   if (activeTaskCount > 0) {
+     return HostRoleStatus.IN_PROGRESS;
+   }
+
+   // Nothing is active any more: aborted tasks may still mark the stage ABORTED.
+   if (counters.get(HostRoleStatus.ABORTED) > 0) {
+     Set<Role> abortedRoles = getRolesOfAbortedTasks(hostRoleCommands);
+     if (didStageFailed(hostRoleCommands, abortedRoles, successFactors)) {
+       return HostRoleStatus.ABORTED;
+     }
+   }
+
+   return HostRoleStatus.COMPLETED;
+ }
+
+ /**
+ * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#FAILED}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfFailedTasks(List <HostRoleCommand> hostRoleCommands) {
+ return getRolesOfTasks(hostRoleCommands, HostRoleStatus.FAILED);
+ }
+
+ /**
+ * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#TIMEDOUT}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfTimedOutTasks(List <HostRoleCommand> hostRoleCommands) {
+ return getRolesOfTasks(hostRoleCommands, HostRoleStatus.TIMEDOUT);
+ }
+
+ /**
+ * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#ABORTED}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfAbortedTasks(List <HostRoleCommand> hostRoleCommands) {
+ return getRolesOfTasks(hostRoleCommands, HostRoleStatus.ABORTED);
+ }
+
+ /**
+ * Get all {@link Role} any of whose tasks are in given {@code status}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @param status {@link HostRoleStatus}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfTasks(List <HostRoleCommand> hostRoleCommands, final HostRoleStatus status) {
+   // Filter the commands down to the requested status, then project each one onto its role.
+   return FluentIterable.from(hostRoleCommands)
+       .filter(new Predicate<HostRoleCommand>() {
+         @Override
+         public boolean apply(HostRoleCommand candidate) {
+           return status == candidate.getStatus();
+         }
+       })
+       .transform(new Function<HostRoleCommand, Role>() {
+         @Override
+         public Role apply(HostRoleCommand matched) {
+           return matched.getRole();
+         }
+       })
+       .toSet();
+ }
+
+ /**
+ *
+ * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+ * @param roles set of roles to be checked for meeting success criteria
+ * @param successFactors map of role to its success factor
+ * @return {Boolean} <code>TRUE</code> if stage failed due to hostRoleCommands of any role not meeting success criteria
+ */
+ protected static Boolean didStageFailed(List<HostRoleCommand> hostRoleCommands, Set<Role> roles, Map<Role, Float> successFactors) {
+   // For every role under scrutiny, compare the fraction of its commands that did not
+   // fail against the role's success factor. The stage fails as soon as one role falls short.
+   for (Role role : roles) {
+     List<HostRoleCommand> hostRoleCommandsOfRole = getHostRoleCommandsOfRole(hostRoleCommands, role);
+     int totalForRole = hostRoleCommandsOfRole.size();
+     if (totalForRole == 0) {
+       // No commands for this role: nothing can have failed, and skipping it
+       // avoids dividing by zero below.
+       continue;
+     }
+
+     int failedForRole = getFailedHostRoleCommands(hostRoleCommandsOfRole).size();
+
+     // Divide in floating point. Integer division would truncate the ratio to 0 or 1,
+     // so a role with a fractional success factor (e.g. 0.5) could never pass with partial failures.
+     float successRatioForRole = (float) (totalForRole - failedForRole) / totalForRole;
+
+     // A role without a configured success factor must succeed completely.
+     Float configuredFactor = successFactors.get(role);
+     float successFactorForRole = configuredFactor == null ? 1.0f : configuredFactor;
+
+     if (successRatioForRole < successFactorForRole) {
+       return Boolean.TRUE;
+     }
+   }
+   return Boolean.FALSE;
+ }
+
+ /**
+ *
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @param role {@link Role}
+ * @return list of {@link HostRoleCommand} that belongs to {@link Role}
+ */
+ protected static List<HostRoleCommand> getHostRoleCommandsOfRole(List <HostRoleCommand> hostRoleCommands, final Role role) {
+   // Keep only the commands whose role is the requested one.
+   return FluentIterable.from(hostRoleCommands)
+       .filter(new Predicate<HostRoleCommand>() {
+         @Override
+         public boolean apply(HostRoleCommand candidate) {
+           return role == candidate.getRole();
+         }
+       })
+       .toList();
+ }
+
+ /**
+ *
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return list of {@link HostRoleCommand} with failed status
+ */
+ protected static List<HostRoleCommand> getFailedHostRoleCommands(List <HostRoleCommand> hostRoleCommands) {
+   // Keep only the commands whose status reports isFailedAndNotSkippableState().
+   return FluentIterable.from(hostRoleCommands)
+       .filter(new Predicate<HostRoleCommand>() {
+         @Override
+         public boolean apply(HostRoleCommand candidate) {
+           return candidate.getStatus().isFailedAndNotSkippableState();
+         }
+       })
+       .toList();
+ }
+
+
+ /**
+ * Calculate overall status from collection of statuses
+ * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+ * @return overall status of a request
+ */
+ public static HostRoleStatus getOverallStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+   // Count the statuses, then summarise them treating failures as non-skippable.
+   final Map<HostRoleStatus, Integer> countsByStatus = calculateStatusCounts(hostRoleStatuses);
+   final int totalStatuses = hostRoleStatuses.size();
+   return calculateSummaryStatus(countsByStatus, totalStatuses, false);
+ }
+
+ /**
+ * Calculate overall display status from collection of statuses
+ * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+ * @return overall display status of a request
+ */
+ public static HostRoleStatus getOverallDisplayStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+   // Count the statuses, then derive the display summary treating failures as non-skippable.
+   final Map<HostRoleStatus, Integer> countsByStatus = calculateStatusCounts(hostRoleStatuses);
+   final int totalStatuses = hostRoleStatuses.size();
+   return calculateSummaryDisplayStatus(countsByStatus, totalStatuses, false);
+ }
+
+
+ /**
* Calculate overall status of an upgrade.
*
* @param counters counts of resources that are in various states
@@ -444,7 +796,7 @@ public class CalculatedStatus {
*/
protected static HostRoleStatus calculateSummaryStatusOfUpgrade(
Map<HostRoleStatus, Integer> counters, int total) {
- return calculateSummaryStatusOfStage(counters, total, false);
+ return calculateSummaryStatus(counters, total, false);
}
/**
@@ -456,10 +808,28 @@ public class CalculatedStatus {
*
* @return summary request status based on statuses of tasks in different states.
*/
- protected static HostRoleStatus calculateSummaryDisplayStatus(
+ public static HostRoleStatus calculateSummaryDisplayStatus(
Map<HostRoleStatus, Integer> counters, int total, boolean skippable) {
- return counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
- counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
- calculateSummaryStatusOfStage(counters, total, skippable);
+ return counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
+ counters.get(HostRoleStatus.TIMEDOUT) > 0 ? HostRoleStatus.TIMEDOUT:
+ counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
+ calculateSummaryStatus(counters, total, skippable);
+ }
+
+ /**
+ * kind of {@link HostRoleStatus} persisted by {@link Stage} and {@link Request}
+ */
+ public enum StatusType {
+   STATUS("status"),
+   DISPLAY_STATUS("display_status");
+
+   /** String value associated with the constant; never changes after construction. */
+   private final String value;
+
+   StatusType(String value) {
+     this.value = value;
+   }
+
+   /**
+    * @return the string value this constant was created with
+    */
+   public String getValue() {
+     return value;
+   }
 }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
new file mode 100644
index 0000000..9d73122
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+
+/**
+ * The {@link TaskCreateEvent} is fired whenever a request
+ * needs to be tracked as a running request by
+ * {@link TaskStatusListener}.
+ * This usually happens when a new request is created by a user action or
+ * when ambari-server starts with some stages in a non-completed state.
+ */
+public class TaskCreateEvent extends TaskEvent {
+
+  /**
+   * Creates the event for the given host role commands.
+   *
+   * @param hostRoleCommandList
+   *          the host role commands carried by this event; passed unchanged
+   *          to the {@link TaskEvent} constructor
+   */
+  public TaskCreateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
new file mode 100644
index 0000000..ca351d7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * {@link TaskEvent} is the base for all events related to create/update of tasks
+ * that might result in update of stage/request status
+ */
+public class TaskEvent {
+  /**
+   * The {@link HostRoleCommand}s carried by this event.
+   */
+  private List<HostRoleCommand> hostRoleCommands;
+
+  /**
+   * Constructor.
+   *
+   * @param hostRoleCommands
+   *          the host role commands this event is about; the list is stored as is
+   */
+  public TaskEvent(List<HostRoleCommand> hostRoleCommands) {
+    this.hostRoleCommands = hostRoleCommands;
+  }
+
+  /**
+   * Gets the host role commands this event was created with.
+   *
+   * @return List of {@link HostRoleCommand}
+   */
+  public List<HostRoleCommand> getHostRoleCommands() {
+    return hostRoleCommands;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    // Render the commands as a comma separated list inside the braces.
+    return "TaskEvent{" + "hostRoleCommands=" + StringUtils.join(hostRoleCommands, ", ") + "}";
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
new file mode 100644
index 0000000..84f67f5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+
+/**
+ * The {@link TaskUpdateEvent} is to be fired every time
+ * when host role commands are merged to the database
+ */
+public class TaskUpdateEvent extends TaskEvent{
+
+  /**
+   * Creates the event for the given host role commands.
+   *
+   * @param hostRoleCommandList
+   *          the host role commands carried by this event; passed unchanged
+   *          to the {@link TaskEvent} constructor
+   */
+  public TaskUpdateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
new file mode 100644
index 0000000..bc146ef
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -0,0 +1,609 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.listeners.tasks;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * The {@link TaskStatusListener} is used to constantly update the status of running stages and requests.
+ * It listens for {@link TaskCreateEvent} and {@link TaskUpdateEvent}, which are fired when host role commands are created or updated.
+ * This listener maintains maps of all running tasks, stages and requests.
+ */
+@Singleton
+@EagerSingleton
+public class TaskStatusListener {
+ /**
+ * Logger.
+ */
+ private final static Logger LOG = LoggerFactory.getLogger(TaskStatusListener.class);
+
+ /**
+ * Maps task id to its {@link HostRoleCommand} Object.
+ * Map has entries of all tasks of all active(ongoing) requests
+ * NOTE: Partial loading of tasks for any request may lead to incorrect update of the request status
+ */
+ private Map<Long,HostRoleCommand> activeTasksMap = new ConcurrentHashMap<>();
+
+ /**
+ * Maps all ongoing request id to its {@link ActiveRequest}
+ */
+ private Map<Long, ActiveRequest> activeRequestMap = new ConcurrentHashMap<>();
+
+ /**
+ * Maps {@link StageEntityPK} of all ongoing requests to its {@link ActiveStage}
+ * with updated {@link ActiveStage#status} and {@link ActiveStage#displayStatus}.
+ */
+ private Map<StageEntityPK, ActiveStage> activeStageMap = new ConcurrentHashMap<>();
+
+ private StageDAO stageDAO;
+
+ private RequestDAO requestDAO;
+
+
+ // Stores the injected DAOs and subscribes this listener to the task event publisher.
+ @Inject
+ public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO) {
+ this.stageDAO = stageDAO;
+ this.requestDAO = requestDAO;
+ // Registration passes "this" to the publisher, so it is done last, after both DAO fields are set.
+ // NOTE(review): presumably events can be delivered as soon as register() returns - confirm.
+ taskEventPublisher.register(this);
+ }
+
+ // Returns the internal task map itself, not a copy; changes made by the caller affect the listener.
+ public Map<Long,HostRoleCommand> getActiveTasksMap() {
+ return activeTasksMap;
+ }
+
+ // Returns the internal request map itself, not a copy; changes made by the caller affect the listener.
+ public Map<Long, ActiveRequest> getActiveRequestMap() {
+ return activeRequestMap;
+ }
+
+ // Returns the internal stage map itself, not a copy; changes made by the caller affect the listener.
+ public Map<StageEntityPK, ActiveStage> getActiveStageMap() {
+ return activeStageMap;
+ }
+
+ /**
+ * On receiving task update event, update related entries of the running request, stage and task in the maps
+ * Event containing newly created tasks is expected to contain complete set of all tasks for a request
+ * @param event Consumes {@link TaskUpdateEvent}.
+ */
+ @Subscribe
+ public void onTaskUpdateEvent(TaskUpdateEvent event) {
+   LOG.debug("Received task update event {}", event);
+   List<HostRoleCommand> reportedCommands = event.getHostRoleCommands();
+   List<HostRoleCommand> trackedCommands = new ArrayList<>();
+   Set<StageEntityPK> touchedStages = new HashSet<>();
+   Set<Long> touchedRequestIds = new HashSet<>();
+
+   for (HostRoleCommand reported : reportedCommands) {
+     Long reportedTaskId = reported.getTaskId();
+     if (activeTasksMap.get(reportedTaskId) == null) {
+       // Updates for tasks that are not tracked are logged and otherwise ignored.
+       LOG.error(String.format("Received update for a task %d which is not being tracked as running task", reportedTaskId));
+       continue;
+     }
+
+     // Remember the command together with the stage and request it belongs to.
+     trackedCommands.add(reported);
+     StageEntityPK stagePK = new StageEntityPK();
+     stagePK.setRequestId(reported.getRequestId());
+     stagePK.setStageId(reported.getStageId());
+     touchedStages.add(stagePK);
+     touchedRequestIds.add(reported.getRequestId());
+   }
+
+   updateActiveTasksMap(trackedCommands);
+
+   // Presumption: if no running stage changed its status,
+   // then no running request needs a status update either.
+   Boolean anyStageStatusChanged = updateActiveStagesStatus(touchedStages, reportedCommands);
+   if (anyStageStatusChanged) {
+     updateActiveRequestsStatus(touchedRequestIds, touchedStages);
+   }
+
+ }
+
+ /**
+ * On receiving task create event, create entries in the running request, stage and task in the maps
+ * @param event Consumes {@link TaskCreateEvent}.
+ */
+ @Subscribe
+ public void onTaskCreateEvent(TaskCreateEvent event) {
+   LOG.debug("Received task create event {}", event);
+
+   // Start tracking every created task together with its stage and its request.
+   for (HostRoleCommand created : event.getHostRoleCommands()) {
+     activeTasksMap.put(created.getTaskId(), created);
+     addStagePK(created);
+     addRequestId(created);
+   }
+ }
+
+
+ /**
+ * update changed host role command status
+ * @param hostRoleCommandWithReceivedStatus list of host role commands reported
+ */
+ private void updateActiveTasksMap(List<HostRoleCommand> hostRoleCommandWithReceivedStatus) {
+   // Replace the tracked entry of each reported task with the reported command.
+   for (HostRoleCommand reported : hostRoleCommandWithReceivedStatus) {
+     activeTasksMap.put(reported.getTaskId(), reported);
+   }
+ }
+
+
+ /**
+ * Adds new {@link StageEntityPK} to be tracked as running stage in {@link #activeStageMap}
+ * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+ */
+ private void addStagePK(HostRoleCommand hostRoleCommand) {
+   StageEntityPK stagePK = new StageEntityPK();
+   stagePK.setRequestId(hostRoleCommand.getRequestId());
+   stagePK.setStageId(hostRoleCommand.getStageId());
+
+   // Stage already tracked: just attach the new task to it.
+   ActiveStage trackedStage = activeStageMap.get(stagePK);
+   if (trackedStage != null) {
+     trackedStage.addTaskId(hostRoleCommand.getTaskId());
+     return;
+   }
+
+   // First task seen for this stage: load the stage and start tracking it.
+   StageEntity stageEntity = stageDAO.findByPK(stagePK);
+   // Stage entity of the hostrolecommand should be persisted before publishing task create event
+   assert stageEntity != null;
+
+   // Collect the success factor configured for each role of the stage.
+   Map<Role, Float> successFactors = new HashMap<>();
+   for (RoleSuccessCriteriaEntity criteria : stageEntity.getRoleSuccessCriterias()) {
+     successFactors.put(criteria.getRole(), criteria.getSuccessFactor().floatValue());
+   }
+
+   Set<Long> taskIds = Sets.newHashSet(hostRoleCommand.getTaskId());
+   ActiveStage newStage = new ActiveStage(stageEntity.getStatus(), stageEntity.getDisplayStatus(),
+       successFactors, stageEntity.isSkippable(), taskIds);
+   activeStageMap.put(stagePK, newStage);
+ }
+
+ /**
+ * update and persist all changed stage status
+ * @param stagesWithReceivedTaskStatus set of stages that has received task status
+ * @param hostRoleCommandListAll list of all task updates received from agent
+ * @return <code>true</code> if the status of any stage changed;
+ * <code>false</code> otherwise
+ */
+ private Boolean updateActiveStagesStatus(final Set<StageEntityPK> stagesWithReceivedTaskStatus, List<HostRoleCommand> hostRoleCommandListAll) {
+   Boolean anyStageChanged = Boolean.FALSE;
+   for (StageEntityPK stagePK : stagesWithReceivedTaskStatus) {
+     if (!activeStageMap.containsKey(stagePK)) {
+       // Stages that are not tracked are logged and skipped.
+       LOG.error(String.format("Received update for a task whose stage is not being tracked as running stage: %s", stagePK.toString()));
+       continue;
+     }
+
+     // Recompute the in-memory status; persist it only when it actually changed.
+     Boolean stageChanged = updateStageStatus(stagePK, hostRoleCommandListAll);
+     if (stageChanged) {
+       ActiveStage trackedStage = activeStageMap.get(stagePK);
+       stageDAO.updateStatus(stagePK, trackedStage.getStatus(), trackedStage.getDisplayStatus());
+       anyStageChanged = Boolean.TRUE;
+     }
+   }
+   return anyStageChanged;
+ }
+
+ /**
+ * Adds new request id to be tracked as running request in {@link #activeRequestMap}
+ * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+ */
+ private void addRequestId(HostRoleCommand hostRoleCommand) {
+   Long requestId = hostRoleCommand.getRequestId();
+   StageEntityPK stagePK = new StageEntityPK();
+   stagePK.setRequestId(hostRoleCommand.getRequestId());
+   stagePK.setStageId(hostRoleCommand.getStageId());
+
+   // Request already tracked: just attach the stage to it.
+   ActiveRequest trackedRequest = activeRequestMap.get(requestId);
+   if (trackedRequest != null) {
+     trackedRequest.addStageEntityPK(stagePK);
+     return;
+   }
+
+   // First task seen for this request: load the request and start tracking it.
+   RequestEntity requestEntity = requestDAO.findByPK(requestId);
+   // Request entity of the hostrolecommand should be persisted before publishing task create event
+   assert requestEntity != null;
+   Set<StageEntityPK> stagePKs = Sets.newHashSet(stagePK);
+   ActiveRequest newRequest = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), stagePKs);
+   activeRequestMap.put(requestId, newRequest);
+ }
+
+
+ /**
+ * update and persist changed request status
+ * @param requestIdsWithReceivedTaskStatus set of request ids that has received tasks status
+ * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+ */
+ private void updateActiveRequestsStatus(final Set<Long> requestIdsWithReceivedTaskStatus, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+   for (Long requestId : requestIdsWithReceivedTaskStatus) {
+     ActiveRequest trackedRequest = activeRequestMap.get(requestId);
+     if (trackedRequest == null) {
+       // Requests that are not tracked are logged and skipped.
+       LOG.error(String.format("Received update for a task whose request %d is not being tracked as running request", requestId));
+       continue;
+     }
+
+     // Recompute the in-memory status; persist it only when it actually changed.
+     Boolean requestChanged = updateRequestStatus(requestId, stagesWithChangedTaskStatus);
+     if (requestChanged) {
+       requestDAO.updateStatus(requestId, trackedRequest.getStatus(), trackedRequest.getDisplayStatus());
+     }
+
+     // A request is considered finished once its own status and the status of all of
+     // its tasks are completed. From then on the request, its stages and its tasks
+     // no longer need to be tracked as active (running).
+     if (trackedRequest.isCompleted() && isAllTasksCompleted(requestId)) {
+       removeRequestStageAndTasks(requestId);
+     }
+   }
+ }
+
+ /**
+ *
+ * @param requestId request Id
+ * @return <code>false</code> if any of the task belonging to requestId has incomplete status
+ * <code>true</code> otherwise
+ */
+ private Boolean isAllTasksCompleted(Long requestId) {
+   // Scan the tracked tasks and stop at the first task of this request that has not
+   // reached a completed state; no need to look at the remaining entries.
+   for (HostRoleCommand task : activeTasksMap.values()) {
+     // equals() compares the ids by value whether getRequestId() yields a primitive or a boxed Long.
+     // NOTE(review): with a boxed return type, "==" would compare references - confirm the return type.
+     if (requestId.equals(task.getRequestId()) && !task.getStatus().isCompletedState()) {
+       return Boolean.FALSE;
+     }
+   }
+   return Boolean.TRUE;
+ }
+
+ /**
+ * Removes entries from {@link #activeTasksMap},{@link #activeStageMap} and {@link #activeRequestMap}
+ * @param requestId request id whose entry and it's stage and task entries is to be removed
+ */
+ private void removeRequestStageAndTasks(Long requestId) {
+ // Delegates to the three helpers in the order tasks, stages, request.
+ // NOTE(review): the helpers are not shown here - confirm whether this order is required before reordering.
+ removeTasks(requestId);
+ removeStages(requestId);
+ removeRequest(requestId);
+ }
+
+
+ /**
+ * Collects the {@link StageEntityPK}s tracked in {@link #activeStageMap} that belong to the given request
+ * @param requestID requestId
+ * @return list of StageEntityPK
+ */
+ private List<StageEntityPK> getAllStageEntityPKForRequest(final Long requestID) {
+   // Walk the keys of the tracked stages and keep those that carry the given request id.
+   return FluentIterable.from(activeStageMap.keySet())
+       .filter(new Predicate<StageEntityPK>() {
+         @Override
+         public boolean apply(StageEntityPK candidate) {
+           return candidate.getRequestId().equals(requestID);
+         }
+       })
+       .toList();
+ }
+
+
+
+  /**
+   * Recomputes the cached status and display status of a stage from the statuses of its
+   * host role commands.
+   * <p>
+   * The reported commands are evaluated first. For whichever of the two values that partial
+   * evaluation yields {@link HostRoleStatus#PENDING}, the value is recomputed from every task
+   * cached for the stage; otherwise the value from the partial evaluation is stored directly.
+   * Nothing is recalculated once both the status and the display status of the stage are
+   * completed states.
+   *
+   * @param stagePK {@link StageEntityPK} primary key for the stage entity
+   * @param hostRoleCommandListAll list of all host role commands whose status has been received from agent
+   * @return {@link Boolean} <code>TRUE</code> if the status or display status of the given stage
+   *         was written by this call
+   */
+  private Boolean updateStageStatus(final StageEntityPK stagePK, List<HostRoleCommand> hostRoleCommandListAll) {
+    Boolean didAnyStatusChanged = Boolean.FALSE;
+    ActiveStage reportedStage = activeStageMap.get(stagePK);
+    HostRoleStatus stageCurrentStatus = reportedStage.getStatus();
+    HostRoleStatus stageCurrentDisplayStatus = reportedStage.getDisplayStatus();
+
+    // if stage is already marked to be completed then do not calculate reported status from host role commands
+    // Presumption: There will be no status transition of the host role command from one completed state to another
+    if (!stageCurrentDisplayStatus.isCompletedState() || !stageCurrentStatus.isCompletedState()) {
+      Map<HostRoleStatus, Integer> receivedTaskStatusCount = CalculatedStatus.calculateStatusCountsForTasks(hostRoleCommandListAll, stagePK);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, reportedStage.getSkippable());
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, Boolean.FALSE);
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        // at least one of the two values could not be settled from the reported tasks alone,
+        // so look at every task cached for this stage
+        Function<Long, HostRoleCommand> transform = new Function<Long, HostRoleCommand>() {
+          @Override
+          public HostRoleCommand apply(Long taskId) {
+            return activeTasksMap.get(taskId);
+          }
+        };
+
+        List<HostRoleCommand> activeHostRoleCommandsOfStage = FluentIterable.from(reportedStage.getTaskIds())
+            .transform(transform).toList();
+        Map<HostRoleStatus, Integer> statusCount = CalculatedStatus.calculateStatusCountsForTasks(activeHostRoleCommandsOfStage);
+
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the stage as per the new status of received host role commands
+          HostRoleStatus displayStatus = CalculatedStatus.calculateSummaryDisplayStatus(statusCount, activeHostRoleCommandsOfStage.size(), reportedStage.getSkippable());
+          if (displayStatus != stageCurrentDisplayStatus) {
+            reportedStage.setDisplayStatus(displayStatus);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the stage as per the new status of received host role commands
+          HostRoleStatus status = CalculatedStatus.calculateStageStatus(activeHostRoleCommandsOfStage, statusCount, reportedStage.getSuccessFactors(), reportedStage.getSkippable());
+          if (status != stageCurrentStatus) {
+            reportedStage.setStatus(status);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          // the status (not the display status) was settled by the reported tasks: store it.
+          // Writing the display status here would discard the value computed above.
+          reportedStage.setStatus(statusFromPartialSet);
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        // both values were settled by the reported tasks
+        reportedStage.setStatus(statusFromPartialSet);
+        reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+        didAnyStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didAnyStatusChanged;
+  }
+
+  /**
+   * Recomputes the cached status and display status of a request from the statuses of its stages.
+   * <p>
+   * The stages that received task updates are evaluated first. For whichever of the two values
+   * that partial evaluation yields {@link HostRoleStatus#PENDING}, the value is recomputed from
+   * every stage registered on the request; otherwise the value from the partial evaluation is
+   * stored directly. Nothing is recalculated once both the status and the display status of the
+   * request are completed states.
+   *
+   * @param requestId id of the {@link Request} whose status is to be updated
+   * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+   * @return {@link Boolean} <code>TRUE</code> if the status or display status of the request
+   *         was written by this call
+   */
+  private Boolean updateRequestStatus (final Long requestId, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+    Boolean didStatusChanged = Boolean.FALSE;
+    ActiveRequest request = activeRequestMap.get(requestId);
+    HostRoleStatus requestCurrentStatus = request.getStatus();
+    HostRoleStatus requestCurrentDisplayStatus = request.getDisplayStatus();
+
+    if (!requestCurrentDisplayStatus.isCompletedState() || !requestCurrentStatus.isCompletedState()) {
+      // only the changed stages that belong to this request take part in the partial evaluation
+      List<ActiveStage> activeStagesWithChangesTaskStatus = new ArrayList<>();
+      for (StageEntityPK stageEntityPK : stagesWithChangedTaskStatus) {
+        if (requestId.equals(stageEntityPK.getRequestId())) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          activeStagesWithChangesTaskStatus.add(activeStage);
+        }
+      }
+
+      Map<CalculatedStatus.StatusType, Map<HostRoleStatus, Integer>> stageStatusCountFromPartialSet = CalculatedStatus.calculateStatusCountsForStage(activeStagesWithChangesTaskStatus);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.STATUS), Boolean.FALSE);
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.DISPLAY_STATUS), Boolean.FALSE);
+
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        // at least one of the two values could not be settled from the changed stages alone,
+        // so look at every stage registered on this request
+        List<ActiveStage> allActiveStages = new ArrayList<>();
+        for (StageEntityPK stageEntityPK : request.getStageEntityPks()) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          allActiveStages.add(activeStage);
+        }
+        Map<CalculatedStatus.StatusType, Map<HostRoleStatus, Integer>> stageStatusCount = CalculatedStatus.calculateStatusCountsForStage(allActiveStages);
+
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the request as per the display status of all its stages
+          HostRoleStatus displayStatus = CalculatedStatus.calculateSummaryDisplayStatus(stageStatusCount.get(CalculatedStatus.StatusType.DISPLAY_STATUS), allActiveStages.size(), false);
+          if (displayStatus != requestCurrentDisplayStatus) {
+            request.setDisplayStatus(displayStatus);
+            didStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          request.setDisplayStatus(displayStatusFromPartialSet);
+          didStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the request as per the status of all its stages
+          HostRoleStatus status = CalculatedStatus.calculateSummaryStatus(stageStatusCount.get(CalculatedStatus.StatusType.STATUS), allActiveStages.size(), false);
+          if (status != requestCurrentStatus) {
+            request.setStatus(status);
+            didStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          // the status (not the display status) was settled by the changed stages: store it.
+          // Writing the display status here would discard the value computed above.
+          request.setStatus(statusFromPartialSet);
+          didStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        // both values were settled by the changed stages
+        request.setStatus(statusFromPartialSet);
+        request.setDisplayStatus(displayStatusFromPartialSet);
+        didStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didStatusChanged;
+  }
+
+
+  /**
+   * Drops every {@link HostRoleCommand} of the given request from {@link #activeTasksMap}.
+   * A task that is not yet in a completed state is still removed, but an error is logged for it.
+   *
+   * @param requestId request id
+   */
+  private void removeTasks(Long requestId) {
+    for (Iterator<Map.Entry<Long, HostRoleCommand>> cursor = activeTasksMap.entrySet().iterator(); cursor.hasNext();) {
+      HostRoleCommand task = cursor.next().getValue();
+      if (task.getRequestId() != requestId) {
+        // belongs to another request: leave it cached
+        continue;
+      }
+      if (!task.getStatus().isCompletedState()) {
+        LOG.error(String.format("Task %d should have been completed before being removed from running task cache(activeTasksMap)", task.getTaskId()));
+      }
+      // remove through the iterator so the map is not modified behind its back
+      cursor.remove();
+    }
+  }
+
+
+  /**
+   * Drops every stage of the given request from {@link #activeStageMap}.
+   *
+   * @param requestId request Id
+   */
+  private void removeStages(Long requestId) {
+    // the keys are collected into a list first, then removed one by one
+    for (StageEntityPK stagePk : getAllStageEntityPKForRequest(requestId)) {
+      activeStageMap.remove(stagePk);
+    }
+  }
+
+
+  /**
+   * Drops the entry for the given request from {@link #activeRequestMap}.
+   *
+   * @param requestId request Id
+   */
+  private void removeRequest(Long requestId) {
+    activeRequestMap.remove(requestId);
+  }
+
+
+  /**
+   * Cached view of a running {@link Request}: its status, its display status and the
+   * primary keys of its stages. Instances are kept in {@link #activeRequestMap}.
+   */
+  protected class ActiveRequest {
+    private HostRoleStatus status;
+    private HostRoleStatus displayStatus;
+    private Set<StageEntityPK> stageEntityPks;
+
+    /**
+     * @param status         current status of the request
+     * @param displayStatus  current display status of the request
+     * @param stageEntityPks primary keys of the stages of the request; the set is stored
+     *                       as-is (not copied) and is added to by {@link #addStageEntityPK}
+     */
+    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.stageEntityPks = stageEntityPks;
+    }
+
+    /**
+     * @return <code>TRUE</code> only when both the status and the display status are completed states
+     */
+    public Boolean isCompleted() {
+      if (!status.isCompletedState()) {
+        return false;
+      }
+      return displayStatus.isCompletedState();
+    }
+
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    /**
+     * @return the stored set of stage primary keys (the live set, not a copy)
+     */
+    public Set<StageEntityPK> getStageEntityPks() {
+      return stageEntityPks;
+    }
+
+    /**
+     * Registers one more stage on this request.
+     *
+     * @param stageEntityPK primary key of the stage to add
+     */
+    public void addStageEntityPK(StageEntityPK stageEntityPK) {
+      stageEntityPks.add(stageEntityPK);
+    }
+
+  }
+
+  /**
+   * Cached view of a {@link Stage} of a running {@link Request}: its status and display status
+   * plus the inputs used to recompute them (skippable flag, per-role success factors and the
+   * ids of its tasks). Instances are kept in {@link #activeStageMap}.
+   */
+  public class ActiveStage {
+    private HostRoleStatus status;
+    private HostRoleStatus displayStatus;
+    private Boolean skippable;
+    private Set<Long> taskIds;
+
+    // per-role success factors of this stage; replaced by the constructor argument
+    private Map<Role, Float> successFactors = new HashMap<Role, Float>();
+
+    /**
+     * @param status         current status of the stage
+     * @param displayStatus  current display status of the stage
+     * @param successFactors map of roles to success factors for the stage
+     * @param skippable      skippable flag of the stage
+     * @param taskIds        ids of the tasks of the stage; the set is stored as-is (not copied)
+     *                       and is added to by {@link #addTaskId}
+     */
+    public ActiveStage(HostRoleStatus status, HostRoleStatus displayStatus,
+        Map<Role, Float> successFactors, Boolean skippable, Set<Long> taskIds) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.successFactors = successFactors;
+      this.skippable = skippable;
+      this.taskIds = taskIds;
+    }
+
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    public Boolean getSkippable() {
+      return skippable;
+    }
+
+    public Map<Role, Float> getSuccessFactors() {
+      return successFactors;
+    }
+
+    /**
+     * @return the stored set of task ids (the live set, not a copy)
+     */
+    public Set<Long> getTaskIds() {
+      return taskIds;
+    }
+
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    public void setSkippable(Boolean skippable) {
+      this.skippable = skippable;
+    }
+
+    public void setSuccessFactors(Map<Role, Float> successFactors) {
+      this.successFactors = successFactors;
+    }
+
+    /**
+     * Registers one more task on this stage.
+     *
+     * @param taskId id of the task to add
+     */
+    public void addTaskId(Long taskId) {
+      taskIds.add(taskId);
+    }
+
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
new file mode 100644
index 0000000..fdc41e5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import org.apache.ambari.server.events.TaskEvent;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Singleton;
+
+/**
+ * Publishes {@link TaskEvent} instances to every registered listener that has a
+ * {@link Subscribe}-annotated handler for them. Delivery goes through a plain Guava
+ * {@link EventBus}, which invokes handlers synchronously on the thread that posts the event.
+ */
+@Singleton
+public class TaskEventPublisher {
+
+  /**
+   * Synchronous event bus dedicated to task events.
+   */
+  private final EventBus m_eventBus = new EventBus("ambari-task-report-event-bus");
+
+  /**
+   * Registers a listener on the task event bus. Its handler methods must carry the
+   * {@link Subscribe} annotation to receive events.
+   *
+   * @param object
+   *          the listener to receive events.
+   */
+  public void register(Object object) {
+    m_eventBus.register(object);
+  }
+
+  /**
+   * Posts the event to all registered listeners that {@link Subscribe} to
+   * {@link TaskEvent} instances.
+   *
+   * @param event the {@link TaskEvent} to deliver
+   */
+  public void publish(TaskEvent event) {
+    m_eventBus.post(event);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 02c4091..e834045 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -40,6 +40,8 @@ import org.apache.ambari.annotations.TransactionalLock;
import org.apache.ambari.annotations.TransactionalLock.LockArea;
import org.apache.ambari.annotations.TransactionalLock.LockType;
import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.api.query.JpaPredicateVisitor;
import org.apache.ambari.server.api.query.JpaSortBuilder;
@@ -49,6 +51,9 @@ import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.SortRequest;
import org.apache.ambari.server.controller.utilities.PredicateHelper;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.TransactionalLocks;
import org.apache.ambari.server.orm.entities.HostEntity;
@@ -58,9 +63,11 @@ import org.apache.ambari.server.orm.entities.StageEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -144,6 +151,13 @@ public class HostRoleCommandDAO {
@Inject
private Configuration configuration;
+
+ @Inject
+ HostRoleCommandFactory hostRoleCommandFactory;
+
+ @Inject
+ private TaskEventPublisher taskEventPublisher;
+
/**
* Used to ensure that methods which rely on the completion of
* {@link Transactional} can detect when they are able to run.
@@ -629,11 +643,17 @@ public class HostRoleCommandDAO {
 @Transactional
 @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
 public HostRoleCommandEntity merge(HostRoleCommandEntity entity) {
+ // Merge first (this also invalidates the status summary cache for the entity), then
+ // notify task listeners with a HostRoleCommand built from the merged entity.
+ entity = mergeWithoutPublishEvent(entity);
+ publishTaskUpdateEvent(Collections.singletonList(hostRoleCommandFactory.createExisting(entity)));
+ return entity;
+ }
+
+ /**
+ * Merges the entity through the entity manager and invalidates the host role command
+ * status summary cache for it, without publishing a task update event.
+ *
+ * @param entity the entity to merge
+ * @return the entity instance returned by the entity manager's merge
+ */
+ @Transactional
+ @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
+ public HostRoleCommandEntity mergeWithoutPublishEvent(HostRoleCommandEntity entity) {
 EntityManager entityManager = entityManagerProvider.get();
 entity = entityManager.merge(entity);
-
 invalidateHostRoleCommandStatusSummaryCache(entity);
-
 return entity;
 }
@@ -667,10 +687,51 @@ public class HostRoleCommandDAO {
}
invalidateHostRoleCommandStatusSummaryCache(requestsToInvalidate);
-
+ publishTaskUpdateEvent(getHostRoleCommands(entities));
return managedList;
}
+  /**
+   * Converts host role command entities into {@link HostRoleCommand} objects, one per entity,
+   * using {@link HostRoleCommandFactory#createExisting}.
+   *
+   * @param entities the entities to convert
+   * @return list with one {@link HostRoleCommand} per entity, in iteration order
+   */
+  public List<HostRoleCommand> getHostRoleCommands(Collection<HostRoleCommandEntity> entities) {
+    return FluentIterable.from(entities)
+        .transform(new Function<HostRoleCommandEntity, HostRoleCommand>() {
+          @Override
+          public HostRoleCommand apply(HostRoleCommandEntity commandEntity) {
+            return hostRoleCommandFactory.createExisting(commandEntity);
+          }
+        })
+        .toList();
+  }
+
+  /**
+   * Publishes a {@link TaskUpdateEvent} carrying the given commands on the task event
+   * publisher. Nothing is published for an empty list.
+   *
+   * @param hostRoleCommands the commands to report as updated
+   */
+  public void publishTaskUpdateEvent(List<HostRoleCommand> hostRoleCommands) {
+    if (hostRoleCommands.isEmpty()) {
+      return;
+    }
+    taskEventPublisher.publish(new TaskUpdateEvent(hostRoleCommands));
+  }
+
+  /**
+   * Publishes a {@link TaskCreateEvent} carrying the given commands on the task event
+   * publisher. Nothing is published for an empty list.
+   *
+   * @param hostRoleCommands the commands to report as created
+   */
+  public void publishTaskCreateEvent(List<HostRoleCommand> hostRoleCommands) {
+    if (hostRoleCommands.isEmpty()) {
+      return;
+    }
+    taskEventPublisher.publish(new TaskCreateEvent(hostRoleCommands));
+  }
+
+
+
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public void remove(HostRoleCommandEntity entity) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 1c4d0a3..2696f66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -144,6 +144,14 @@ public class RequestDAO {
}
 @Transactional
+ public void updateStatus(long requestId, HostRoleStatus status, HostRoleStatus displayStatus) {
+ // Loads the request by primary key, overwrites both its status and its display status,
+ // and merges the entity back.
+ // NOTE(review): the result of findByPK is dereferenced without a null check - confirm
+ // callers only pass ids of requests that are already persisted.
+ RequestEntity requestEntity = findByPK(requestId);
+ requestEntity.setStatus(status);
+ requestEntity.setDisplayStatus(displayStatus);
+ merge(requestEntity);
+ }
+
+ @Transactional
public void create(RequestEntity requestEntity) {
entityManagerProvider.get().persist(requestEntity);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index d2f899f..126468a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.orm.dao;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -173,11 +174,15 @@ public class StageDAO {
return daoUtils.selectList(query);
}
+ /**
+ *
+ * @param statuses {@link HostRoleStatus}
+ * @return list of stage entities
+ */
@RequiresSession
- public List<StageEntity> findByCommandStatuses(
- Collection<HostRoleStatus> statuses) {
+ public List<StageEntity> findByStatuses(Collection<HostRoleStatus> statuses) {
TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
- "StageEntity.findByCommandStatuses", StageEntity.class);
+ "StageEntity.findByStatuses", StageEntity.class);
query.setParameter("statuses", statuses);
return daoUtils.selectList(query);
@@ -280,8 +285,8 @@ public class StageDAO {
* the stage entity to update
* @param desiredStatus
* the desired stage status
- * @param controller
- * the ambari management controller
+ * @param actionManager
+ * the action manager
*
* @throws java.lang.IllegalArgumentException
* if the transition to the desired status is not a legal transition
@@ -301,9 +306,11 @@ public class StageDAO {
if (desiredStatus == HostRoleStatus.ABORTED) {
actionManager.cancelRequest(stage.getRequestId(), "User aborted.");
} else {
+ List <HostRoleCommandEntity> hrcWithChangedStatus = new ArrayList<HostRoleCommandEntity>();
for (HostRoleCommandEntity hostRoleCommand : tasks) {
HostRoleStatus hostRoleStatus = hostRoleCommand.getStatus();
if (hostRoleStatus.equals(currentStatus)) {
+ hrcWithChangedStatus.add(hostRoleCommand);
hostRoleCommand.setStatus(desiredStatus);
if (desiredStatus == HostRoleStatus.PENDING) {
@@ -316,6 +323,21 @@ public class StageDAO {
}
 /**
+ * Loads the stage by primary key, overwrites both its status and its display status,
+ * and merges the entity back.
+ * NOTE(review): the result of findByPK is dereferenced without a null check - confirm
+ * callers only pass keys of stages that are already persisted.
+ *
+ * @param stageEntityPK {@link StageEntityPK} primary key of the stage to update
+ * @param status {@link HostRoleStatus} new status of the stage
+ * @param displayStatus {@link HostRoleStatus} new display status of the stage
+ */
+ @Transactional
+ public void updateStatus(StageEntityPK stageEntityPK, HostRoleStatus status, HostRoleStatus displayStatus) {
+ StageEntity stageEntity = findByPK(stageEntityPK);
+ stageEntity.setStatus(status);
+ stageEntity.setDisplayStatus(displayStatus);
+ merge(stageEntity);
+ }
+
+
+ /**
* Determine whether or not it is valid to transition from this stage status
* to the given status.
*
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 74271b9..a809295 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -105,9 +105,9 @@ public class HostRoleCommandEntity {
@Basic
private Integer exitcode = 0;
- @Column(name = "status")
+ @Column(name = "status", nullable = false)
@Enumerated(EnumType.STRING)
- private HostRoleStatus status;
+ private HostRoleStatus status = HostRoleStatus.PENDING;
@Column(name = "std_error")
@Lob