You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/02/17 22:06:44 UTC

[45/50] [abbrv] ambari git commit: AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)

AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0fc7a667
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0fc7a667
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0fc7a667

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 0fc7a6671feb10dc0e8475dc4878942cf19f46cc
Parents: dd174f4
Author: Jaimin Jetly <ja...@hortonworks.com>
Authored: Fri Feb 17 09:31:10 2017 -0800
Committer: Jaimin Jetly <ja...@hortonworks.com>
Committed: Fri Feb 17 09:31:10 2017 -0800

----------------------------------------------------------------------
 .../actionmanager/ActionDBAccessorImpl.java     | 108 ++--
 .../server/actionmanager/ActionScheduler.java   |  31 +
 .../ambari/server/actionmanager/Request.java    |   8 +-
 .../ambari/server/actionmanager/Stage.java      |  25 +
 .../controller/internal/CalculatedStatus.java   | 390 +++++++++++-
 .../ambari/server/events/TaskCreateEvent.java   |  48 ++
 .../apache/ambari/server/events/TaskEvent.java  |  66 ++
 .../ambari/server/events/TaskUpdateEvent.java   |  35 ++
 .../listeners/tasks/TaskStatusListener.java     | 609 +++++++++++++++++++
 .../events/publishers/TaskEventPublisher.java   |  62 ++
 .../server/orm/dao/HostRoleCommandDAO.java      |  67 +-
 .../ambari/server/orm/dao/RequestDAO.java       |   8 +
 .../apache/ambari/server/orm/dao/StageDAO.java  |  32 +-
 .../orm/entities/HostRoleCommandEntity.java     |   4 +-
 .../server/orm/entities/RequestEntity.java      |  49 +-
 .../ambari/server/orm/entities/StageEntity.java |  70 ++-
 .../server/orm/entities/StageEntityPK.java      |  12 +
 .../server/upgrade/UpgradeCatalog300.java       |  70 +++
 .../main/resources/Ambari-DDL-Derby-CREATE.sql  |   7 +-
 .../main/resources/Ambari-DDL-MySQL-CREATE.sql  |   7 +-
 .../main/resources/Ambari-DDL-Oracle-CREATE.sql |   7 +-
 .../resources/Ambari-DDL-Postgres-CREATE.sql    |   7 +-
 .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql |   7 +-
 .../resources/Ambari-DDL-SQLServer-CREATE.sql   |   7 +-
 .../actionmanager/TestActionDBAccessorImpl.java |   3 +-
 .../actionmanager/TestActionScheduler.java      |  71 ++-
 .../alerts/AmbariPerformanceRunnableTest.java   |   7 +-
 .../internal/UpgradeResourceProviderTest.java   |   1 -
 .../UpgradeSummaryResourceProviderTest.java     |   1 -
 .../listeners/tasks/TaskStatusListenerTest.java | 164 +++++
 .../ambari/server/state/ConfigHelperTest.java   |   2 +
 .../cluster/ClusterEffectiveVersionTest.java    |   5 +-
 .../services/RetryUpgradeActionServiceTest.java |   1 -
 .../server/upgrade/UpgradeCatalog300Test.java   |  20 +
 34 files changed, 1892 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 7881a4b..b813fe6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -45,7 +45,9 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.events.HostsRemovedEvent;
 import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.TaskCreateEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -130,6 +132,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   AmbariEventPublisher ambariEventPublisher;
 
   @Inject
+  TaskEventPublisher taskEventPublisher;
+
+  @Inject
   AuditLogger auditLogger;
 
   /**
@@ -205,8 +210,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   public Collection<HostRoleCommandEntity> abortOperation(long requestId) {
     long now = System.currentTimeMillis();
 
-    endRequest(requestId);
-
     // only request commands which actually need to be aborted; requesting all
     // commands here can cause OOM problems during large requests like upgrades
     List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
@@ -228,7 +231,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (!commands.isEmpty()) {
       return hostRoleCommandDAO.mergeAll(commands);
     }
-
+    endRequest(requestId);
     return Collections.emptyList();
   }
 
@@ -283,7 +286,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   @Override
   @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
   public List<Stage> getStagesInProgress() {
-    List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
+    List<StageEntity> stageEntities = stageDAO.findByStatuses(
       HostRoleStatus.IN_PROGRESS_STATUSES);
     return getStagesForEntities(stageEntities);
   }
@@ -343,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     RequestEntity requestEntity = request.constructNewPersistenceEntity();
 
     Long clusterId = -1L;
+    Long requestId = requestEntity.getRequestId();
     ClusterEntity clusterEntity = clusterDAO.findById(request.getClusterId());
     if (clusterEntity != null) {
       clusterId = clusterEntity.getClusterId();
@@ -356,8 +360,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
     addRequestToAuditlogCache(request);
 
+    List<HostRoleCommand> hostRoleCommands = new ArrayList<>();
+
     for (Stage stage : request.getStages()) {
       StageEntity stageEntity = stage.constructNewPersistenceEntity();
+      Long stageId = stageEntity.getStageId();
       stageEntities.add(stageEntity);
       stageEntity.setClusterId(clusterId);
       stageEntity.setRequest(requestEntity);
@@ -366,6 +373,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
       List<HostRoleCommand> orderedHostRoleCommands = stage.getOrderedHostRoleCommands();
 
       for (HostRoleCommand hostRoleCommand : orderedHostRoleCommands) {
+        hostRoleCommand.setRequestId(requestId);
+        hostRoleCommand.setStageId(stageId);
         HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
         hostRoleCommandEntity.setStage(stageEntity);
         hostRoleCommandDAO.create(hostRoleCommandEntity);
@@ -415,11 +424,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
         hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
 
         executionCommandDAO.create(hostRoleCommandEntity.getExecutionCommand());
-        hostRoleCommandEntity = hostRoleCommandDAO.merge(hostRoleCommandEntity);
+        hostRoleCommandEntity = hostRoleCommandDAO.mergeWithoutPublishEvent(hostRoleCommandEntity);
 
         if (null != hostEntity) {
           hostEntity = hostDAO.merge(hostEntity);
         }
+        hostRoleCommands.add(hostRoleCommand);
       }
 
       for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
@@ -431,6 +441,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
     requestEntity.setStages(stageEntities);
     requestDAO.merge(requestEntity);
+
+    TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
+    taskEventPublisher.publish(taskCreateEvent);
   }
 
   @Override
@@ -497,66 +510,55 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     long now = System.currentTimeMillis();
 
     List<Long> requestsToCheck = new ArrayList<Long>();
-    List<Long> abortedCommandUpdates = new ArrayList<Long>();
 
     List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
+    List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<HostRoleCommandEntity>();
     for (HostRoleCommandEntity commandEntity : commandEntities) {
       CommandReport report = taskReports.get(commandEntity.getTaskId());
-
-      boolean statusChanged = false;
-
-      switch (commandEntity.getStatus()) {
-        case ABORTED:
-          // We don't want to overwrite statuses for ABORTED tasks with
-          // statuses that have been received from the agent after aborting task
-          abortedCommandUpdates.add(commandEntity.getTaskId());
-          break;
-        default:
-          HostRoleStatus status = HostRoleStatus.valueOf(report.getStatus());
-          // if FAILED and marked for holding then set status = HOLDING_FAILED
-          if (status == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
-            status = HostRoleStatus.HOLDING_FAILED;
-
-            // tasks can be marked as skipped when they fail
-            if (commandEntity.isFailureAutoSkipped()) {
-              status = HostRoleStatus.SKIPPED_FAILED;
-            }
+      HostRoleStatus existingTaskStatus = commandEntity.getStatus();
+      HostRoleStatus reportedTaskStatus = HostRoleStatus.valueOf(report.getStatus());
+      if (!existingTaskStatus.isCompletedState() || existingTaskStatus == HostRoleStatus.ABORTED) {
+        // if FAILED and marked for holding then set reportedTaskStatus = HOLDING_FAILED
+        if (reportedTaskStatus == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
+          reportedTaskStatus = HostRoleStatus.HOLDING_FAILED;
+
+          // tasks can be marked as skipped when they fail
+          if (commandEntity.isFailureAutoSkipped()) {
+            reportedTaskStatus = HostRoleStatus.SKIPPED_FAILED;
           }
-
-          commandEntity.setStatus(status);
-          statusChanged = true;
-          break;
-      }
-
-      commandEntity.setStdOut(report.getStdOut().getBytes());
-      commandEntity.setStdError(report.getStdErr().getBytes());
-      commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
-        report.getStructuredOut().getBytes());
-      commandEntity.setExitcode(report.getExitCode());
-
-      if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
-        commandEntity.setEndTime(now);
-
-        String actionId = report.getActionId();
-        long[] requestStageIds = StageUtils.getRequestStage(actionId);
-        long requestId = requestStageIds[0];
-        long stageId = requestStageIds[1];
-        if(statusChanged) {
-          auditLog(commandEntity, requestId);
         }
-        if (requestDAO.getLastStageId(requestId).equals(stageId)) {
-          requestsToCheck.add(requestId);
+        if (!existingTaskStatus.isCompletedState()) {
+          commandEntity.setStatus(reportedTaskStatus);
         }
+        commandEntity.setStdOut(report.getStdOut().getBytes());
+        commandEntity.setStdError(report.getStdErr().getBytes());
+        commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
+            report.getStructuredOut().getBytes());
+        commandEntity.setExitcode(report.getExitCode());
+        if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
+          commandEntity.setEndTime(now);
+
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          long requestId = requestStageIds[0];
+          long stageId = requestStageIds[1];
+          auditLog(commandEntity, requestId);
+          if (requestDAO.getLastStageId(requestId).equals(stageId)) {
+            requestsToCheck.add(requestId);
+          }
+        }
+        commandEntitiesToMerge.add(commandEntity);
+      } else {
+       LOG.warn(String.format("Request for invalid transition of host role command status received for task id %d from " +
+           "agent: %s -> %s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus));
       }
     }
 
     // no need to merge if there's nothing to merge
-    if (!commandEntities.isEmpty()) {
-      hostRoleCommandDAO.mergeAll(commandEntities);
+    if (!commandEntitiesToMerge.isEmpty()) {
+      hostRoleCommandDAO.mergeAll(commandEntitiesToMerge);
     }
 
-    // Invalidate cache because of updates to ABORTED commands
-    hostRoleCommandCache.invalidateAll(abortedCommandUpdates);
 
     for (Long requestId : requestsToCheck) {
       endRequestIfCompleted(requestId);
@@ -923,7 +925,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
       return HostRoleStatus.QUEUED;
     }
     Collection<HostRoleStatus> taskStatuses = details.getTaskStatuses();
-    return CalculatedStatus.calculateSummaryStatusOfStage(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
+    return CalculatedStatus.calculateSummaryStatus(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 680c0a6..a92c03c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.persistence.EntityManager;
 
@@ -49,6 +50,7 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
 import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.JPAEventPublisher;
 import org.apache.ambari.server.metadata.RoleCommandOrder;
@@ -75,10 +77,13 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.reflect.TypeToken;
@@ -179,6 +184,9 @@ class ActionScheduler implements Runnable {
    * we receive awake() request during running a scheduler iteration.
    */
   private boolean activeAwakeRequest = false;
+
+  private AtomicBoolean taskStatusLoaded = new AtomicBoolean();
+
   //Cache for clusterHostinfo, key - stageId-requestId
   private Cache<String, Map<String, Set<String>>> clusterHostInfoCache;
   private Cache<String, Map<String, String>> commandParamsStageCache;
@@ -353,6 +361,8 @@ class ActionScheduler implements Runnable {
         LOG.debug("Processing {} in progress stages ", stages.size());
       }
 
+      publishInProgressTasks(stages);
+
       if (stages.isEmpty()) {
         // Nothing to do
         if (LOG.isDebugEnabled()) {
@@ -532,6 +542,27 @@ class ActionScheduler implements Runnable {
     }
   }
 
+  /**
+   * Publishes an event to load {@link TaskStatusListener#activeTasksMap}, {@link TaskStatusListener#activeStageMap}
+   * and {@link TaskStatusListener#activeRequestMap} for all running requests, once during server startup.
+   * This is required because some tasks may have been in progress when the server was last stopped.
+   * @param stages list of stages
+   */
+  private void publishInProgressTasks(List<Stage> stages) {
+    if (taskStatusLoaded.compareAndSet(false, true)) {
+      if (!stages.isEmpty()) {
+        Function<Stage, Long> transform = new Function<Stage, Long>() {
+          @Override
+          public Long apply(Stage stage) {
+            return stage.getRequestId();
+          }
+        };
+        Set<Long> runningRequestID = ImmutableSet.copyOf(Lists.transform(stages, transform));
+        List<HostRoleCommand> hostRoleCommands = db.getAllTasksByRequestIds(runningRequestID);
+        hostRoleCommandDAO.publishTaskCreateEvent(hostRoleCommands);
+      }
+    }
+  }
 
   /**
    * Returns the list of hosts that have a task assigned

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
index 31e11c1..502c016 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -71,7 +71,8 @@ public class Request {
    * As of now, this field is not used. Request status is
    * calculated at RequestResourceProvider on the fly.
    */
-  private HostRoleStatus status; // not persisted yet
+  private HostRoleStatus status = HostRoleStatus.PENDING;
+  private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
   private String inputs;
   private List<RequestResourceFilter> resourceFilters;
   private RequestOperationLevel operationLevel;
@@ -186,6 +187,7 @@ public class Request {
     this.requestType = entity.getRequestType();
     this.commandName = entity.getCommandName();
     this.status = entity.getStatus();
+    this.displayStatus = entity.getDisplayStatus();
     if (entity.getRequestScheduleEntity() != null) {
       this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId();
     }
@@ -241,6 +243,8 @@ public class Request {
     requestEntity.setInputs(inputs);
     requestEntity.setRequestType(requestType);
     requestEntity.setRequestScheduleId(requestScheduleId);
+    requestEntity.setStatus(status);
+    requestEntity.setDisplayStatus(displayStatus);
     //TODO set all fields
 
     if (resourceFilters != null) {
@@ -381,6 +385,8 @@ public class Request {
         ", startTime=" + startTime +
         ", endTime=" + endTime +
         ", inputs='" + inputs + '\'' +
+        ", status='" + status + '\'' +
+        ", displayStatus='" + displayStatus + '\'' +
         ", resourceFilters='" + resourceFilters + '\'' +
         ", operationLevel='" + operationLevel + '\'' +
         ", requestType=" + requestType +

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 4a05b32..f7ceca2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -74,6 +74,8 @@ public class Stage {
   private long stageId = -1;
   private final String logDir;
   private final String requestContext;
+  private HostRoleStatus status = HostRoleStatus.PENDING;
+  private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
   private String clusterHostInfo;
   private String commandParamsStage;
   private String hostParamsStage;
@@ -157,6 +159,8 @@ public class Stage {
     commandParamsStage = stageEntity.getCommandParamsStage();
     hostParamsStage = stageEntity.getHostParamsStage();
     commandExecutionType = stageEntity.getCommandExecutionType();
+    status = stageEntity.getStatus();
+    displayStatus = stageEntity.getDisplayStatus();
 
     List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(requestId, stageId);
     Collection<HostRoleCommand> commands = dbAccessor.getTasks(taskIds);
@@ -197,6 +201,8 @@ public class Stage {
     stageEntity.setCommandParamsStage(commandParamsStage);
     stageEntity.setHostParamsStage(hostParamsStage);
     stageEntity.setCommandExecutionType(commandExecutionType);
+    stageEntity.setStatus(status);
+    stageEntity.setDisplayStatus(displayStatus);
 
     for (Role role : successFactors.keySet()) {
       RoleSuccessCriteriaEntity roleSuccessCriteriaEntity = new RoleSuccessCriteriaEntity();
@@ -290,6 +296,23 @@ public class Stage {
     this.commandExecutionType = commandExecutionType;
   }
 
+  /**
+   * get current status of the stage
+   * @return {@link HostRoleStatus}
+   */
+  public HostRoleStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * sets status of the stage
+   * @param status {@link HostRoleStatus}
+   */
+  public void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
+
+
   public synchronized void setStageId(long stageId) {
     if (this.stageId != -1) {
       throw new RuntimeException("Attempt to set stageId again! Not allowed.");
@@ -915,6 +938,8 @@ public class Stage {
     builder.append("clusterHostInfo="+clusterHostInfo+"\n");
     builder.append("commandParamsStage="+commandParamsStage+"\n");
     builder.append("hostParamsStage="+hostParamsStage+"\n");
+    builder.append("status="+status+"\n");
+    builder.append("displayStatus="+displayStatus+"\n");
     builder.append("Success Factors:\n");
     for (Role r : successFactors.keySet()) {
       builder.append("  role: "+r+", factor: "+successFactors.get(r)+"\n");

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
index 3c415df..32dd03d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
@@ -26,12 +26,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.ambari.server.Role;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
 import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
 
 /**
  * Status of a request resource, calculated from a set of tasks or stages.
@@ -142,7 +150,7 @@ public class CalculatedStatus {
 
     Map<HostRoleStatus, Integer> taskStatusCounts = CalculatedStatus.calculateTaskEntityStatusCounts(tasks);
 
-    HostRoleStatus status = calculateSummaryStatusOfStage(taskStatusCounts, size, skippable);
+    HostRoleStatus status = calculateSummaryStatus(taskStatusCounts, size, skippable);
 
     double progressPercent = calculateProgressPercent(taskStatusCounts, size);
 
@@ -167,7 +175,7 @@ public class CalculatedStatus {
 
       // calculate the stage status from the task status counts
       HostRoleStatus stageStatus =
-          calculateSummaryStatusOfStage(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+          calculateSummaryStatus(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
 
       stageStatuses.add(stageStatus);
 
@@ -203,7 +211,7 @@ public class CalculatedStatus {
 
       // calculate the stage status from the task status counts
       HostRoleStatus stageStatus =
-          calculateSummaryStatusOfStage(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+          calculateSummaryStatus(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
 
       stageStatuses.add(stageStatus);
 
@@ -256,6 +264,126 @@ public class CalculatedStatus {
   }
 
   /**
+   * Returns counts of tasks that are in various states.
+   *
+   * @param hostRoleCommands  collection of beans {@link HostRoleCommand}
+   *
+   * @return a map of counts of tasks keyed by the task status
+   */
+  public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands) {
+    Map<HostRoleStatus, Integer> counters = new HashMap<>();
+    // initialize
+    for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+      counters.put(hostRoleStatus, 0);
+    }
+    // calculate counts
+    for (HostRoleCommand hrc : hostRoleCommands) {
+      // count tasks where isCompletedState() == true as COMPLETED
+      // but don't count tasks with COMPLETED status twice
+      if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+        // Increase total number of completed tasks;
+        counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+      }
+      // Increment counter for particular status
+      counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+    }
+
+    // We overwrite the value to have the sum converged
+    counters.put(HostRoleStatus.IN_PROGRESS,
+        hostRoleCommands.size() -
+            counters.get(HostRoleStatus.COMPLETED) -
+            counters.get(HostRoleStatus.QUEUED) -
+            counters.get(HostRoleStatus.PENDING));
+
+    return counters;
+  }
+
+  /**
+   * Returns, for each status type, a map of counts of stages that are in various states.
+   *
+   * @param stages  collection of beans {@link org.apache.ambari.server.events.listeners.tasks.TaskStatusListener.ActiveStage}
+   *
+   * @return a map, keyed by status type, of maps of stage counts keyed by the stage status
+   */
+  public static Map<StatusType,Map<HostRoleStatus, Integer>> calculateStatusCountsForStage(Collection<TaskStatusListener.ActiveStage> stages) {
+
+    Map<StatusType,Map<HostRoleStatus, Integer>> counters = new HashMap<>();
+    for (StatusType statusType : StatusType.values()) {
+      Map <HostRoleStatus, Integer> statusMap = new HashMap<HostRoleStatus, Integer>();
+      counters.put(statusType,statusMap);
+      // initialize
+      for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+        statusMap.put(hostRoleStatus, 0);
+      }
+      for (TaskStatusListener.ActiveStage stage : stages) {
+        // count stages where isCompletedState() == true as COMPLETED
+        // but don't count stages with COMPLETED status twice
+        HostRoleStatus status;
+        if (statusType == StatusType.DISPLAY_STATUS) {
+          status = stage.getDisplayStatus();
+        } else {
+          status = stage.getStatus();
+        }
+        if (status.isCompletedState() && status != HostRoleStatus.COMPLETED) {
+          // Increase total number of completed stages
+          statusMap.put(HostRoleStatus.COMPLETED, statusMap.get(HostRoleStatus.COMPLETED) + 1);
+        }
+
+        // Increment counter for particular status
+        statusMap.put(status, statusMap.get(status) + 1);
+      }
+      statusMap.put(HostRoleStatus.IN_PROGRESS,
+          stages.size() -
+              statusMap.get(HostRoleStatus.COMPLETED) -
+              statusMap.get(HostRoleStatus.QUEUED) -
+              statusMap.get(HostRoleStatus.PENDING));
+    }
+    return counters;
+  }
+
+
+  /**
+   * Returns counts of tasks of the given stage that are in various states.
+   *
+   * @param hostRoleCommands  collection of beans {@link HostRoleCommand}
+   * @param stage  key (stage id and request id) of the stage whose tasks are counted
+   * @return a map of counts of tasks keyed by the task status
+   */
+  public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands, StageEntityPK stage) {
+    Map<HostRoleStatus, Integer> counters = new HashMap<>();
+    List<HostRoleCommand> hostRoleCommandsOfStage = new ArrayList<>();
+    // initialize
+    for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+      counters.put(hostRoleStatus, 0);
+    }
+    // calculate counts
+    for (HostRoleCommand hrc : hostRoleCommands) {
+      if (stage.getStageId() == hrc.getStageId() && stage.getRequestId() == hrc.getRequestId()) {
+        // count tasks where isCompletedState() == true as COMPLETED
+        // but don't count tasks with COMPLETED status twice
+        if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+          // Increase total number of completed tasks;
+          counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+        }
+
+        // Increment counter for particular status
+        counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+
+        hostRoleCommandsOfStage.add(hrc);
+      }
+    }
+
+    // We overwrite the value to have the sum converged
+    counters.put(HostRoleStatus.IN_PROGRESS,
+        hostRoleCommandsOfStage.size() -
+            counters.get(HostRoleStatus.COMPLETED) -
+            counters.get(HostRoleStatus.QUEUED) -
+            counters.get(HostRoleStatus.PENDING));
+
+    return counters;
+  }
+
+  /**
    * Returns counts of task entities that are in various states.
    *
    * @param tasks  the collection of task entities
@@ -329,7 +457,7 @@ public class CalculatedStatus {
       int total = summary.getTaskTotal();
       boolean skip = summary.isStageSkippable();
       Map<HostRoleStatus, Integer> counts = calculateStatusCounts(summary.getTaskStatuses());
-      HostRoleStatus stageStatus = calculateSummaryStatusOfStage(counts, total, skip);
+      HostRoleStatus stageStatus = calculateSummaryStatus(counts, total, skip);
       HostRoleStatus stageDisplayStatus = calculateSummaryDisplayStatus(counts, total, skip);
 
       stageStatuses.add(stageStatus);
@@ -392,7 +520,7 @@ public class CalculatedStatus {
    *
    * @return summary request status based on statuses of tasks in different states.
    */
-  public static HostRoleStatus calculateSummaryStatusOfStage(Map<HostRoleStatus, Integer> counters,
+  public static HostRoleStatus calculateSummaryStatus(Map<HostRoleStatus, Integer> counters,
       int total, boolean skippable) {
 
     // when there are 0 tasks, return COMPLETED
@@ -435,6 +563,230 @@ public class CalculatedStatus {
   }
 
   /**
+   * Derives a summary status from status counters that may describe only part of the tasks.
+   * <p>
+   * The result starts as {@link HostRoleStatus#PENDING} and is overwritten by every later rule
+   * that matches, so the rules are applied from lowest to highest precedence: holding states,
+   * FAILED and TIMEDOUT (both only when the stage is not skippable), and finally IN_PROGRESS
+   * when any task is QUEUED or IN_PROGRESS.
+   *
+   * @param counters counts of resources that are in various states
+   * @param skippable <code>true</code> if failure of any of the tasks should not fail the stage
+   * @return the {@link HostRoleStatus} chosen by the highest-precedence rule that matched
+   */
+  public static HostRoleStatus calculateSummaryStatusFromPartialSet(Map<HostRoleStatus, Integer> counters,
+                                                      boolean skippable) {
+
+    HostRoleStatus summary = HostRoleStatus.PENDING;
+
+    // Lowest precedence: holding states, preferring HOLDING, then HOLDING_FAILED, then HOLDING_TIMEDOUT.
+    if (counters.get(HostRoleStatus.HOLDING) > 0) {
+      summary = HostRoleStatus.HOLDING;
+    } else if (counters.get(HostRoleStatus.HOLDING_FAILED) > 0) {
+      summary = HostRoleStatus.HOLDING_FAILED;
+    } else if (counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+      summary = HostRoleStatus.HOLDING_TIMEDOUT;
+    }
+
+    // A failed task only decides the summary when failures are not skippable.
+    if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+      summary = HostRoleStatus.FAILED;
+    }
+
+    // Same for a timed out task; it overrides FAILED because it is applied later.
+    if (counters.get(HostRoleStatus.TIMEDOUT) > 0  && !skippable) {
+      summary = HostRoleStatus.TIMEDOUT;
+    }
+
+    // Highest precedence: anything queued or running makes the whole set IN_PROGRESS.
+    int runningOrQueued = counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+    if (runningOrQueued > 0) {
+      summary = HostRoleStatus.IN_PROGRESS;
+    }
+
+    return summary;
+  }
+
+
+  /**
+   * Computes the status of a stage from its tasks, honoring the per-role success factors.
+   * <p>
+   * Rules are evaluated in order and the first match wins: no tasks gives COMPLETED; all tasks
+   * PENDING gives PENDING; any holding task gives the matching holding status; FAILED or TIMEDOUT
+   * tasks of a non-skippable stage fail the stage only when {@link #didStageFailed} reports that
+   * a role missed its success factor; any PENDING/QUEUED/IN_PROGRESS task gives IN_PROGRESS;
+   * ABORTED tasks abort the stage under the same success-factor check; otherwise COMPLETED.
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param counters counts of resources that are in various states
+   * @param successFactors map of roles to their success factor for a stage
+   * @param skippable <code>true</code> if failure of any of the tasks should not fail the stage
+   * @return {@link HostRoleStatus} based on success factor
+   */
+  public static HostRoleStatus calculateStageStatus(List <HostRoleCommand> hostRoleCommands, Map<HostRoleStatus, Integer> counters, Map<Role, Float> successFactors,
+                                                    boolean skippable) {
+
+    int taskCount = hostRoleCommands.size();
+    if (taskCount == 0) {
+      // a stage without tasks is reported as COMPLETED
+      return HostRoleStatus.COMPLETED;
+    }
+
+    if (counters.get(HostRoleStatus.PENDING) == taskCount) {
+      return HostRoleStatus.PENDING;
+    }
+
+    // Holding states win over everything below, in the order HOLDING, HOLDING_FAILED, HOLDING_TIMEDOUT.
+    if (counters.get(HostRoleStatus.HOLDING) > 0) {
+      return HostRoleStatus.HOLDING;
+    }
+    if (counters.get(HostRoleStatus.HOLDING_FAILED) > 0) {
+      return HostRoleStatus.HOLDING_FAILED;
+    }
+    if (counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+      return HostRoleStatus.HOLDING_TIMEDOUT;
+    }
+
+    if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+      Set<Role> failedRoles = getRolesOfFailedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, failedRoles, successFactors)) {
+        return HostRoleStatus.FAILED;
+      }
+    }
+
+    if (counters.get(HostRoleStatus.TIMEDOUT) > 0  && !skippable) {
+      Set<Role> timedOutRoles = getRolesOfTimedOutTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, timedOutRoles, successFactors)) {
+        return HostRoleStatus.TIMEDOUT;
+      }
+    }
+
+    int activeTaskCount = counters.get(HostRoleStatus.PENDING) + counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+    if (activeTaskCount > 0) {
+      return HostRoleStatus.IN_PROGRESS;
+    }
+
+    // Aborted tasks are only looked at once nothing is active any more.
+    if (counters.get(HostRoleStatus.ABORTED) > 0) {
+      Set<Role> abortedRoles = getRolesOfAbortedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, abortedRoles, successFactors)) {
+        return HostRoleStatus.ABORTED;
+      }
+    }
+
+    return HostRoleStatus.COMPLETED;
+  }
+
+  /**
+   *  Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#FAILED}
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfFailedTasks(List <HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.FAILED);
+  }
+
+  /**
+   *  Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#TIMEDOUT}
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfTimedOutTasks(List <HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.TIMEDOUT);
+  }
+
+  /**
+   *  Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#ABORTED}
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfAbortedTasks(List <HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.ABORTED);
+  }
+
+  /**
+   * Collects the roles of all commands whose status equals the given {@code status}.
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @param status {@link HostRoleStatus} a command must currently have to contribute its role
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfTasks(List <HostRoleCommand> hostRoleCommands, final HostRoleStatus status) {
+
+    // keeps only the commands that are currently in the requested status
+    Predicate<HostRoleCommand> hasRequestedStatus = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand command) {
+        return status == command.getStatus();
+      }
+    };
+
+    // maps a command to the role it belongs to
+    Function<HostRoleCommand, Role> toRole = new Function<HostRoleCommand, Role>() {
+      @Override
+      public Role apply(HostRoleCommand command) {
+        return command.getRole();
+      }
+    };
+
+    FluentIterable<HostRoleCommand> matchingCommands = FluentIterable.from(hostRoleCommands).filter(hasRequestedStatus);
+    return matchingCommands.transform(toRole).toSet();
+  }
+
+  /**
+   * Decides whether a stage failed because the commands of any of the given roles did not reach
+   * that role's success factor.
+   * <p>
+   * For each role the success ratio is the share of its commands that are not in a
+   * failed-and-not-skippable state. The ratio is computed in floating point so that partial
+   * success (for example 1 of 2 commands) is compared against the success factor as 0.5 rather
+   * than being truncated to 0. A role without a configured success factor must succeed completely
+   * (factor 1.0). A role that has no commands in {@code hostRoleCommands} cannot fail the stage.
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param roles  set of roles to be checked for meeting success criteria
+   * @param successFactors  map of role to its success factor
+   * @return <code>TRUE</code> if stage failed due to hostRoleCommands of any role not meeting success criteria
+   */
+  protected static Boolean didStageFailed(List<HostRoleCommand> hostRoleCommands, Set<Role> roles, Map<Role, Float> successFactors) {
+    for (Role role : roles) {
+      List<HostRoleCommand> commandsOfRole = getHostRoleCommandsOfRole(hostRoleCommands, role);
+      int totalForRole = commandsOfRole.size();
+      if (totalForRole == 0) {
+        // nothing ran for this role, so nothing failed and the ratio would be undefined
+        continue;
+      }
+      int failedForRole = getFailedHostRoleCommands(commandsOfRole).size();
+      // divide as float: int/int would truncate every partial success down to 0
+      float successRatioForRole = (totalForRole - failedForRole) / (float) totalForRole;
+      Float configuredFactor = successFactors.get(role);
+      float successFactorForRole = configuredFactor == null ? 1.0f : configuredFactor;
+      if (successRatioForRole < successFactorForRole) {
+        return Boolean.TRUE;
+      }
+    }
+    return Boolean.FALSE;
+  }
+
+  /**
+   * Selects the commands that belong to the given role.
+   * Roles are matched by reference ({@code ==}), not by {@code equals}.
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @param role {@link Role}
+   * @return list of {@link HostRoleCommand} that belongs to {@link Role}
+   */
+  protected static List<HostRoleCommand> getHostRoleCommandsOfRole(List <HostRoleCommand> hostRoleCommands, final Role role) {
+    Predicate<HostRoleCommand> belongsToRole = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand command) {
+        return role == command.getRole();
+      }
+    };
+    FluentIterable<HostRoleCommand> commandsOfRole = FluentIterable.from(hostRoleCommands).filter(belongsToRole);
+    return commandsOfRole.toList();
+  }
+
+  /**
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return list of {@link HostRoleCommand} with failed status
+   */
+  protected static List<HostRoleCommand> getFailedHostRoleCommands(List <HostRoleCommand> hostRoleCommands) {
+    Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand hrc) {
+        return hrc.getStatus().isFailedAndNotSkippableState();
+      }
+    };
+    return FluentIterable.from(hostRoleCommands)
+        .filter(predicate)
+        .toList();
+  }
+
+
+  /**
+   * Calculate overall status from collection of statuses
+   * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+   * @return overall status of a request
+   */
+  public static HostRoleStatus getOverallStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+    Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+    return calculateSummaryStatus(statusCount, hostRoleStatuses.size(), false);
+  }
+
+  /**
+   * Calculate overall display status from collection of statuses
+   * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+   * @return overall display status of a request
+   */
+  public static HostRoleStatus getOverallDisplayStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+    Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+    return calculateSummaryDisplayStatus(statusCount, hostRoleStatuses.size(), false);
+  }
+
+
+  /**
    * Calculate overall status of an upgrade.
    *
    * @param counters   counts of resources that are in various states
@@ -444,7 +796,7 @@ public class CalculatedStatus {
    */
   protected static HostRoleStatus calculateSummaryStatusOfUpgrade(
       Map<HostRoleStatus, Integer> counters, int total) {
-    return calculateSummaryStatusOfStage(counters, total, false);
+    return calculateSummaryStatus(counters, total, false);
   }
 
   /**
@@ -456,10 +808,28 @@ public class CalculatedStatus {
    *
    * @return summary request status based on statuses of tasks in different states.
    */
-  protected static HostRoleStatus calculateSummaryDisplayStatus(
+  public static HostRoleStatus calculateSummaryDisplayStatus(
       Map<HostRoleStatus, Integer> counters, int total, boolean skippable) {
-    return counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
-           counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
-           calculateSummaryStatusOfStage(counters, total, skippable);
+    return counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
+           counters.get(HostRoleStatus.TIMEDOUT) > 0 ? HostRoleStatus.TIMEDOUT:
+           counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
+           calculateSummaryStatus(counters, total, skippable);
+  }
+
+  /**
+   * Kind of {@link HostRoleStatus} persisted by {@link Stage} and {@link Request}.
+   * Each constant carries a fixed string value that is returned by {@link #getValue()}.
+   */
+  public enum StatusType {
+    STATUS("status"),
+    DISPLAY_STATUS("display_status");
+
+    /** String value of this status type; assigned once per constant. */
+    private final String value;
+
+    StatusType(String value) {
+      this.value = value;
+    }
+
+    /**
+     * @return the string value this constant was created with
+     */
+    public String getValue() {
+      return value;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
new file mode 100644
index 0000000..9d73122
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+
+/**
+ * Event fired whenever a request has to start being tracked as a running request by
+ * {@link TaskStatusListener}.
+ * This usually happens when a new request is created by a user action or
+ * when ambari-server starts with some stages in a non-completed state.
+ */
+public class TaskCreateEvent extends TaskEvent {
+
+  /**
+   * Creates the event and hands the commands to {@link TaskEvent}.
+   *
+   * @param hostRoleCommandList
+   *          all hostRoleCommands for all requests
+   */
+  public TaskCreateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
new file mode 100644
index 0000000..ca351d7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * {@link TaskEvent} is the base for all events related to create/update of tasks
+ * that might result in update of stage/request status.
+ */
+public class TaskEvent {
+  /**
+   * List of {@link HostRoleCommand} carried by this event. The reference is assigned once in
+   * the constructor and returned as-is (not copied) by {@link #getHostRoleCommands()}.
+   */
+  private final List<HostRoleCommand> hostRoleCommands;
+
+  /**
+   * Constructor.
+   *
+   * @param hostRoleCommands
+   *          list of HRCs which have been reported back by the agents.
+   */
+  public TaskEvent(List<HostRoleCommand> hostRoleCommands) {
+    this.hostRoleCommands = hostRoleCommands;
+  }
+
+  /**
+   * Gets the hostRoleCommands that created the event.
+   * @return List of {@link HostRoleCommand}
+   */
+  public List<HostRoleCommand> getHostRoleCommands() {
+    return hostRoleCommands;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    // distinct local name so the joined text does not shadow the field
+    String joinedCommands = StringUtils.join(hostRoleCommands, ", ");
+    StringBuilder buffer = new StringBuilder("TaskEvent{");
+    buffer.append("hostRoleCommands=").append(joinedCommands);
+    buffer.append("}");
+    return buffer.toString();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
new file mode 100644
index 0000000..84f67f5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+
+/**
+ * The {@link TaskUpdateEvent} is to be fired every time
+ * host role commands are merged to the database.
+ */
+public class TaskUpdateEvent extends TaskEvent {
+
+  /**
+   * Creates the event and hands the commands to {@link TaskEvent}.
+   *
+   * @param hostRoleCommandList
+   *          host role commands carried by this event
+   */
+  public TaskUpdateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
new file mode 100644
index 0000000..bc146ef
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -0,0 +1,609 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.listeners.tasks;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * The {@link TaskStatusListener} is used to constantly update status of running Stages and Requests
+ * {@link TaskUpdateEvent} listens for all incoming events. These events are fired when either host role commands are created/updated
+ * This listener maintains map of all running tasks, stages and requests
+ */
+@Singleton
+@EagerSingleton
+public class TaskStatusListener {
+  /**
+   * Logger.
+   */
+  private final static Logger LOG = LoggerFactory.getLogger(TaskStatusListener.class);
+
+  /**
+   * Maps task id to its {@link HostRoleCommand} Object.
+   * Map has entries of all tasks of all active(ongoing) requests
+   * NOTE: Partial loading of tasks for any request may lead to incorrect update of the request status
+   */
+  private Map<Long,HostRoleCommand> activeTasksMap = new ConcurrentHashMap<>();
+
+  /**
+   * Maps all ongoing request id to its {@link ActiveRequest}
+   */
+  private Map<Long, ActiveRequest> activeRequestMap = new ConcurrentHashMap<>();
+
+  /**
+   * Maps {@link StageEntityPK} of all ongoing requests to its {@link ActiveStage}
+   * with updated {@link ActiveStage#status} and {@link ActiveStage#displayStatus}.
+   */
+  private Map<StageEntityPK, ActiveStage> activeStageMap = new ConcurrentHashMap<>();
+
+  private StageDAO stageDAO;
+
+  private RequestDAO requestDAO;
+
+
+  /**
+   * Stores the DAOs and registers this listener with the given publisher.
+   * Registration happens last, after both DAO fields have been assigned.
+   *
+   * @param taskEventPublisher publisher this listener registers itself with
+   * @param stageDAO DAO used to look up stages and persist their status
+   * @param requestDAO DAO used to look up requests and persist their status
+   */
+  @Inject
+  public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO) {
+    this.requestDAO = requestDAO;
+    this.stageDAO = stageDAO;
+
+    taskEventPublisher.register(this);
+  }
+
+  /**
+   * @return the map of task id to tracked {@link HostRoleCommand}; the internal map itself, not a copy
+   */
+  public Map<Long,HostRoleCommand> getActiveTasksMap() {
+    return activeTasksMap;
+  }
+
+  /**
+   * @return the map of request id to {@link ActiveRequest}; the internal map itself, not a copy
+   */
+  public Map<Long, ActiveRequest> getActiveRequestMap() {
+    return activeRequestMap;
+  }
+
+  /**
+   * @return the map of {@link StageEntityPK} to {@link ActiveStage}; the internal map itself, not a copy
+   */
+  public Map<StageEntityPK, ActiveStage> getActiveStageMap() {
+    return activeStageMap;
+  }
+
+  /**
+   * On receiving a task update event, refreshes the tracked tasks and then the status of the
+   * stages and requests those tasks belong to.
+   * A reported task that is not present in {@link #activeTasksMap} is logged as an error and is
+   * not added to the tracked tasks, stages or request ids; the complete reported list is still
+   * passed on to the stage status update.
+   * @param event Consumes {@link TaskUpdateEvent}.
+   */
+  @Subscribe
+  public void onTaskUpdateEvent(TaskUpdateEvent event) {
+    LOG.debug("Received task update event {}", event);
+    List<HostRoleCommand> reportedCommands = event.getHostRoleCommands();
+    List<HostRoleCommand> trackedCommands = new ArrayList<>();
+    Set<StageEntityPK> touchedStages = new HashSet<>();
+    Set<Long> touchedRequestIds = new HashSet<>();
+
+    for (HostRoleCommand reported : reportedCommands) {
+      Long reportedTaskId = reported.getTaskId();
+      if (activeTasksMap.get(reportedTaskId) == null) {
+        LOG.error(String.format("Received update for a task %d which is not being tracked as running task", reportedTaskId));
+        continue;
+      }
+
+      trackedCommands.add(reported);
+
+      StageEntityPK stagePK = new StageEntityPK();
+      stagePK.setRequestId(reported.getRequestId());
+      stagePK.setStageId(reported.getStageId());
+      touchedStages.add(stagePK);
+
+      touchedRequestIds.add(reported.getRequestId());
+    }
+
+    updateActiveTasksMap(trackedCommands);
+
+    // Presumption: when no running stage changed its status,
+    // no running request needs its status updated either
+    if (updateActiveStagesStatus(touchedStages, reportedCommands)) {
+      updateActiveRequestsStatus(touchedRequestIds, touchedStages);
+    }
+
+  }
+
+  /**
+   * On receiving task create event, create entries in the running request, stage and task in the maps
+   * @param event Consumes {@link TaskCreateEvent}.
+   */
+  @Subscribe
+  public void onTaskCreateEvent(TaskCreateEvent event) {
+    LOG.debug("Received task create event {}", event);
+    List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
+
+    for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
+      activeTasksMap.put(hostRoleCommand.getTaskId(), hostRoleCommand);
+      addStagePK(hostRoleCommand);
+      addRequestId(hostRoleCommand);
+    }
+  }
+
+
+  /**
+   * Replaces the tracked entry of every reported host role command with the reported instance.
+   * @param hostRoleCommandWithReceivedStatus list of host role commands reported
+   */
+  private void updateActiveTasksMap(List<HostRoleCommand> hostRoleCommandWithReceivedStatus) {
+    for (HostRoleCommand reported : hostRoleCommandWithReceivedStatus) {
+      activeTasksMap.put(reported.getTaskId(), reported);
+    }
+  }
+
+
+  /**
+   * Registers the stage of the given command in {@link #activeStageMap}.
+   * If the stage is already tracked only the task id is added to it; otherwise the stage entity
+   * is loaded through the stage DAO and a new {@link ActiveStage} is created from its status,
+   * display status, role success factors and skippable flag.
+   * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+   */
+  private void addStagePK(HostRoleCommand hostRoleCommand) {
+    StageEntityPK stagePK = new StageEntityPK();
+    stagePK.setRequestId(hostRoleCommand.getRequestId());
+    stagePK.setStageId(hostRoleCommand.getStageId());
+
+    if (activeStageMap.containsKey(stagePK)) {
+      // stage is already tracked: just attach the new task to it
+      activeStageMap.get(stagePK).addTaskId(hostRoleCommand.getTaskId());
+      return;
+    }
+
+    StageEntity stageEntity = stageDAO.findByPK(stagePK);
+    // Stage entity of the hostrolecommand should be persisted before publishing task create event
+    assert stageEntity != null;
+
+    Map<Role, Float> successFactors = new HashMap<>();
+    for (RoleSuccessCriteriaEntity criteria : stageEntity.getRoleSuccessCriterias()) {
+      successFactors.put(criteria.getRole(), criteria.getSuccessFactor().floatValue());
+    }
+
+    Set<Long> taskIds = Sets.newHashSet(hostRoleCommand.getTaskId());
+    ActiveStage newStage = new ActiveStage(stageEntity.getStatus(), stageEntity.getDisplayStatus(),
+        successFactors, stageEntity.isSkippable(), taskIds);
+    activeStageMap.put(stagePK, newStage);
+  }
+
+  /**
+   * Recomputes the status of every reported stage and persists it when it changed.
+   * A reported stage that is not in {@link #activeStageMap} is logged as an error and skipped.
+   * @param stagesWithReceivedTaskStatus set of stages that have received task status
+   * @param hostRoleCommandListAll list of all task updates received from agent
+   * @return  <code>true</code> if any of the stages has changed its existing status;
+   *          <code>false</code> otherwise
+   */
+  private Boolean updateActiveStagesStatus(final Set<StageEntityPK> stagesWithReceivedTaskStatus, List<HostRoleCommand> hostRoleCommandListAll) {
+    Boolean anyStageChanged = Boolean.FALSE;
+
+    for (StageEntityPK stagePK : stagesWithReceivedTaskStatus) {
+      if (!activeStageMap.containsKey(stagePK)) {
+        LOG.error(String.format("Received update for a task whose stage is not being tracked as running stage: %s", stagePK.toString()));
+        continue;
+      }
+
+      if (updateStageStatus(stagePK, hostRoleCommandListAll)) {
+        // persist the freshly computed status of the stage
+        ActiveStage changedStage = activeStageMap.get(stagePK);
+        stageDAO.updateStatus(stagePK, changedStage.getStatus(), changedStage.getDisplayStatus());
+        anyStageChanged = Boolean.TRUE;
+      }
+    }
+
+    return anyStageChanged;
+  }
+
+  /**
+   * Registers the request of the given command in {@link #activeRequestMap}.
+   * If the request is already tracked only the stage key is added to it; otherwise the request
+   * entity is loaded through the request DAO and a new {@link ActiveRequest} is created from its
+   * status and display status.
+   * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+   */
+  private void addRequestId(HostRoleCommand hostRoleCommand) {
+    Long requestId = hostRoleCommand.getRequestId();
+
+    StageEntityPK stagePK = new StageEntityPK();
+    stagePK.setRequestId(hostRoleCommand.getRequestId());
+    stagePK.setStageId(hostRoleCommand.getStageId());
+
+    if (activeRequestMap.containsKey(requestId)) {
+      // request is already tracked: just attach the stage to it
+      activeRequestMap.get(requestId).addStageEntityPK(stagePK);
+      return;
+    }
+
+    RequestEntity requestEntity = requestDAO.findByPK(requestId);
+    // Request entity of the hostrolecommand should be persisted before publishing task create event
+    assert requestEntity != null;
+
+    Set<StageEntityPK> stagePKs = Sets.newHashSet(stagePK);
+    ActiveRequest newRequest = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), stagePKs);
+    activeRequestMap.put(requestId, newRequest);
+  }
+
+
+  /**
+   * Recomputes the status of every reported request, persists it when it changed, and stops
+   * tracking requests that are finished.
+   * A reported request that is not in {@link #activeRequestMap} is logged as an error and skipped.
+   * @param requestIdsWithReceivedTaskStatus set of request ids that have received task status
+   * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+   */
+  private void updateActiveRequestsStatus(final Set<Long> requestIdsWithReceivedTaskStatus, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+    for (Long requestId : requestIdsWithReceivedTaskStatus) {
+      if (!activeRequestMap.containsKey(requestId)) {
+        LOG.error(String.format("Received update for a task whose request %d is not being tracked as running request", requestId));
+        continue;
+      }
+
+      ActiveRequest activeRequest = activeRequestMap.get(requestId);
+      if (updateRequestStatus(requestId, stagesWithChangedTaskStatus)) {
+        requestDAO.updateStatus(requestId, activeRequest.getStatus(), activeRequest.getDisplayStatus());
+      }
+
+      // A request is considered finished when its own status and the status of all of its
+      // tasks are completed; it, its stages and its tasks are then no longer tracked as active.
+      if (activeRequest.isCompleted() && isAllTasksCompleted(requestId)) {
+        removeRequestStageAndTasks(requestId);
+      }
+    }
+  }
+
+  /**
+   * Checks whether every tracked task of the given request is in a completed state.
+   * The scan stops at the first incomplete task. Request ids are compared with
+   * {@code equals} so the check does not depend on whether the command exposes its
+   * request id as a primitive or as a boxed value.
+   *
+   * @param requestId request Id
+   * @return  <code>false</code> if any of the tasks belonging to requestId has an incomplete status
+   *          <code>true</code> otherwise (including when no tracked task belongs to the request)
+   */
+  private Boolean isAllTasksCompleted(Long requestId) {
+    for (HostRoleCommand task : activeTasksMap.values()) {
+      boolean belongsToRequest = requestId.equals(task.getRequestId());
+      if (belongsToRequest && !task.getStatus().isCompletedState()) {
+        // one incomplete task is enough to answer; no need to look further
+        return Boolean.FALSE;
+      }
+    }
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Removes entries from {@link #activeTasksMap},{@link #activeStageMap} and {@link #activeRequestMap}
+   * by delegating to the three removal helpers, in the order tasks, stages, request.
+   * @param requestId request id whose entry and its stage and task entries are to be removed
+   */
+  private void removeRequestStageAndTasks(Long requestId) {
+    // NOTE(review): the helpers are not visible here; presumably the tasks -> stages -> request
+    // order matters to them — confirm before reordering
+    removeTasks(requestId);
+    removeStages(requestId);
+    removeRequest(requestId);
+  }
+
+
+  /**
+   * Collects the keys of {@link #activeStageMap} that belong to the given request.
+   * @param requestID requestId
+   * @return  list of StageEntityPK whose request id equals <code>requestID</code>
+   */
+  private List<StageEntityPK> getAllStageEntityPKForRequest(final Long requestID) {
+    return FluentIterable.from(activeStageMap.keySet())
+        .filter(new Predicate<StageEntityPK>() {
+          @Override
+          public boolean apply(StageEntityPK candidate) {
+            return candidate.getRequestId().equals(requestID);
+          }
+        })
+        .toList();
+  }
+
+
+
+  /**
+   * Recomputes the status and display status of a cached stage from the status of its host role commands
+   * and stores the result on the {@link ActiveStage} held in {@link #activeStageMap}.
+   * <p>
+   * Nothing is recomputed once both the status and the display status of the stage are in a completed state.
+   * Otherwise a summary is first derived from the reported commands alone; whenever that partial summary is
+   * {@link HostRoleStatus#PENDING} the value is instead calculated from all tasks of the stage found in
+   * {@link #activeTasksMap}.
+   *
+   * @param stagePK {@link StageEntityPK} primary key for the stage entity
+   * @param hostRoleCommandListAll list of all hrc whose status has been received from agent
+   * @return {@link Boolean} <code>TRUE</code> if the status or display status of the given stage was updated.
+   */
+  private Boolean updateStageStatus(final StageEntityPK stagePK, List<HostRoleCommand> hostRoleCommandListAll) {
+    Boolean didAnyStatusChanged = Boolean.FALSE;
+    ActiveStage reportedStage = activeStageMap.get(stagePK);
+    HostRoleStatus stageCurrentStatus = reportedStage.getStatus();
+    HostRoleStatus stageCurrentDisplayStatus = reportedStage.getDisplayStatus();
+
+    // if stage is already marked to be completed then do not calculate reported status from host role commands
+    // Presumption: There will be no status transition of the host role command from one completed state to another
+    if (!stageCurrentDisplayStatus.isCompletedState() || !stageCurrentStatus.isCompletedState()) {
+      Map<HostRoleStatus, Integer> receivedTaskStatusCount = CalculatedStatus.calculateStatusCountsForTasks(hostRoleCommandListAll, stagePK);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, reportedStage.getSkippable());
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, Boolean.FALSE);
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        // at least one of the two values has to be calculated from all tasks of the stage
+        Function<Long,HostRoleCommand> taskIdToCommand = new Function<Long,HostRoleCommand>(){
+          @Override
+          public HostRoleCommand apply(Long taskId) {
+            return activeTasksMap.get(taskId);
+          }
+        };
+
+        List<HostRoleCommand> activeHostRoleCommandsOfStage = FluentIterable.from(reportedStage.getTaskIds())
+            .transform(taskIdToCommand).toList();
+        Map<HostRoleStatus, Integer> statusCount = CalculatedStatus.calculateStatusCountsForTasks(activeHostRoleCommandsOfStage);
+
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the stage as per the new status of received host role commands
+          HostRoleStatus displayStatus = CalculatedStatus.calculateSummaryDisplayStatus(statusCount, activeHostRoleCommandsOfStage.size(), reportedStage.getSkippable());
+          if (displayStatus != stageCurrentDisplayStatus) {
+            reportedStage.setDisplayStatus(displayStatus);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the stage as per the new status of received host role commands
+          HostRoleStatus status = CalculatedStatus.calculateStageStatus(activeHostRoleCommandsOfStage, statusCount, reportedStage.getSuccessFactors(), reportedStage.getSkippable());
+          if (status != stageCurrentStatus) {
+            reportedStage.setStatus(status);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          // the status from the partial set is conclusive: store it as the status (not as the display status,
+          // which would also overwrite the display status calculated above)
+          reportedStage.setStatus(statusFromPartialSet);
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        // both values are conclusive from the reported commands alone
+        reportedStage.setStatus(statusFromPartialSet);
+        reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+        didAnyStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didAnyStatusChanged;
+  }
+
+  /**
+   * Recomputes the status and display status of a cached request from the status of its stages and stores
+   * the result on the {@link ActiveRequest} held in {@link #activeRequestMap}.
+   * <p>
+   * Nothing is recomputed once both the status and the display status of the request are in a completed state.
+   * Otherwise a summary is first derived from the changed stages of this request alone; whenever that partial
+   * summary is {@link HostRoleStatus#PENDING} the value is instead calculated from all stages of the request.
+   *
+   * @param requestId {@link Request} whose status is to be updated
+   * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+   * @return {Boolean} <code>TRUE</code> if the status or display status of the request was updated
+   */
+  private Boolean updateRequestStatus(final Long requestId, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+    Boolean didStatusChanged = Boolean.FALSE;
+    ActiveRequest request = activeRequestMap.get(requestId);
+    HostRoleStatus requestCurrentStatus = request.getStatus();
+    HostRoleStatus requestCurrentDisplayStatus = request.getDisplayStatus();
+
+    if (!requestCurrentDisplayStatus.isCompletedState() || !requestCurrentStatus.isCompletedState()) {
+      // only the changed stages that belong to this request take part in the partial calculation
+      List<ActiveStage> activeStagesWithChangesTaskStatus = new ArrayList<>();
+      for (StageEntityPK stageEntityPK : stagesWithChangedTaskStatus) {
+        if (requestId.equals(stageEntityPK.getRequestId())) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          activeStagesWithChangesTaskStatus.add(activeStage);
+        }
+      }
+
+      Map<CalculatedStatus.StatusType,Map<HostRoleStatus, Integer>> stageStatusCountFromPartialSet = CalculatedStatus.calculateStatusCountsForStage(activeStagesWithChangesTaskStatus);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.STATUS), Boolean.FALSE);
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.DISPLAY_STATUS), Boolean.FALSE);
+
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        // at least one of the two values has to be calculated from all stages of the request
+        List<ActiveStage> allActiveStages = new ArrayList<>();
+        for (StageEntityPK stageEntityPK : request.getStageEntityPks()) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          allActiveStages.add(activeStage);
+        }
+        Map<CalculatedStatus.StatusType,Map<HostRoleStatus, Integer>> stageStatusCount = CalculatedStatus.calculateStatusCountsForStage(allActiveStages);
+
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the request as per the display status of all of its stages
+          HostRoleStatus displayStatus = CalculatedStatus.calculateSummaryDisplayStatus(stageStatusCount.get(CalculatedStatus.StatusType.DISPLAY_STATUS), allActiveStages.size(), false);
+          if (displayStatus != requestCurrentDisplayStatus) {
+            request.setDisplayStatus(displayStatus);
+            didStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          request.setDisplayStatus(displayStatusFromPartialSet);
+          didStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the request as per the status of all of its stages
+          HostRoleStatus status = CalculatedStatus.calculateSummaryStatus(stageStatusCount.get(CalculatedStatus.StatusType.STATUS), allActiveStages.size(), false);
+          if (status != requestCurrentStatus) {
+            request.setStatus(status);
+            didStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          // the status from the partial set is conclusive: store it as the status (not as the display status,
+          // which would also overwrite the display status calculated above)
+          request.setStatus(statusFromPartialSet);
+          didStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        // both values are conclusive from the changed stages alone
+        request.setStatus(statusFromPartialSet);
+        request.setDisplayStatus(displayStatusFromPartialSet);
+        didStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didStatusChanged;
+  }
+
+
+  /**
+   * Drops every {@link HostRoleCommand} of the given request from {@link #activeTasksMap}.
+   * A task that is not in a completed state is removed as well, but an error is logged for it.
+   * @param requestId request id
+   */
+  private void removeTasks(Long requestId) {
+    for (Iterator<Map.Entry<Long, HostRoleCommand>> it = activeTasksMap.entrySet().iterator(); it.hasNext(); ) {
+      HostRoleCommand task = it.next().getValue();
+      if (task.getRequestId() != requestId) {
+        continue;
+      }
+      if (!task.getStatus().isCompletedState()) {
+        LOG.error(String.format("Task %d should have been completed before being removed from running task cache(activeTasksMap)", task.getTaskId()));
+      }
+      // remove through the iterator so the map can be modified while it is being traversed
+      it.remove();
+    }
+  }
+
+
+  /**
+   * Drops every stage of the given request from {@link #activeStageMap}.
+   * @param requestId request Id
+   */
+  private void removeStages(Long requestId) {
+    // the keys to delete are gathered into a list first, so the map is not modified while its key set is iterated
+    for (StageEntityPK stagePk : getAllStageEntityPKForRequest(requestId)) {
+      activeStageMap.remove(stagePk);
+    }
+  }
+
+
+  /**
+   * Removes the entry of the given request from {@link #activeRequestMap}.
+   * Entries of the request in the stage and task caches are not touched by this method.
+   * @param requestId request Id
+   */
+  private void removeRequest(Long requestId) {
+    activeRequestMap.remove(requestId);
+  }
+
+
+  /**
+   * This class stores {@link Request#status} and {@link Request#displayStatus} information
+   * together with the primary keys of the stages that belong to the request.
+   * This information is cached for all running {@link Request} at {@link #activeRequestMap}
+   */
+  protected class ActiveRequest {
+    // cached status of the request
+    private HostRoleStatus status;
+    // cached display status of the request
+    private HostRoleStatus displayStatus;
+    // primary keys of the stages of this request
+    private Set <StageEntityPK> stageEntityPks;
+
+    /**
+     * @param status initial status of the request
+     * @param displayStatus initial display status of the request
+     * @param stageEntityPks primary keys of the stages of the request; the set is stored as passed (not copied)
+     *                       and is later modified by {@link #addStageEntityPK(StageEntityPK)}
+     */
+    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.stageEntityPks = stageEntityPks;
+    }
+
+    /**
+     * @return cached status of the request
+     */
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    /**
+     * @param status new cached status of the request
+     */
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    /**
+     * @return cached display status of the request
+     */
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    /**
+     * @param displayStatus new cached display status of the request
+     */
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    /**
+     * @return <code>true</code> only if both the status and the display status are in a completed state
+     */
+    public Boolean isCompleted() {
+      return status.isCompletedState() && displayStatus.isCompletedState();
+    }
+
+    /**
+     * @return the internal set of stage primary keys (not a copy)
+     */
+    public Set <StageEntityPK> getStageEntityPks() {
+      return stageEntityPks;
+    }
+
+    /**
+     * Registers one more stage as belonging to this request.
+     * @param stageEntityPK primary key of the stage to add
+     */
+    public void addStageEntityPK(StageEntityPK stageEntityPK) {
+      stageEntityPks.add(stageEntityPK);
+    }
+
+  }
+
+  /**
+   * This class stores information needed to determine {@link Stage#status} and {@link Stage#displayStatus}
+   * This information is cached for all {@link Stage} of all running {@link Request} at {@link #activeStageMap}
+   */
+  public class ActiveStage {
+    // cached status of the stage
+    private HostRoleStatus status;
+    // cached display status of the stage
+    private HostRoleStatus displayStatus;
+    // skippable flag of the stage, passed to the status calculations
+    private Boolean skippable;
+    // ids of the tasks that belong to this stage
+    private Set <Long> taskIds;
+
+    //Map of roles to successFactors for this stage. Default is 1 i.e. 100%
+    // (always assigned by the constructor, so no initial map is allocated here)
+    private Map<Role, Float> successFactors;
+
+    /**
+     * @param status initial status of the stage
+     * @param displayStatus initial display status of the stage
+     * @param successFactors map of roles to success factors of the stage
+     * @param skippable skippable flag of the stage
+     * @param taskIds ids of the tasks of the stage; the set is stored as passed (not copied)
+     *                and is later modified by {@link #addTaskId(Long)}
+     */
+    public ActiveStage(HostRoleStatus status, HostRoleStatus displayStatus,
+                       Map<Role, Float> successFactors, Boolean skippable, Set<Long> taskIds) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.successFactors =  successFactors;
+      this.skippable = skippable;
+      this.taskIds = taskIds;
+    }
+
+    /**
+     * @return cached status of the stage
+     */
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    /**
+     * @param status new cached status of the stage
+     */
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    /**
+     * @return cached display status of the stage
+     */
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    /**
+     * @param displayStatus new cached display status of the stage
+     */
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    /**
+     * @return skippable flag of the stage
+     */
+    public Boolean getSkippable() {
+      return skippable;
+    }
+
+    /**
+     * @param skippable new skippable flag of the stage
+     */
+    public void setSkippable(Boolean skippable) {
+      this.skippable = skippable;
+    }
+
+    /**
+     * @return map of roles to success factors of the stage (the internal map, not a copy)
+     */
+    public Map<Role, Float> getSuccessFactors() {
+      return successFactors;
+    }
+
+    /**
+     * @param successFactors new map of roles to success factors of the stage
+     */
+    public void setSuccessFactors(Map<Role, Float> successFactors) {
+      this.successFactors = successFactors;
+    }
+
+    /**
+     * @return the internal set of task ids of the stage (not a copy)
+     */
+    public Set <Long> getTaskIds() {
+      return taskIds;
+    }
+
+    /**
+     * Registers one more task as belonging to this stage.
+     * @param taskId id of the task to add
+     */
+    public void addTaskId(Long taskId) {
+      taskIds.add(taskId);
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
new file mode 100644
index 0000000..fdc41e5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import org.apache.ambari.server.events.TaskEvent;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link TaskEventPublisher} is used to publish instances of
+ * {@link TaskEvent} to any {@link com.google.common.eventbus.Subscribe} interested.
+ * It uses a synchronous {@link EventBus} (not an asynchronous one), so subscribers
+ * are invoked on the thread that calls {@link #publish(TaskEvent)}.
+ */
+@Singleton
+public class TaskEventPublisher {
+
+  /**
+   * A synchronous event bus for processing task events.
+   */
+  private final EventBus m_eventBus = new EventBus("ambari-task-report-event-bus");
+
+
+  /**
+   * Publishes the specified event to all registered listeners that
+   * {@link Subscribe} to  {@link TaskEvent} instances.
+   *
+   * @param event {@link TaskEvent}
+   */
+  public void publish(TaskEvent event) {
+    m_eventBus.post(event);
+  }
+
+  /**
+   * Register a listener to receive events. The listener should use the
+   * {@link Subscribe} annotation.
+   *
+   * @param object
+   *          the listener to receive events.
+   */
+  public void register(Object object) {
+    m_eventBus.register(object);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 02c4091..e834045 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -40,6 +40,8 @@ import org.apache.ambari.annotations.TransactionalLock;
 import org.apache.ambari.annotations.TransactionalLock.LockArea;
 import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.api.query.JpaPredicateVisitor;
 import org.apache.ambari.server.api.query.JpaSortBuilder;
@@ -49,6 +51,9 @@ import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.utilities.PredicateHelper;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.TransactionalLocks;
 import org.apache.ambari.server.orm.entities.HostEntity;
@@ -58,9 +63,11 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -144,6 +151,13 @@ public class HostRoleCommandDAO {
   @Inject
   private Configuration configuration;
 
+
+  @Inject
+  HostRoleCommandFactory hostRoleCommandFactory;
+
+  @Inject
+  private TaskEventPublisher taskEventPublisher;
+
   /**
    * Used to ensure that methods which rely on the completion of
    * {@link Transactional} can detect when they are able to run.
@@ -629,11 +643,17 @@ public class HostRoleCommandDAO {
   @Transactional
   @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
   public HostRoleCommandEntity merge(HostRoleCommandEntity entity) {
+    // merge first, so the published command is created from the entity returned by the merge
+    entity = mergeWithoutPublishEvent(entity);
+    // NOTE(review): the event is published from inside this @Transactional method, presumably before the
+    // transaction is committed - confirm that listeners do not depend on the committed state
+    publishTaskUpdateEvent(Collections.singletonList(hostRoleCommandFactory.createExisting(entity)));
+    return entity;
+  }
+
+  /**
+   * Merges the given entity and invalidates the host role command status summary cache for it,
+   * without publishing a task update event.
+   *
+   * @param entity the entity to merge
+   * @return the entity returned by the entity manager's merge
+   */
+  @Transactional
+  @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
+  public HostRoleCommandEntity mergeWithoutPublishEvent(HostRoleCommandEntity entity) {
     EntityManager entityManager = entityManagerProvider.get();
     entity = entityManager.merge(entity);
-
     invalidateHostRoleCommandStatusSummaryCache(entity);
-
     return entity;
   }
 
@@ -667,10 +687,51 @@ public class HostRoleCommandDAO {
     }
 
     invalidateHostRoleCommandStatusSummaryCache(requestsToInvalidate);
-
+    publishTaskUpdateEvent(getHostRoleCommands(entities));
     return managedList;
   }
 
+  /**
+   * Converts host role command entities into {@link HostRoleCommand} objects by passing each entity
+   * to <code>hostRoleCommandFactory.createExisting</code>.
+   *
+   * @param entities entities to convert
+   * @return list with one {@link HostRoleCommand} per entity, in the iteration order of <code>entities</code>
+   */
+  public List<HostRoleCommand> getHostRoleCommands(Collection<HostRoleCommandEntity> entities) {
+    return FluentIterable.from(entities)
+        .transform(new Function<HostRoleCommandEntity, HostRoleCommand>() {
+          @Override
+          public HostRoleCommand apply(HostRoleCommandEntity commandEntity) {
+            return hostRoleCommandFactory.createExisting(commandEntity);
+          }
+        })
+        .toList();
+  }
+
+  /**
+   * Publishes a {@link TaskUpdateEvent} carrying the given commands through the task event publisher.
+   * Nothing is published when the list is empty.
+   *
+   * @param hostRoleCommands commands whose update is to be announced
+   */
+  public void publishTaskUpdateEvent(List<HostRoleCommand> hostRoleCommands) {
+    if (hostRoleCommands.isEmpty()) {
+      return;
+    }
+    taskEventPublisher.publish(new TaskUpdateEvent(hostRoleCommands));
+  }
+
+  /**
+   * Publishes a {@link TaskCreateEvent} carrying the given commands through the task event publisher.
+   * Nothing is published when the list is empty.
+   *
+   * @param hostRoleCommands commands whose creation is to be announced
+   */
+  public void publishTaskCreateEvent(List<HostRoleCommand> hostRoleCommands) {
+    if (hostRoleCommands.isEmpty()) {
+      return;
+    }
+    taskEventPublisher.publish(new TaskCreateEvent(hostRoleCommands));
+  }
+
+
+
   @Transactional
   @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
   public void remove(HostRoleCommandEntity entity) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 1c4d0a3..2696f66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -144,6 +144,14 @@ public class RequestDAO {
   }
 
+  /**
+   * Loads the request with the given id, sets its status and display status and merges the entity.
+   * NOTE(review): the result of findByPK is used without a null check - confirm the request always exists.
+   *
+   * @param requestId id of the request entity to update
+   * @param status status to store
+   * @param displayStatus display status to store
+   */
   @Transactional
+  public void updateStatus(long requestId, HostRoleStatus status, HostRoleStatus displayStatus) {
+    RequestEntity requestEntity = findByPK(requestId);
+    requestEntity.setStatus(status);
+    requestEntity.setDisplayStatus(displayStatus);
+    merge(requestEntity);
+  }
+
+  @Transactional
   public void create(RequestEntity requestEntity) {
     entityManagerProvider.get().persist(requestEntity);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index d2f899f..126468a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.orm.dao;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -173,11 +174,15 @@ public class StageDAO {
     return daoUtils.selectList(query);
   }
 
+  /**
+   *
+   * @param statuses {@link HostRoleStatus}
+   * @return list of stage entities
+   */
   @RequiresSession
-  public List<StageEntity> findByCommandStatuses(
-      Collection<HostRoleStatus> statuses) {
+  public List<StageEntity> findByStatuses(Collection<HostRoleStatus> statuses) {
     TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
-        "StageEntity.findByCommandStatuses", StageEntity.class);
+        "StageEntity.findByStatuses", StageEntity.class);
 
     query.setParameter("statuses", statuses);
     return daoUtils.selectList(query);
@@ -280,8 +285,8 @@ public class StageDAO {
    *          the stage entity to update
    * @param desiredStatus
    *          the desired stage status
-   * @param controller
-   *          the ambari management controller
+   * @param actionManager
+   *          the action manager
    *
    * @throws java.lang.IllegalArgumentException
    *           if the transition to the desired status is not a legal transition
@@ -301,9 +306,11 @@ public class StageDAO {
     if (desiredStatus == HostRoleStatus.ABORTED) {
       actionManager.cancelRequest(stage.getRequestId(), "User aborted.");
     } else {
+      List <HostRoleCommandEntity> hrcWithChangedStatus = new ArrayList<HostRoleCommandEntity>();
       for (HostRoleCommandEntity hostRoleCommand : tasks) {
         HostRoleStatus hostRoleStatus = hostRoleCommand.getStatus();
         if (hostRoleStatus.equals(currentStatus)) {
+          hrcWithChangedStatus.add(hostRoleCommand);
           hostRoleCommand.setStatus(desiredStatus);
 
           if (desiredStatus == HostRoleStatus.PENDING) {
@@ -316,6 +323,21 @@ public class StageDAO {
   }
 
   /**
+   * Loads the stage with the given primary key, sets its status and display status and merges the entity.
+   * NOTE(review): the result of findByPK is used without a null check - confirm the stage always exists.
+   *
+   * @param stageEntityPK  {@link StageEntityPK} primary key of the stage entity to update
+   * @param status {@link HostRoleStatus} status to store
+   * @param displayStatus {@link HostRoleStatus} display status to store
+   */
+  @Transactional
+  public void updateStatus(StageEntityPK stageEntityPK, HostRoleStatus status, HostRoleStatus displayStatus) {
+    StageEntity stageEntity = findByPK(stageEntityPK);
+    stageEntity.setStatus(status);
+    stageEntity.setDisplayStatus(displayStatus);
+    merge(stageEntity);
+  }
+
+
+  /**
    * Determine whether or not it is valid to transition from this stage status
    * to the given status.
    *

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 74271b9..a809295 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -105,9 +105,9 @@ public class HostRoleCommandEntity {
   @Basic
   private Integer exitcode = 0;
 
-  @Column(name = "status")
+  @Column(name = "status", nullable = false)
   @Enumerated(EnumType.STRING)
-  private HostRoleStatus status;
+  private HostRoleStatus status = HostRoleStatus.PENDING;
 
   @Column(name = "std_error")
   @Lob