You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ja...@apache.org on 2017/02/17 17:34:42 UTC

[1/2] ambari git commit: AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)

Repository: ambari
Updated Branches:
  refs/heads/trunk dd174f417 -> 0fc7a6671


http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
index 7944d21..f19aa72 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
@@ -69,9 +69,28 @@ public class RequestEntity {
   @Enumerated(value = EnumType.STRING)
   private RequestType requestType;
 
-  @Column(name = "status")
+  /**
+   * Logical status of the request: indicates whether the intent of the
+   * request has been accomplished or not.
+   *
+   * Derived from the {@link StageEntity#status} of every stage belonging to this request.
+   * Defaults to {@link HostRoleStatus#PENDING}; the column is declared non-nullable.
+   */
+  @Column(name = "status", nullable = false)
   @Enumerated(value = EnumType.STRING)
-  private HostRoleStatus status;
+  private HostRoleStatus status = HostRoleStatus.PENDING;
+
+  /**
+   * Display status of the request: reflects whether any of the underlying tasks
+   * have hit any kind of failure, see {@link HostRoleStatus#isFailedState()}.
+   *
+   * Derived only from the {@link HostRoleCommandEntity#status} of the request's tasks
+   * (aggregated through the {@link StageEntity#displayStatus} of its stages).
+   *
+   */
+  @Column(name = "display_status", nullable = false)
+  @Enumerated(value = EnumType.STRING)
+  private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
 
   @Basic
   @Column(name = "create_time", nullable = false)
@@ -89,7 +108,7 @@ public class RequestEntity {
   @Column(name = "exclusive_execution", insertable = true, updatable = true, nullable = false)
   private Integer exclusive = 0;
 
-  @OneToMany(mappedBy = "request")
+  @OneToMany(mappedBy = "request", cascade = CascadeType.REMOVE)
   private Collection<StageEntity> stages;
 
   @OneToMany(mappedBy = "requestEntity", cascade = CascadeType.ALL)
@@ -207,14 +226,38 @@ public class RequestEntity {
     this.commandName = commandName;
   }
 
+  /**
+   * Gets the logical status of the request.
+   * @return the request's {@link HostRoleStatus}
+   */
   public HostRoleStatus getStatus() {
     return status;
   }
 
+  /**
+   * Sets the logical status of the request.
+   * @param status the {@link HostRoleStatus} to store
+   */
   public void setStatus(HostRoleStatus status) {
     this.status = status;
   }
 
+  /**
+   * Gets the display status of the request.
+   * @return the request's display {@link HostRoleStatus}
+   */
+  public HostRoleStatus getDisplayStatus() {
+    return displayStatus;
+  }
+
+  /**
+   * Sets the display status of the request.
+   * @param displayStatus the display {@link HostRoleStatus} to store
+   */
+  public void setDisplayStatus(HostRoleStatus displayStatus) {
+    this.displayStatus = displayStatus;
+  }
+
   public RequestScheduleEntity getRequestScheduleEntity() {
     return requestScheduleEntity;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index f9c8810..f68338f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -39,17 +39,21 @@ import javax.persistence.OneToMany;
 import javax.persistence.Table;
 
 import org.apache.ambari.server.actionmanager.CommandExecutionType;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 
 @Entity
 @Table(name = "stage")
 @IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
 @NamedQueries({
     @NamedQuery(
-        name = "StageEntity.findByCommandStatuses",
-        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.requestId, stage.stageId"),
+        name = "StageEntity.findByStatuses",
+        query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses ORDER BY stage.requestId, stage.stageId"),
+    @NamedQuery(
+        name = "StageEntity.findByPK",
+        query = "SELECT stage from StageEntity stage WHERE stage.requestId = :requestId AND stage.stageId = :stageId"),
     @NamedQuery(
         name = "StageEntity.findByRequestIdAndCommandStatuses",
-        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.stageId"),
+        query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId"),
     @NamedQuery(
         name = "StageEntity.findIdsByRequestId",
         query = "SELECT stage.stageId FROM StageEntity stage WHERE stage.requestId = :requestId ORDER BY stage.stageId ASC") })
@@ -110,6 +114,32 @@ public class StageEntity {
   @Basic
   private byte[] hostParamsStage;
 
+  /**
+   * Logical status of the stage: indicates whether the success criteria
+   * established when the stage was created have been met or not.
+   *
+   * Calculated by taking the following into account:
+   *  a) {@link #roleSuccessCriterias}
+   *  b) {@link #skippable}
+   *  c) {@link HostRoleCommandEntity#autoSkipOnFailure}
+   *  d) {@link HostRoleCommandEntity#status}
+   *
+   */
+  @Column(name = "status",  nullable = false)
+  @Enumerated(EnumType.STRING)
+  private HostRoleStatus status = HostRoleStatus.PENDING;
+
+  /**
+   * Display status of the stage: reflects whether any of the underlying tasks
+   * have hit any kind of failure, see {@link HostRoleStatus#isFailedState()}.
+   *
+   * Derived only from the {@link HostRoleCommandEntity#status} of the stage's tasks.
+   *
+   */
+  @Column(name = "display_status", nullable = false)
+  @Enumerated(EnumType.STRING)
+  private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
+
   @ManyToOne
   @JoinColumn(name = "request_id", referencedColumnName = "request_id", nullable = false)
   private RequestEntity request;
@@ -195,6 +225,40 @@ public class StageEntity {
     this.commandExecutionType = commandExecutionType;
   }
 
+  /**
+   * Gets the logical status of the stage.
+   * @return the stage's {@link HostRoleStatus}
+   */
+  public HostRoleStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Sets the logical status of the stage.
+   * @param status the {@link HostRoleStatus} to store
+   */
+  public void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
+
+  /**
+   * Gets the display status of the stage.
+   * @return the stage's display {@link HostRoleStatus}
+   */
+  public HostRoleStatus getDisplayStatus() {
+    return displayStatus;
+  }
+
+
+  /**
+   * Sets the display status of the stage.
+   * @param displayStatus the display {@link HostRoleStatus} to store
+   */
+  public void setDisplayStatus(HostRoleStatus displayStatus) {
+    this.displayStatus = displayStatus;
+  }
+
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
index 9ca0470..34d175c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
@@ -68,4 +68,16 @@ public class StageEntityPK implements Serializable {
     result = 31 * result + (stageId != null ? stageId.hashCode() : 0);
     return result;
   }
+
+  /**
+   * Renders both key parts as {@code StageEntityPK{stageId=<id>, requestId=<id>}}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("StageEntityPK{");
+    buffer.append("stageId=").append(getStageId());
+    buffer.append(", requestId=").append(getRequestId()); // separator keeps the two ids from running together
+    buffer.append("}");
+    return buffer.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
index 4f90ef3..0267a5e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
@@ -19,11 +19,25 @@ package org.apache.ambari.server.upgrade;
 
 
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
+import javax.persistence.EntityManager;
+
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
@@ -41,6 +55,12 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
    */
   private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog300.class);
 
+  private static final String STAGE_TABLE = "stage";
+  private static final String STAGE_STATUS_COLUMN = "status";
+  private static final String STAGE_DISPLAY_STATUS_COLUMN = "display_status";
+  private static final String REQUEST_TABLE = "request";
+  private static final String REQUEST_DISPLAY_STATUS_COLUMN = "display_status";
+
   @Inject
   DaoUtils daoUtils;
 
@@ -83,6 +103,16 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
    */
   @Override
   protected void executeDDLUpdates() throws AmbariException, SQLException {
+    updateStageTable();
+  }
+
+  protected void updateStageTable() throws SQLException { // adds the new status columns; invoked from executeDDLUpdates()
+    dbAccessor.addColumn(STAGE_TABLE,
+        new DBAccessor.DBColumnInfo(STAGE_STATUS_COLUMN, String.class, 255, HostRoleStatus.PENDING, false));
+    dbAccessor.addColumn(STAGE_TABLE,
+        new DBAccessor.DBColumnInfo(STAGE_DISPLAY_STATUS_COLUMN, String.class, 255, HostRoleStatus.PENDING, false));
+    dbAccessor.addColumn(REQUEST_TABLE, // NOTE(review): despite the method name, the request table gets a column too
+        new DBAccessor.DBColumnInfo(REQUEST_DISPLAY_STATUS_COLUMN, String.class, 255, HostRoleStatus.PENDING, false));
   }
 
   /**
@@ -99,6 +129,7 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
   protected void executeDMLUpdates() throws AmbariException, SQLException {
     addNewConfigurationsFromXml();
     showHcatDeletedUserMessage();
+    setStatusOfStagesAndRequests();
   }
 
   protected void showHcatDeletedUserMessage() {
@@ -122,4 +153,43 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
 
   }
 
+  /**
+   * Back-fills status and display status of every stage and request: each stage's values are
+   * calculated from its host role commands, each request's values from those of its stages.
+   * Executed through {@code executeInTransaction}; any exception is logged as a warning and not rethrown.
+   */
+  protected void setStatusOfStagesAndRequests() {
+    executeInTransaction(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          RequestDAO requestDao = injector.getInstance(RequestDAO.class);
+          StageFactory factory = injector.getInstance(StageFactory.class);
+          EntityManager entityManager = getEntityManagerProvider().get();
+          for (RequestEntity request : requestDao.findAll()) {
+            List<HostRoleStatus> displayStatuses = new ArrayList<>();
+            List<HostRoleStatus> statuses = new ArrayList<>();
+            for (StageEntity stageEntity : request.getStages()) {
+              Stage stage = factory.createExisting(stageEntity);
+              List<HostRoleCommand> tasks = stage.getOrderedHostRoleCommands();
+              Map<HostRoleStatus, Integer> counts = CalculatedStatus.calculateStatusCountsForTasks(tasks);
+              HostRoleStatus display = CalculatedStatus.calculateSummaryDisplayStatus(counts, tasks.size(), stage.isSkippable());
+              HostRoleStatus status = CalculatedStatus.calculateStageStatus(tasks, counts, stage.getSuccessFactors(), stage.isSkippable());
+              stageEntity.setStatus(status);
+              statuses.add(status);
+              stageEntity.setDisplayStatus(display);
+              displayStatuses.add(display);
+              entityManager.merge(stageEntity);
+            }
+            request.setStatus(CalculatedStatus.getOverallStatusForRequest(statuses));
+            request.setDisplayStatus(CalculatedStatus.getOverallDisplayStatusForRequest(displayStatuses));
+            entityManager.merge(request);
+          }
+        } catch (Exception e) {
+          LOG.warn("Setting status for stages and Requests threw exception. ", e);
+        }
+      }
+    });
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
index f007b53..6c7cb09 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
@@ -345,7 +345,8 @@ CREATE TABLE request (
   request_type VARCHAR(255),
   request_schedule_id BIGINT,
   start_time BIGINT NOT NULL,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_request PRIMARY KEY (request_id),
   CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
 
@@ -361,6 +362,8 @@ CREATE TABLE stage (
   command_params BLOB,
   host_params BLOB,
   command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
   CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
 
@@ -378,7 +381,7 @@ CREATE TABLE host_role_command (
   start_time BIGINT NOT NULL,
   original_start_time BIGINT NOT NULL,
   end_time BIGINT,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
   std_error BLOB,
   std_out BLOB,

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index f6cb896..ebb0da0 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -356,7 +356,8 @@ CREATE TABLE request (
   request_context VARCHAR(255),
   request_type VARCHAR(255),
   start_time BIGINT NOT NULL,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_request PRIMARY KEY (request_id),
   CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
 
@@ -372,6 +373,8 @@ CREATE TABLE stage (
   command_params LONGBLOB,
   host_params LONGBLOB,
   command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
   CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
 
@@ -390,7 +393,7 @@ CREATE TABLE host_role_command (
   start_time BIGINT NOT NULL,
   original_start_time BIGINT NOT NULL,
   end_time BIGINT,
-  status VARCHAR(100),
+  status VARCHAR(100) NOT NULL DEFAULT 'PENDING',
   auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
   std_error LONGBLOB,
   std_out LONGBLOB,

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
index 19253e8..884eb06 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
@@ -336,7 +336,8 @@ CREATE TABLE request (
   request_context VARCHAR(255),
   request_type VARCHAR(255),
   start_time NUMBER(19) NOT NULL,
-  status VARCHAR(255),
+  status VARCHAR(255) DEFAULT 'PENDING' NOT NULL, -- Oracle requires DEFAULT before the NOT NULL constraint
+  display_status VARCHAR(255) DEFAULT 'PENDING' NOT NULL,
   CONSTRAINT PK_request PRIMARY KEY (request_id),
   CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
 
@@ -352,6 +353,8 @@ CREATE TABLE stage (
   command_params BLOB,
   host_params BLOB,
   command_execution_type VARCHAR2(32) DEFAULT 'STAGE' NOT NULL,
+  status VARCHAR(255) DEFAULT 'PENDING' NOT NULL, -- Oracle requires DEFAULT before the NOT NULL constraint
+  display_status VARCHAR(255) DEFAULT 'PENDING' NOT NULL,
   CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
   CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
 
@@ -370,7 +373,7 @@ CREATE TABLE host_role_command (
   start_time NUMBER(19) NOT NULL,
   original_start_time NUMBER(19) NOT NULL,
   end_time NUMBER(19),
-  status VARCHAR2(255) NULL,
+  status VARCHAR2(255) DEFAULT 'PENDING' NOT NULL, -- Oracle requires DEFAULT before the NOT NULL constraint
   auto_skip_on_failure NUMBER(1) DEFAULT 0 NOT NULL,
   std_error BLOB NULL,
   std_out BLOB NULL,

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index b13a9e3..7e57d9f 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -345,7 +345,8 @@ CREATE TABLE request (
   request_type VARCHAR(255),
   request_schedule_id BIGINT,
   start_time BIGINT NOT NULL,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_request PRIMARY KEY (request_id),
   CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
 
@@ -361,6 +362,8 @@ CREATE TABLE stage (
   command_params BYTEA,
   host_params BYTEA,
   command_execution_type VARCHAR(32) DEFAULT 'STAGE' NOT NULL,
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
   CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
 
@@ -378,7 +381,7 @@ CREATE TABLE host_role_command (
   start_time BIGINT NOT NULL,
   original_start_time BIGINT NOT NULL,
   end_time BIGINT,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
   std_error BYTEA,
   std_out BYTEA,

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
index cf2954a..2c4bd55 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
@@ -334,7 +334,8 @@ CREATE TABLE request (
   request_context VARCHAR(255),
   request_type VARCHAR(255),
   start_time NUMERIC(19) NOT NULL,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_request PRIMARY KEY (request_id),
   CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
 
@@ -350,6 +351,8 @@ CREATE TABLE stage (
   command_params IMAGE,
   host_params IMAGE,
   command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
   CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
 
@@ -368,7 +371,7 @@ CREATE TABLE host_role_command (
   start_time NUMERIC(19) NOT NULL,
   original_start_time NUMERIC(19) NOT NULL,
   end_time NUMERIC(19),
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
   std_error IMAGE,
   std_out IMAGE,

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
index 16c269a..a86a767 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
@@ -350,7 +350,8 @@ CREATE TABLE request (
   request_type VARCHAR(255),
   request_schedule_id BIGINT,
   start_time BIGINT NOT NULL,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_request PRIMARY KEY CLUSTERED (request_id),
   CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
 
@@ -366,6 +367,8 @@ CREATE TABLE stage (
   command_params VARBINARY(MAX),
   host_params VARBINARY(MAX),
   command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+  display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   CONSTRAINT PK_stage PRIMARY KEY CLUSTERED (stage_id, request_id),
   CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
 
@@ -383,7 +386,7 @@ CREATE TABLE host_role_command (
   start_time BIGINT NOT NULL,
   original_start_time BIGINT NOT NULL,
   end_time BIGINT,
-  status VARCHAR(255),
+  status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
   auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
   std_error VARBINARY(max),
   std_out VARBINARY(max),

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 177ac70..edc5683 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -55,6 +55,7 @@ import org.apache.ambari.server.utils.CommandUtils;
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.EasyMock;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -67,8 +68,6 @@ import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.google.inject.util.Modules;
 
-import junit.framework.Assert;
-
 public class TestActionDBAccessorImpl {
   private static final Logger log = LoggerFactory.getLogger(TestActionDBAccessorImpl.class);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 6519126..526ca7c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyCollectionOf;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -100,9 +101,11 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -119,8 +122,6 @@ import com.google.inject.Injector;
 import com.google.inject.Provider;
 import com.google.inject.persist.UnitOfWork;
 
-import junit.framework.Assert;
-
 public class TestActionScheduler {
 
   private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class);
@@ -207,6 +208,8 @@ public class TestActionScheduler {
     when(host.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
     List<Stage> stages = new ArrayList<Stage>();
     Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
       "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
@@ -222,7 +225,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
-        10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null);
+        10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -314,6 +317,8 @@ public class TestActionScheduler {
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
     when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
 
@@ -335,7 +340,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
 
@@ -405,6 +410,8 @@ public class TestActionScheduler {
 
     when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     doAnswer(new Answer<Void>() {
       @Override
@@ -508,6 +515,8 @@ public class TestActionScheduler {
 
     when(db.getCommandsInProgressCount()).thenReturn(stages.size());
     when(db.getStagesInProgress()).thenReturn(stages);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     doAnswer(new Answer<Collection<HostRoleCommandEntity>>() {
       @Override
@@ -543,7 +552,7 @@ public class TestActionScheduler {
     // Make sure the NN install doesn't timeout
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     int cycleCount=0;
@@ -606,6 +615,8 @@ public class TestActionScheduler {
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -658,7 +669,7 @@ public class TestActionScheduler {
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -721,6 +732,8 @@ public class TestActionScheduler {
     stages.add(stage12);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -735,7 +748,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf,
-        entityManagerProviderMock, (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        entityManagerProviderMock, hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     scheduler.doWork();
 
@@ -763,6 +776,8 @@ public class TestActionScheduler {
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -816,7 +831,7 @@ public class TestActionScheduler {
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState()
@@ -976,6 +991,8 @@ public class TestActionScheduler {
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1028,7 +1045,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -1124,6 +1141,8 @@ public class TestActionScheduler {
             RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1136,7 +1155,7 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -1214,6 +1233,8 @@ public class TestActionScheduler {
             RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1228,7 +1249,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
             new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
@@ -1289,6 +1310,8 @@ public class TestActionScheduler {
 
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1303,7 +1326,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -1544,6 +1567,8 @@ public class TestActionScheduler {
     stage.setLastAttemptTime(host2, Role.HBASE_CLIENT.toString(), now);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1616,7 +1641,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     scheduler.doWork();
 
@@ -1729,6 +1754,8 @@ public class TestActionScheduler {
             "host1", "cluster1", Role.HDFS_CLIENT, RoleCommand.UPGRADE, Service.Type.HDFS, 4, 2, 1));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1808,7 +1835,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     ActionManager am = new ActionManager(db, requestFactory, scheduler);
 
@@ -1976,6 +2003,8 @@ public class TestActionScheduler {
     when(host.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -1993,7 +2022,7 @@ public class TestActionScheduler {
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
         10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -2135,6 +2164,8 @@ public class TestActionScheduler {
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -2187,7 +2218,7 @@ public class TestActionScheduler {
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -2467,6 +2498,8 @@ public class TestActionScheduler {
     when(host3.getHostName()).thenReturn(hostname);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
     when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
     when(db.getStagesInProgress()).thenReturn(stagesInProgress);
 
@@ -2542,7 +2575,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -2706,6 +2739,8 @@ public class TestActionScheduler {
     command.setStatus(HostRoleStatus.FAILED);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
 
     RequestEntity request = mock(RequestEntity.class);
     when(request.isExclusive()).thenReturn(false);
@@ -2776,7 +2811,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
-        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+        hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java b/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
index 7b1a5a2..facd802 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.ambari.server.alerts;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.Map;
 import javax.persistence.EntityManager;
 
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.alerts.AmbariPerformanceRunnable.PerformanceArea;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.internal.ClusterResourceProvider;
@@ -287,6 +289,7 @@ public class AmbariPerformanceRunnableTest {
       binder.bind(AlertsDAO.class).toInstance(createNiceMock(AlertsDAO.class));
       binder.bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class));
       binder.bind(ActionManager.class).toInstance(createNiceMock(ActionManager.class));
+      binder.bind(HostRoleCommandFactory.class).toInstance(createNiceMock(HostRoleCommandFactory.class));
       binder.bind(HostRoleCommandDAO.class).toInstance(createNiceMock(HostRoleCommandDAO.class));
       binder.bind(AmbariManagementController.class).toInstance(createNiceMock(AmbariManagementController.class));
       binder.bind(AlertDefinitionFactory.class).toInstance(createNiceMock(AlertDefinitionFactory.class));

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
index a0701b6..f8b57e5 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
@@ -624,7 +624,6 @@ public class UpgradeResourceProviderTest {
     RequestEntity requestEntity = new RequestEntity();
     requestEntity.setRequestId(2L);
     requestEntity.setClusterId(cluster.getClusterId());
-    requestEntity.setStatus(HostRoleStatus.PENDING);
     requestEntity.setStages(new ArrayList<StageEntity>());
     requestDao.create(requestEntity);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
index 619e367..f009767 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
@@ -205,7 +205,6 @@ public class UpgradeSummaryResourceProviderTest {
     RequestEntity requestEntity = new RequestEntity();
     requestEntity.setRequestId(upgradeRequestId);
     requestEntity.setClusterId(cluster.getClusterId());
-    requestEntity.setStatus(HostRoleStatus.PENDING);
     requestDAO.create(requestEntity);
 
     // Create the stage and add it to the request

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
new file mode 100644
index 0000000..64a731b
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.listeners.tasks;
+
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapperFactory;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.inject.Inject;
+
+
+public class TaskStatusListenerTest extends EasyMockSupport {
+  // Drives TaskStatusListener with task create/update events against mocked StageDAO and RequestDAO.
+  private TaskEventPublisher publisher = new TaskEventPublisher();
+  // NOTE(review): no injector is set up in this class, so the @Inject fields below presumably stay null - confirm.
+  @Inject
+  private ExecutionCommandDAO executionCommandDAO;
+
+  @Inject
+  private ExecutionCommandWrapperFactory ecwFactory;
+
+  /** Registers six PENDING tasks (two stages of request 1), then feeds task updates to the listener. */
+  @Test
+  public void testOnTaskUpdateEvent() {
+    List<HostRoleCommand> hostRoleCommands = new ArrayList<HostRoleCommand>();
+    ServiceComponentHostEvent serviceComponentHostEvent = createNiceMock(ServiceComponentHostEvent.class);
+    HostDAO hostDAO = createNiceMock(HostDAO.class);
+    replayAll();
+
+    int hostRoleCommandSize = 3;
+    int hrcCounter = 1;
+    for (int stageCounter = 0; stageCounter < 2; stageCounter++) {
+      for (int i = 1; i <= hostRoleCommandSize; i++,hrcCounter++) {
+        String hostname = "hostname-" + hrcCounter;
+        HostRoleCommand hostRoleCommand = new HostRoleCommand(hostname, Role.DATANODE,
+            serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+        hostRoleCommand.setStatus(HostRoleStatus.PENDING);
+        hostRoleCommand.setRequestId(1L);
+        hostRoleCommand.setStageId(stageCounter);
+        hostRoleCommand.setTaskId(hrcCounter);
+        hostRoleCommands.add(hostRoleCommand);
+      }
+    }
+
+    HostRoleStatus hostRoleStatus = HostRoleStatus.PENDING;
+    StageDAO stageDAO = createNiceMock(StageDAO.class);
+    RequestDAO requestDAO = createNiceMock(RequestDAO.class);
+    StageEntity stageEntity = createNiceMock(StageEntity.class);
+    RequestEntity requestEntity = createNiceMock(RequestEntity.class);
+    EasyMock.expect(stageEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(stageEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(stageEntity.isSkippable()).andReturn(Boolean.FALSE).anyTimes();
+    EasyMock.expect(stageEntity.getRoleSuccessCriterias()).andReturn(Collections.<RoleSuccessCriteriaEntity>emptyList()).anyTimes();
+    EasyMock.expect(stageDAO.findByPK(anyObject(StageEntityPK.class))).andReturn(stageEntity).anyTimes();
+    EasyMock.expect(requestEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(requestEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(requestDAO.findByPK(anyLong())).andReturn(requestEntity).anyTimes();
+    // the single RequestDAO write expected by the end of the test; checked by verifyAll() at the bottom
+    requestDAO.updateStatus(1L,HostRoleStatus.COMPLETED,HostRoleStatus.SKIPPED_FAILED);
+    EasyMock.expectLastCall().times(1);
+
+
+    // created after replayAll() above, so these mocks are switched to replay state one by one
+    EasyMock.replay(stageEntity);
+    EasyMock.replay(requestEntity);
+    EasyMock.replay(stageDAO);
+    EasyMock.replay(requestDAO);
+
+    TaskCreateEvent event = new TaskCreateEvent(hostRoleCommands);
+    TaskStatusListener listener = new TaskStatusListener(publisher,stageDAO,requestDAO);
+
+    Assert.assertTrue(listener.getActiveTasksMap().isEmpty());
+    Assert.assertTrue(listener.getActiveStageMap().isEmpty());
+    Assert.assertTrue(listener.getActiveRequestMap().isEmpty());
+
+    listener.onTaskCreateEvent(event);
+    Assert.assertEquals(6, listener.getActiveTasksMap().size());
+    Assert.assertEquals(2, listener.getActiveStageMap().size());
+    Assert.assertEquals(1, listener.getActiveRequestMap().size());
+    Assert.assertEquals(hostRoleStatus, listener.getActiveRequestMap().get(1L).getStatus());
+
+
+
+    // update of a task status of IN_PROGRESS should cascade into an update of request status
+    String hostname = "hostname-1";
+    HostRoleCommand hostRoleCommand = new HostRoleCommand(hostname, Role.DATANODE,
+        serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+    hostRoleCommand.setStatus(HostRoleStatus.IN_PROGRESS);
+    hostRoleCommand.setRequestId(1L);
+    hostRoleCommand.setStageId(0);
+    hostRoleCommand.setTaskId(1L);
+    listener.onTaskUpdateEvent(new TaskUpdateEvent(Collections.singletonList(hostRoleCommand)));
+    Assert.assertEquals(HostRoleStatus.IN_PROGRESS, listener.getActiveRequestMap().get(1L).getStatus());
+
+    // moving every task to COMPLETED (stage 0) or SKIPPED_FAILED (stage 1) should cascade into a request
+    // status of COMPLETED and a request display status of SKIPPED_FAILED
+    hrcCounter = 1;
+    List<HostRoleCommand> finalHostRoleCommands = new ArrayList<HostRoleCommand>();
+    HostRoleStatus finalHostRoleStatus = HostRoleStatus.COMPLETED;
+    for (int stageCounter = 0; stageCounter < 2; stageCounter++) {
+      for (int i = 1; i <= hostRoleCommandSize; i++,hrcCounter++) {
+        String finalHostname = "hostname-" + hrcCounter;
+        HostRoleCommand finalHostRoleCommand = new HostRoleCommand(finalHostname, Role.DATANODE,
+            serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+        finalHostRoleCommand.setStatus(finalHostRoleStatus);
+        finalHostRoleCommand.setRequestId(1L);
+        finalHostRoleCommand.setStageId(stageCounter);
+        finalHostRoleCommand.setTaskId(hrcCounter);
+        finalHostRoleCommands.add(finalHostRoleCommand);
+      }
+      finalHostRoleStatus = HostRoleStatus.SKIPPED_FAILED;
+    }
+
+    listener.onTaskUpdateEvent(new TaskUpdateEvent(finalHostRoleCommands));
+
+    //Once request status and display status are in completed state, it should no longer be tracked by TaskStatusListener
+    Assert.assertNull(listener.getActiveRequestMap().get(1L));
+
+    // verify request status = completed and display_status = skip_failed
+    verifyAll();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
index b1c10f5..1709da8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
@@ -38,6 +38,7 @@ import javax.persistence.EntityManager;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.H2DatabaseCleaner;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.AmbariCustomCommandExecutionHelper;
@@ -980,6 +981,7 @@ public class ConfigHelperTest {
           bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
           bind(ClusterController.class).toInstance(clusterController);
           bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class));
+          bind(HostRoleCommandFactory.class).toInstance(createNiceMock(HostRoleCommandFactory.class));
           bind(HostRoleCommandDAO.class).toInstance(createNiceMock(HostRoleCommandDAO.class));
         }
       });

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
index 9d339e2..d3c8acf 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import javax.persistence.EntityManager;
 
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -68,6 +69,7 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.eclipse.jetty.server.SessionManager;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -83,8 +85,6 @@ import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 
-import junit.framework.Assert;
-
 /**
  * Tests that cluster effective version is calcualted correctly during upgrades.
  */
@@ -256,6 +256,7 @@ public class ClusterEffectiveVersionTest extends EasyMockSupport {
       binder.bind(DBAccessor.class).toInstance(EasyMock.createNiceMock(DBAccessor.class));
       binder.bind(EntityManager.class).toInstance(EasyMock.createNiceMock(EntityManager.class));
       binder.bind(ActionManager.class).toInstance(EasyMock.createNiceMock(ActionManager.class));
+      binder.bind(HostRoleCommandFactory.class).toInstance(EasyMock.createNiceMock(HostRoleCommandFactory.class));
       binder.bind(HostRoleCommandDAO.class).toInstance(EasyMock.createNiceMock(HostRoleCommandDAO.class));
       binder.bind(AmbariManagementController.class).toInstance(EasyMock.createNiceMock(AmbariManagementController.class));
       binder.bind(ClusterController.class).toInstance(EasyMock.createNiceMock(ClusterController.class));

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
index ed95b0b..e699e49 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
@@ -251,7 +251,6 @@ public class RetryUpgradeActionServiceTest {
     RequestEntity requestEntity = new RequestEntity();
     requestEntity.setRequestId(upgradeRequestId);
     requestEntity.setClusterId(cluster.getClusterId());
-    requestEntity.setStatus(HostRoleStatus.PENDING);
     requestDAO.create(requestEntity);
 
     // Create the stage and add it to the request

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
index d7979e8..ec001ec 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
@@ -31,15 +31,18 @@ public class UpgradeCatalog300Test {
   public void testExecuteDMLUpdates() throws Exception {
     Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
     Method showHcatDeletedUserMessage = UpgradeCatalog300.class.getDeclaredMethod("showHcatDeletedUserMessage");
+    Method setStatusOfStagesAndRequests = UpgradeCatalog300.class.getDeclaredMethod("setStatusOfStagesAndRequests");
 
    UpgradeCatalog300 upgradeCatalog300 = createMockBuilder(UpgradeCatalog300.class)
             .addMockedMethod(showHcatDeletedUserMessage)
             .addMockedMethod(addNewConfigurationsFromXml)
+            .addMockedMethod(setStatusOfStagesAndRequests)
             .createMock();
 
 
     upgradeCatalog300.addNewConfigurationsFromXml();
     upgradeCatalog300.showHcatDeletedUserMessage();
+    upgradeCatalog300.setStatusOfStagesAndRequests();
 
 
     replay(upgradeCatalog300);
@@ -49,4 +52,21 @@ public class UpgradeCatalog300Test {
     verify(upgradeCatalog300);
   }
 
+  @Test
+  public void testExecuteDDLUpdates() throws Exception {
+    Method updateStageTable = UpgradeCatalog300.class.getDeclaredMethod("updateStageTable");
+    UpgradeCatalog300 upgradeCatalog300 = createMockBuilder(UpgradeCatalog300.class)
+        .addMockedMethod(updateStageTable)
+        .createMock();
+
+    upgradeCatalog300.updateStageTable();
+
+    replay(upgradeCatalog300);
+
+    upgradeCatalog300.executeDDLUpdates();
+
+    verify(upgradeCatalog300);
+  }
+
+
 }


[2/2] ambari git commit: AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)

Posted by ja...@apache.org.
AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0fc7a667
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0fc7a667
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0fc7a667

Branch: refs/heads/trunk
Commit: 0fc7a6671feb10dc0e8475dc4878942cf19f46cc
Parents: dd174f4
Author: Jaimin Jetly <ja...@hortonworks.com>
Authored: Fri Feb 17 09:31:10 2017 -0800
Committer: Jaimin Jetly <ja...@hortonworks.com>
Committed: Fri Feb 17 09:31:10 2017 -0800

----------------------------------------------------------------------
 .../actionmanager/ActionDBAccessorImpl.java     | 108 ++--
 .../server/actionmanager/ActionScheduler.java   |  31 +
 .../ambari/server/actionmanager/Request.java    |   8 +-
 .../ambari/server/actionmanager/Stage.java      |  25 +
 .../controller/internal/CalculatedStatus.java   | 390 +++++++++++-
 .../ambari/server/events/TaskCreateEvent.java   |  48 ++
 .../apache/ambari/server/events/TaskEvent.java  |  66 ++
 .../ambari/server/events/TaskUpdateEvent.java   |  35 ++
 .../listeners/tasks/TaskStatusListener.java     | 609 +++++++++++++++++++
 .../events/publishers/TaskEventPublisher.java   |  62 ++
 .../server/orm/dao/HostRoleCommandDAO.java      |  67 +-
 .../ambari/server/orm/dao/RequestDAO.java       |   8 +
 .../apache/ambari/server/orm/dao/StageDAO.java  |  32 +-
 .../orm/entities/HostRoleCommandEntity.java     |   4 +-
 .../server/orm/entities/RequestEntity.java      |  49 +-
 .../ambari/server/orm/entities/StageEntity.java |  70 ++-
 .../server/orm/entities/StageEntityPK.java      |  12 +
 .../server/upgrade/UpgradeCatalog300.java       |  70 +++
 .../main/resources/Ambari-DDL-Derby-CREATE.sql  |   7 +-
 .../main/resources/Ambari-DDL-MySQL-CREATE.sql  |   7 +-
 .../main/resources/Ambari-DDL-Oracle-CREATE.sql |   7 +-
 .../resources/Ambari-DDL-Postgres-CREATE.sql    |   7 +-
 .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql |   7 +-
 .../resources/Ambari-DDL-SQLServer-CREATE.sql   |   7 +-
 .../actionmanager/TestActionDBAccessorImpl.java |   3 +-
 .../actionmanager/TestActionScheduler.java      |  71 ++-
 .../alerts/AmbariPerformanceRunnableTest.java   |   7 +-
 .../internal/UpgradeResourceProviderTest.java   |   1 -
 .../UpgradeSummaryResourceProviderTest.java     |   1 -
 .../listeners/tasks/TaskStatusListenerTest.java | 164 +++++
 .../ambari/server/state/ConfigHelperTest.java   |   2 +
 .../cluster/ClusterEffectiveVersionTest.java    |   5 +-
 .../services/RetryUpgradeActionServiceTest.java |   1 -
 .../server/upgrade/UpgradeCatalog300Test.java   |  20 +
 34 files changed, 1892 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 7881a4b..b813fe6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -45,7 +45,9 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.events.HostsRemovedEvent;
 import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.TaskCreateEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -130,6 +132,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   AmbariEventPublisher ambariEventPublisher;
 
   @Inject
+  TaskEventPublisher taskEventPublisher;
+
+  @Inject
   AuditLogger auditLogger;
 
   /**
@@ -205,8 +210,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   public Collection<HostRoleCommandEntity> abortOperation(long requestId) {
     long now = System.currentTimeMillis();
 
-    endRequest(requestId);
-
     // only request commands which actually need to be aborted; requesting all
     // commands here can cause OOM problems during large requests like upgrades
     List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
@@ -228,7 +231,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (!commands.isEmpty()) {
       return hostRoleCommandDAO.mergeAll(commands);
     }
-
+    endRequest(requestId);
     return Collections.emptyList();
   }
 
@@ -283,7 +286,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   @Override
   @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
   public List<Stage> getStagesInProgress() {
-    List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
+    List<StageEntity> stageEntities = stageDAO.findByStatuses(
       HostRoleStatus.IN_PROGRESS_STATUSES);
     return getStagesForEntities(stageEntities);
   }
@@ -343,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     RequestEntity requestEntity = request.constructNewPersistenceEntity();
 
     Long clusterId = -1L;
+    Long requestId = requestEntity.getRequestId();
     ClusterEntity clusterEntity = clusterDAO.findById(request.getClusterId());
     if (clusterEntity != null) {
       clusterId = clusterEntity.getClusterId();
@@ -356,8 +360,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
     addRequestToAuditlogCache(request);
 
+    List<HostRoleCommand> hostRoleCommands = new ArrayList<>();
+
     for (Stage stage : request.getStages()) {
       StageEntity stageEntity = stage.constructNewPersistenceEntity();
+      Long stageId = stageEntity.getStageId();
       stageEntities.add(stageEntity);
       stageEntity.setClusterId(clusterId);
       stageEntity.setRequest(requestEntity);
@@ -366,6 +373,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
       List<HostRoleCommand> orderedHostRoleCommands = stage.getOrderedHostRoleCommands();
 
       for (HostRoleCommand hostRoleCommand : orderedHostRoleCommands) {
+        hostRoleCommand.setRequestId(requestId);
+        hostRoleCommand.setStageId(stageId);
         HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
         hostRoleCommandEntity.setStage(stageEntity);
         hostRoleCommandDAO.create(hostRoleCommandEntity);
@@ -415,11 +424,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
         hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
 
         executionCommandDAO.create(hostRoleCommandEntity.getExecutionCommand());
-        hostRoleCommandEntity = hostRoleCommandDAO.merge(hostRoleCommandEntity);
+        hostRoleCommandEntity = hostRoleCommandDAO.mergeWithoutPublishEvent(hostRoleCommandEntity);
 
         if (null != hostEntity) {
           hostEntity = hostDAO.merge(hostEntity);
         }
+        hostRoleCommands.add(hostRoleCommand);
       }
 
       for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
@@ -431,6 +441,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
     requestEntity.setStages(stageEntities);
     requestDAO.merge(requestEntity);
+
+    TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
+    taskEventPublisher.publish(taskCreateEvent);
   }
 
   @Override
@@ -497,66 +510,55 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     long now = System.currentTimeMillis();
 
     List<Long> requestsToCheck = new ArrayList<Long>();
-    List<Long> abortedCommandUpdates = new ArrayList<Long>();
 
     List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
+    List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<HostRoleCommandEntity>();
     for (HostRoleCommandEntity commandEntity : commandEntities) {
       CommandReport report = taskReports.get(commandEntity.getTaskId());
-
-      boolean statusChanged = false;
-
-      switch (commandEntity.getStatus()) {
-        case ABORTED:
-          // We don't want to overwrite statuses for ABORTED tasks with
-          // statuses that have been received from the agent after aborting task
-          abortedCommandUpdates.add(commandEntity.getTaskId());
-          break;
-        default:
-          HostRoleStatus status = HostRoleStatus.valueOf(report.getStatus());
-          // if FAILED and marked for holding then set status = HOLDING_FAILED
-          if (status == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
-            status = HostRoleStatus.HOLDING_FAILED;
-
-            // tasks can be marked as skipped when they fail
-            if (commandEntity.isFailureAutoSkipped()) {
-              status = HostRoleStatus.SKIPPED_FAILED;
-            }
+      HostRoleStatus existingTaskStatus = commandEntity.getStatus();
+      HostRoleStatus reportedTaskStatus = HostRoleStatus.valueOf(report.getStatus());
+      if (!existingTaskStatus.isCompletedState() || existingTaskStatus == HostRoleStatus.ABORTED) {
+        // if FAILED and marked for holding then set reportedTaskStatus = HOLDING_FAILED
+        if (reportedTaskStatus == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
+          reportedTaskStatus = HostRoleStatus.HOLDING_FAILED;
+
+          // tasks can be marked as skipped when they fail
+          if (commandEntity.isFailureAutoSkipped()) {
+            reportedTaskStatus = HostRoleStatus.SKIPPED_FAILED;
           }
-
-          commandEntity.setStatus(status);
-          statusChanged = true;
-          break;
-      }
-
-      commandEntity.setStdOut(report.getStdOut().getBytes());
-      commandEntity.setStdError(report.getStdErr().getBytes());
-      commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
-        report.getStructuredOut().getBytes());
-      commandEntity.setExitcode(report.getExitCode());
-
-      if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
-        commandEntity.setEndTime(now);
-
-        String actionId = report.getActionId();
-        long[] requestStageIds = StageUtils.getRequestStage(actionId);
-        long requestId = requestStageIds[0];
-        long stageId = requestStageIds[1];
-        if(statusChanged) {
-          auditLog(commandEntity, requestId);
         }
-        if (requestDAO.getLastStageId(requestId).equals(stageId)) {
-          requestsToCheck.add(requestId);
+        if (!existingTaskStatus.isCompletedState()) {
+          commandEntity.setStatus(reportedTaskStatus);
         }
+        commandEntity.setStdOut(report.getStdOut().getBytes());
+        commandEntity.setStdError(report.getStdErr().getBytes());
+        commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
+            report.getStructuredOut().getBytes());
+        commandEntity.setExitcode(report.getExitCode());
+        if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
+          commandEntity.setEndTime(now);
+
+          String actionId = report.getActionId();
+          long[] requestStageIds = StageUtils.getRequestStage(actionId);
+          long requestId = requestStageIds[0];
+          long stageId = requestStageIds[1];
+          auditLog(commandEntity, requestId);
+          if (requestDAO.getLastStageId(requestId).equals(stageId)) {
+            requestsToCheck.add(requestId);
+          }
+        }
+        commandEntitiesToMerge.add(commandEntity);
+      } else {
+       LOG.warn(String.format("Request for invalid transition of host role command status received for task id %d from " +
+           "agent: %s -> %s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus));
       }
     }
 
     // no need to merge if there's nothing to merge
-    if (!commandEntities.isEmpty()) {
-      hostRoleCommandDAO.mergeAll(commandEntities);
+    if (!commandEntitiesToMerge.isEmpty()) {
+      hostRoleCommandDAO.mergeAll(commandEntitiesToMerge);
     }
 
-    // Invalidate cache because of updates to ABORTED commands
-    hostRoleCommandCache.invalidateAll(abortedCommandUpdates);
 
     for (Long requestId : requestsToCheck) {
       endRequestIfCompleted(requestId);
@@ -923,7 +925,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
       return HostRoleStatus.QUEUED;
     }
     Collection<HostRoleStatus> taskStatuses = details.getTaskStatuses();
-    return CalculatedStatus.calculateSummaryStatusOfStage(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
+    return CalculatedStatus.calculateSummaryStatus(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 680c0a6..a92c03c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.persistence.EntityManager;
 
@@ -49,6 +50,7 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
 import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.JPAEventPublisher;
 import org.apache.ambari.server.metadata.RoleCommandOrder;
@@ -75,10 +77,13 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.reflect.TypeToken;
@@ -179,6 +184,9 @@ class ActionScheduler implements Runnable {
    * we receive awake() request during running a scheduler iteration.
    */
   private boolean activeAwakeRequest = false;
+
+  private AtomicBoolean taskStatusLoaded = new AtomicBoolean();
+
   //Cache for clusterHostinfo, key - stageId-requestId
   private Cache<String, Map<String, Set<String>>> clusterHostInfoCache;
   private Cache<String, Map<String, String>> commandParamsStageCache;
@@ -353,6 +361,8 @@ class ActionScheduler implements Runnable {
         LOG.debug("Processing {} in progress stages ", stages.size());
       }
 
+      publishInProgressTasks(stages);
+
       if (stages.isEmpty()) {
         // Nothing to do
         if (LOG.isDebugEnabled()) {
@@ -532,6 +542,27 @@ class ActionScheduler implements Runnable {
     }
   }
 
+  /**
+   * Publishes an event to load {@link TaskStatusListener#activeTasksMap}, {@link TaskStatusListener#activeStageMap}
+   * and {@link TaskStatusListener#activeRequestMap} for all running requests once during server startup.
+   * This is required as some tasks may have been in progress when server was last stopped
+   * @param stages list of stages
+   */
+  private void publishInProgressTasks(List<Stage> stages) {
+    if (taskStatusLoaded.compareAndSet(false, true)) {
+      if (!stages.isEmpty()) {
+        Function<Stage, Long> transform = new Function<Stage, Long>() {
+          @Override
+          public Long apply(Stage stage) {
+            return stage.getRequestId();
+          }
+        };
+        Set<Long> runningRequestID = ImmutableSet.copyOf(Lists.transform(stages, transform));
+        List<HostRoleCommand> hostRoleCommands = db.getAllTasksByRequestIds(runningRequestID);
+        hostRoleCommandDAO.publishTaskCreateEvent(hostRoleCommands);
+      }
+    }
+  }
 
   /**
    * Returns the list of hosts that have a task assigned

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
index 31e11c1..502c016 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -71,7 +71,8 @@ public class Request {
    * As of now, this field is not used. Request status is
    * calculated at RequestResourceProvider on the fly.
    */
-  private HostRoleStatus status; // not persisted yet
+  private HostRoleStatus status = HostRoleStatus.PENDING;
+  private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
   private String inputs;
   private List<RequestResourceFilter> resourceFilters;
   private RequestOperationLevel operationLevel;
@@ -186,6 +187,7 @@ public class Request {
     this.requestType = entity.getRequestType();
     this.commandName = entity.getCommandName();
     this.status = entity.getStatus();
+    this.displayStatus = entity.getDisplayStatus();
     if (entity.getRequestScheduleEntity() != null) {
       this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId();
     }
@@ -241,6 +243,8 @@ public class Request {
     requestEntity.setInputs(inputs);
     requestEntity.setRequestType(requestType);
     requestEntity.setRequestScheduleId(requestScheduleId);
+    requestEntity.setStatus(status);
+    requestEntity.setDisplayStatus(displayStatus);
     //TODO set all fields
 
     if (resourceFilters != null) {
@@ -381,6 +385,8 @@ public class Request {
         ", startTime=" + startTime +
         ", endTime=" + endTime +
         ", inputs='" + inputs + '\'' +
+        ", status='" + status + '\'' +
+        ", displayStatus='" + displayStatus + '\'' +
         ", resourceFilters='" + resourceFilters + '\'' +
         ", operationLevel='" + operationLevel + '\'' +
         ", requestType=" + requestType +

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 4a05b32..f7ceca2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -74,6 +74,8 @@ public class Stage {
   private long stageId = -1;
   private final String logDir;
   private final String requestContext;
+  private HostRoleStatus status = HostRoleStatus.PENDING;
+  private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
   private String clusterHostInfo;
   private String commandParamsStage;
   private String hostParamsStage;
@@ -157,6 +159,8 @@ public class Stage {
     commandParamsStage = stageEntity.getCommandParamsStage();
     hostParamsStage = stageEntity.getHostParamsStage();
     commandExecutionType = stageEntity.getCommandExecutionType();
+    status = stageEntity.getStatus();
+    displayStatus = stageEntity.getDisplayStatus();
 
     List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(requestId, stageId);
     Collection<HostRoleCommand> commands = dbAccessor.getTasks(taskIds);
@@ -197,6 +201,8 @@ public class Stage {
     stageEntity.setCommandParamsStage(commandParamsStage);
     stageEntity.setHostParamsStage(hostParamsStage);
     stageEntity.setCommandExecutionType(commandExecutionType);
+    stageEntity.setStatus(status);
+    stageEntity.setDisplayStatus(displayStatus);
 
     for (Role role : successFactors.keySet()) {
       RoleSuccessCriteriaEntity roleSuccessCriteriaEntity = new RoleSuccessCriteriaEntity();
@@ -290,6 +296,23 @@ public class Stage {
     this.commandExecutionType = commandExecutionType;
   }
 
+  /**
+   * get current status of the stage
+   * @return {@link HostRoleStatus}
+   */
+  public HostRoleStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * sets status of the stage
+   * @param status {@link HostRoleStatus}
+   */
+  public void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
+
+
   public synchronized void setStageId(long stageId) {
     if (this.stageId != -1) {
       throw new RuntimeException("Attempt to set stageId again! Not allowed.");
@@ -915,6 +938,8 @@ public class Stage {
     builder.append("clusterHostInfo="+clusterHostInfo+"\n");
     builder.append("commandParamsStage="+commandParamsStage+"\n");
     builder.append("hostParamsStage="+hostParamsStage+"\n");
+    builder.append("status="+status+"\n");
+    builder.append("displayStatus="+displayStatus+"\n");
     builder.append("Success Factors:\n");
     for (Role r : successFactors.keySet()) {
       builder.append("  role: "+r+", factor: "+successFactors.get(r)+"\n");

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
index 3c415df..32dd03d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
@@ -26,12 +26,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.ambari.server.Role;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
 import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
 
 /**
  * Status of a request resource, calculated from a set of tasks or stages.
@@ -142,7 +150,7 @@ public class CalculatedStatus {
 
     Map<HostRoleStatus, Integer> taskStatusCounts = CalculatedStatus.calculateTaskEntityStatusCounts(tasks);
 
-    HostRoleStatus status = calculateSummaryStatusOfStage(taskStatusCounts, size, skippable);
+    HostRoleStatus status = calculateSummaryStatus(taskStatusCounts, size, skippable);
 
     double progressPercent = calculateProgressPercent(taskStatusCounts, size);
 
@@ -167,7 +175,7 @@ public class CalculatedStatus {
 
       // calculate the stage status from the task status counts
       HostRoleStatus stageStatus =
-          calculateSummaryStatusOfStage(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+          calculateSummaryStatus(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
 
       stageStatuses.add(stageStatus);
 
@@ -203,7 +211,7 @@ public class CalculatedStatus {
 
       // calculate the stage status from the task status counts
       HostRoleStatus stageStatus =
-          calculateSummaryStatusOfStage(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+          calculateSummaryStatus(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
 
       stageStatuses.add(stageStatus);
 
@@ -256,6 +264,126 @@ public class CalculatedStatus {
   }
 
   /**
+   * Returns counts of tasks that are in various states.
+   *
+   * @param hostRoleCommands  collection of beans {@link HostRoleCommand}
+   *
+   * @return a map of counts of tasks keyed by the task status
+   */
+  public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands) {
+    Map<HostRoleStatus, Integer> counters = new HashMap<>();
+    // initialize
+    for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+      counters.put(hostRoleStatus, 0);
+    }
+    // calculate counts
+    for (HostRoleCommand hrc : hostRoleCommands) {
+      // count tasks where isCompletedState() == true as COMPLETED
+      // but don't count tasks with COMPLETED status twice
+      if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+        // Increase total number of completed tasks;
+        counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+      }
+      // Increment counter for particular status
+      counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+    }
+
+    // We overwrite the value to have the sum converged
+    counters.put(HostRoleStatus.IN_PROGRESS,
+        hostRoleCommands.size() -
+            counters.get(HostRoleStatus.COMPLETED) -
+            counters.get(HostRoleStatus.QUEUED) -
+            counters.get(HostRoleStatus.PENDING));
+
+    return counters;
+  }
+
+  /**
+   * Returns, for each status type, a map of counts of stages that are in various states.
+   *
+   * @param stages  collection of beans {@link org.apache.ambari.server.events.listeners.tasks.TaskStatusListener.ActiveStage}
+   *
+   * @return a map, keyed by status type, of counts of stages keyed by the stage status
+   */
+  public static Map<StatusType,Map<HostRoleStatus, Integer>> calculateStatusCountsForStage(Collection<TaskStatusListener.ActiveStage> stages) {
+
+    Map<StatusType,Map<HostRoleStatus, Integer>> counters = new HashMap<>();
+    for (StatusType statusType : StatusType.values()) {
+      Map <HostRoleStatus, Integer> statusMap = new HashMap<HostRoleStatus, Integer>();
+      counters.put(statusType,statusMap);
+      // initialize
+      for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+        statusMap.put(hostRoleStatus, 0);
+      }
+      for (TaskStatusListener.ActiveStage stage : stages) {
+        // count stages where isCompletedState() == true as COMPLETED
+        // but don't count stages with COMPLETED status twice
+        HostRoleStatus status;
+        if (statusType == StatusType.DISPLAY_STATUS) {
+          status = stage.getDisplayStatus();
+        } else {
+          status = stage.getStatus();
+        }
+        if (status.isCompletedState() && status != HostRoleStatus.COMPLETED) {
+          // Increase total number of completed stages;
+          statusMap.put(HostRoleStatus.COMPLETED, statusMap.get(HostRoleStatus.COMPLETED) + 1);
+        }
+
+        // Increment counter for particular status
+        statusMap.put(status, statusMap.get(status) + 1);
+      }
+      statusMap.put(HostRoleStatus.IN_PROGRESS,
+          stages.size() -
+              statusMap.get(HostRoleStatus.COMPLETED) -
+              statusMap.get(HostRoleStatus.QUEUED) -
+              statusMap.get(HostRoleStatus.PENDING));
+    }
+    return counters;
+  }
+
+
+  /**
+   * Returns counts of the tasks that belong to the given stage and are in various states.
+   *
+   * @param hostRoleCommands  collection of beans {@link HostRoleCommand}
+   *
+   * @return a map of counts of tasks keyed by the task status
+   */
+  public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands, StageEntityPK stage) {
+    Map<HostRoleStatus, Integer> counters = new HashMap<>();
+    List<HostRoleCommand> hostRoleCommandsOfStage = new ArrayList<>();
+    // initialize
+    for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+      counters.put(hostRoleStatus, 0);
+    }
+    // calculate counts
+    for (HostRoleCommand hrc : hostRoleCommands) {
+      if (stage.getStageId() == hrc.getStageId() && stage.getRequestId() == hrc.getRequestId()) {
+        // count tasks where isCompletedState() == true as COMPLETED
+        // but don't count tasks with COMPLETED status twice
+        if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+          // Increase total number of completed tasks;
+          counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+        }
+
+        // Increment counter for particular status
+        counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+
+        hostRoleCommandsOfStage.add(hrc);
+      }
+    }
+
+    // We overwrite the value to have the sum converged
+    counters.put(HostRoleStatus.IN_PROGRESS,
+        hostRoleCommandsOfStage.size() -
+            counters.get(HostRoleStatus.COMPLETED) -
+            counters.get(HostRoleStatus.QUEUED) -
+            counters.get(HostRoleStatus.PENDING));
+
+    return counters;
+  }
+
+  /**
    * Returns counts of task entities that are in various states.
    *
    * @param tasks  the collection of task entities
@@ -329,7 +457,7 @@ public class CalculatedStatus {
       int total = summary.getTaskTotal();
       boolean skip = summary.isStageSkippable();
       Map<HostRoleStatus, Integer> counts = calculateStatusCounts(summary.getTaskStatuses());
-      HostRoleStatus stageStatus = calculateSummaryStatusOfStage(counts, total, skip);
+      HostRoleStatus stageStatus = calculateSummaryStatus(counts, total, skip);
       HostRoleStatus stageDisplayStatus = calculateSummaryDisplayStatus(counts, total, skip);
 
       stageStatuses.add(stageStatus);
@@ -392,7 +520,7 @@ public class CalculatedStatus {
    *
    * @return summary request status based on statuses of tasks in different states.
    */
-  public static HostRoleStatus calculateSummaryStatusOfStage(Map<HostRoleStatus, Integer> counters,
+  public static HostRoleStatus calculateSummaryStatus(Map<HostRoleStatus, Integer> counters,
       int total, boolean skippable) {
 
     // when there are 0 tasks, return COMPLETED
@@ -435,6 +563,230 @@ public class CalculatedStatus {
   }
 
   /**
+   * Derives a summary status from status counters alone; starts at PENDING and lets each later check override the earlier ones.
+   * @param counters counts of resources that are in various states; every status read below needs an entry (a missing key fails on unboxing)
+   * @param skippable {Boolean} <code>TRUE</code> if failure of any of the task should not fail the stage
+   * @return {@link HostRoleStatus}: IN_PROGRESS if anything is queued or in progress, else TIMEDOUT, else FAILED (both only when not skippable), else a HOLDING* status, else PENDING
+   */
+  public static HostRoleStatus calculateSummaryStatusFromPartialSet(Map<HostRoleStatus, Integer> counters,
+                                                      boolean skippable) {
+
+    HostRoleStatus status = HostRoleStatus.PENDING;
+    // By definition, any tasks in a future stage must be held in a PENDING status.
+    if (counters.get(HostRoleStatus.HOLDING) > 0 || counters.get(HostRoleStatus.HOLDING_FAILED) > 0 || counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+      status =  counters.get(HostRoleStatus.HOLDING) > 0 ? HostRoleStatus.HOLDING :
+          counters.get(HostRoleStatus.HOLDING_FAILED) > 0 ? HostRoleStatus.HOLDING_FAILED :
+              HostRoleStatus.HOLDING_TIMEDOUT;
+    }
+
+    // A failed task that is not skippable overrides any HOLDING* status chosen above.
+    if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+      status = HostRoleStatus.FAILED;
+    }
+
+    // A timed-out task that is not skippable overrides FAILED and any HOLDING* status.
+    if (counters.get(HostRoleStatus.TIMEDOUT) > 0  && !skippable) {
+      status = HostRoleStatus.TIMEDOUT;
+    }
+
+    int inProgressTasks =  counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+    if (inProgressTasks > 0) {
+      status = HostRoleStatus.IN_PROGRESS;
+    }
+
+    return status;
+  }
+
+
+  /**
+   * Calculates the status of a stage from its tasks; FAILED, TIMEDOUT and ABORTED are only reported when {@code didStageFailed} confirms it for the affected roles.
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param counters counts of resources that are in various states
+   * @param successFactors Map of roles to their successfactor for a stage
+   * @param skippable {Boolean} <code>TRUE</code> if failure of any of the task should not fail the stage
+   * @return {@link HostRoleStatus} based on success factor
+   */
+  public static HostRoleStatus calculateStageStatus(List <HostRoleCommand> hostRoleCommands, Map<HostRoleStatus, Integer> counters, Map<Role, Float> successFactors,
+                                                    boolean skippable) {
+
+    // when there are 0 tasks, return COMPLETED
+    int total = hostRoleCommands.size();
+    if (total == 0) {
+      return HostRoleStatus.COMPLETED;
+    }
+
+    if (counters.get(HostRoleStatus.PENDING) == total) {
+      return HostRoleStatus.PENDING;
+    }
+
+    // By definition, any tasks in a future stage must be held in a PENDING status.
+    if (counters.get(HostRoleStatus.HOLDING) > 0 || counters.get(HostRoleStatus.HOLDING_FAILED) > 0 || counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+      return counters.get(HostRoleStatus.HOLDING) > 0 ? HostRoleStatus.HOLDING :
+          counters.get(HostRoleStatus.HOLDING_FAILED) > 0 ? HostRoleStatus.HOLDING_FAILED :
+              HostRoleStatus.HOLDING_TIMEDOUT;
+    }
+
+    if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+      Set<Role> rolesWithFailedTasks = getRolesOfFailedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithFailedTasks, successFactors)) {
+        return HostRoleStatus.FAILED;
+      }
+    }
+
+    if (counters.get(HostRoleStatus.TIMEDOUT) > 0  && !skippable) {
+      Set<Role> rolesWithTimedOutTasks = getRolesOfTimedOutTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithTimedOutTasks, successFactors)) {
+        return HostRoleStatus.TIMEDOUT;
+      }
+    }
+
+    int numActiveTasks = counters.get(HostRoleStatus.PENDING) + counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+    if (numActiveTasks > 0) {
+      return HostRoleStatus.IN_PROGRESS;
+    } else if (counters.get(HostRoleStatus.ABORTED) > 0) {
+      Set<Role> rolesWithAbortedTasks = getRolesOfAbortedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithAbortedTasks, successFactors)) {
+        return HostRoleStatus.ABORTED;
+      }
+    }
+
+    return HostRoleStatus.COMPLETED;
+  }
+
+  /**
+   * Collects every {@link Role} that has at least one task in {@link HostRoleStatus#FAILED}; delegates to {@code getRolesOfTasks}.
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfFailedTasks(List <HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.FAILED);
+  }
+
+  /**
+   * Collects every {@link Role} that has at least one task in {@link HostRoleStatus#TIMEDOUT}; delegates to {@code getRolesOfTasks}.
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfTimedOutTasks(List <HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.TIMEDOUT);
+  }
+
+  /**
+   * Collects every {@link Role} that has at least one task in {@link HostRoleStatus#ABORTED}; delegates to {@code getRolesOfTasks}.
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfAbortedTasks(List <HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.ABORTED);
+  }
+
+  /**
+   * Collects every {@link Role} that has at least one task whose status is exactly the given {@code status}.
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @param status {@link HostRoleStatus} a task must have for its role to be included
+   * @return Set of {@link Role}; empty when no task has the given status
+   */
+  protected static Set<Role> getRolesOfTasks(List <HostRoleCommand> hostRoleCommands, final HostRoleStatus status) {
+
+    Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand hrc) {
+        return hrc.getStatus() ==  status;
+      }
+    };
+
+    Function<HostRoleCommand, Role> transform = new Function<HostRoleCommand, Role>() {
+      @Override
+      public Role apply(HostRoleCommand hrc) {
+        return hrc.getRole();
+      }
+    };
+    return FluentIterable.from(hostRoleCommands)
+        .filter(predicate)
+        .transform(transform)
+        .toSet();
+  }
+
+  /**
+   * Decides whether a stage failed: it did if, for any given role, the share of that role's tasks that did not fail is below the role's success factor.
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param roles  set of roles to be checked for meeting success criteria
+   * @param successFactors  map of role to its success factor; a role without an entry defaults to 1.0 (every task must succeed)
+   * @return {Boolean} <code>TRUE</code> if stage failed due to hostRoleCommands of any role not meeting success criteria
+   */
+  protected static Boolean didStageFailed(List<HostRoleCommand> hostRoleCommands, Set<Role> roles, Map<Role, Float> successFactors) {
+    for (Role role : roles) {
+      List<HostRoleCommand> hostRoleCommandsOfRole = getHostRoleCommandsOfRole(hostRoleCommands, role);
+      int totalForRole = hostRoleCommandsOfRole.size();
+      int failedForRole = getFailedHostRoleCommands(hostRoleCommandsOfRole).size();
+      // Divide as float: int / int truncates the ratio to 0 or 1, which fails the role on any single failed task.
+      float successRatioForRole = (float) (totalForRole - failedForRole) / totalForRole;
+      Float configuredFactor = successFactors.get(role);
+      if (successRatioForRole < (configuredFactor == null ? 1.0f : configuredFactor)) {
+        return Boolean.TRUE;
+      }
+    }
+    return Boolean.FALSE;
+  }
+
+  /**
+   * Selects the commands whose role is the given role; roles are compared by reference ({@code ==}).
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @param role {@link Role}
+   * @return list of {@link HostRoleCommand} that belongs to {@link Role}
+   */
+  protected static List<HostRoleCommand> getHostRoleCommandsOfRole(List <HostRoleCommand> hostRoleCommands, final Role role) {
+    Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand hrc) {
+        return hrc.getRole() ==  role;
+      }
+    };
+    return FluentIterable.from(hostRoleCommands)
+        .filter(predicate)
+        .toList();
+  }
+
+  /**
+   * Selects the commands whose status reports {@code isFailedAndNotSkippableState()}.
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return list of {@link HostRoleCommand} with failed status
+   */
+  protected static List<HostRoleCommand> getFailedHostRoleCommands(List <HostRoleCommand> hostRoleCommands) {
+    Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand hrc) {
+        return hrc.getStatus().isFailedAndNotSkippableState();
+      }
+    };
+    return FluentIterable.from(hostRoleCommands)
+        .filter(predicate)
+        .toList();
+  }
+
+
+  /**
+   * Calculates the overall status of a request: counts the given statuses and summarises them with {@code skippable = false}.
+   * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+   * @return overall status of a request
+   */
+  public static HostRoleStatus getOverallStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+    Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+    return calculateSummaryStatus(statusCount, hostRoleStatuses.size(), false);
+  }
+
+  /**
+   * Calculates the overall display status of a request: counts the given statuses and summarises them with {@code skippable = false}.
+   * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+   * @return overall display status of a request
+   */
+  public static HostRoleStatus getOverallDisplayStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+    Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+    return calculateSummaryDisplayStatus(statusCount, hostRoleStatuses.size(), false);
+  }
+
+
+  /**
    * Calculate overall status of an upgrade.
    *
    * @param counters   counts of resources that are in various states
@@ -444,7 +796,7 @@ public class CalculatedStatus {
    */
   protected static HostRoleStatus calculateSummaryStatusOfUpgrade(
       Map<HostRoleStatus, Integer> counters, int total) {
-    return calculateSummaryStatusOfStage(counters, total, false);
+    return calculateSummaryStatus(counters, total, false);
   }
 
   /**
@@ -456,10 +808,28 @@ public class CalculatedStatus {
    *
    * @return summary request status based on statuses of tasks in different states.
    */
-  protected static HostRoleStatus calculateSummaryDisplayStatus(
+  public static HostRoleStatus calculateSummaryDisplayStatus(
       Map<HostRoleStatus, Integer> counters, int total, boolean skippable) {
-    return counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
-           counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
-           calculateSummaryStatusOfStage(counters, total, skippable);
+    return counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
+           counters.get(HostRoleStatus.TIMEDOUT) > 0 ? HostRoleStatus.TIMEDOUT:
+           counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
+           calculateSummaryStatus(counters, total, skippable);
+  }
+
+  /**
+   * Kinds of {@link HostRoleStatus} persisted by {@link Stage} and {@link Request}; each constant carries its string value ("status" or "display_status").
+   */
+  public enum StatusType {
+    STATUS("status"),
+    DISPLAY_STATUS("display_status");
+    private String value;
+
+    StatusType(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
new file mode 100644
index 0000000..9d73122
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+
+/**
+ * The {@link TaskCreateEvent} is to be fired every time
+ * a request is to be tracked as a running request in
+ * {@link TaskStatusListener}.
+ * This usually happens when a new request is created by user action or
+ * when ambari-server starts with some stages in a non-completed state.
+ */
+public class TaskCreateEvent extends TaskEvent {
+
+
+  /**
+   * Constructor; passes the list on to {@link TaskEvent}.
+   *
+   * @param hostRoleCommandList
+   *          all hostRoleCommands for all requests
+   */
+  public TaskCreateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
new file mode 100644
index 0000000..ca351d7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * {@link TaskEvent} is the base for all events related to create/update of tasks
+ * that might result in an update of the stage/request status.
+ */
+public class TaskEvent {
+  /**
+   * List of {@link HostRoleCommand} carried by this event
+   */
+  private List<HostRoleCommand> hostRoleCommands;
+
+  /**
+   * Constructor.
+   *
+   * @param hostRoleCommands
+   *          list of HRCs carried by this event; the reference is stored as given, not copied.
+   */
+  public TaskEvent(List<HostRoleCommand> hostRoleCommands) {
+    this.hostRoleCommands = hostRoleCommands;
+  }
+
+  /**
+   *  Gets the hostRoleCommands this event was created with
+   * @return List of {@link HostRoleCommand}
+   */
+  public List<HostRoleCommand> getHostRoleCommands() {
+    return hostRoleCommands;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    String hostRoleCommands = StringUtils.join(this.hostRoleCommands, ", "); // local shadows the field of the same name
+    StringBuilder buffer = new StringBuilder("TaskEvent{");
+    buffer.append("hostRoleCommands=").append(hostRoleCommands);
+    buffer.append("}");
+    return buffer.toString();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
new file mode 100644
index 0000000..84f67f5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+
+/**
+ * The {@link TaskUpdateEvent} is to be fired every time
+ * host role commands are merged to the database.
+ */
+public class TaskUpdateEvent extends TaskEvent{
+  /** Creates the event for the given host role commands, which are handed to {@link TaskEvent}. */
+  public TaskUpdateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
new file mode 100644
index 0000000..bc146ef
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -0,0 +1,609 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.listeners.tasks;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * The {@link TaskStatusListener} is used to constantly update the status of running Stages and Requests.
+ * It listens for {@link TaskCreateEvent} and {@link TaskUpdateEvent}, which are fired when host role commands are created or updated.
+ * This listener maintains maps of all running tasks, stages and requests.
+ */
+@Singleton
+@EagerSingleton
+public class TaskStatusListener {
+  /**
+   * Logger.
+   */
+  private final static Logger LOG = LoggerFactory.getLogger(TaskStatusListener.class);
+
+  /**
+   * Maps task id to its {@link HostRoleCommand} Object.
+   * Map has entries of all tasks of all active(ongoing) requests
+   * NOTE: Partial loading of tasks for any request may lead to incorrect update of the request status
+   */
+  private Map<Long,HostRoleCommand> activeTasksMap = new ConcurrentHashMap<>();
+
+  /**
+   * Maps all ongoing request id to its {@link ActiveRequest}
+   */
+  private Map<Long, ActiveRequest> activeRequestMap = new ConcurrentHashMap<>();
+
+  /**
+   * Maps {@link StageEntityPK} of all ongoing requests to its {@link ActiveStage}
+   * with updated {@link ActiveStage#status} and {@link ActiveStage#displayStatus}.
+   */
+  private Map<StageEntityPK, ActiveStage> activeStageMap = new ConcurrentHashMap<>();
+
+  private StageDAO stageDAO;
+
+  private RequestDAO requestDAO;
+
+  /** Stores the DAOs and registers this listener with the given publisher. */
+  @Inject
+  public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO) {
+    this.stageDAO = stageDAO;
+    this.requestDAO = requestDAO;
+    taskEventPublisher.register(this);
+  }
+
+  public Map<Long,HostRoleCommand> getActiveTasksMap() {
+    return activeTasksMap; // the live internal map, not a copy
+  }
+
+  public Map<Long, ActiveRequest> getActiveRequestMap() {
+    return activeRequestMap; // the live internal map, not a copy
+  }
+
+  public Map<StageEntityPK, ActiveStage> getActiveStageMap() {
+    return activeStageMap; // the live internal map, not a copy
+  }
+
+  /**
+   * On receiving a task update event, refreshes the tracked tasks, then the status of their stages and, only if a stage status changed, of their requests.
+   * Updates for untracked tasks are logged and skipped. Events containing newly created tasks are expected to contain the complete set of all tasks for a request.
+   * @param event Consumes {@link TaskUpdateEvent}.
+   */
+  @Subscribe
+  public void onTaskUpdateEvent(TaskUpdateEvent event) {
+    LOG.debug("Received task update event {}", event);
+    List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
+    List<HostRoleCommand>  hostRoleCommandWithReceivedStatus =  new ArrayList<>();
+    Set<StageEntityPK> stagesWithReceivedTaskStatus = new HashSet<>();
+    Set<Long> requestIdsWithReceivedTaskStatus =  new HashSet<>();
+    for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
+      Long reportedTaskId = hostRoleCommand.getTaskId();
+      HostRoleCommand activeTask =  activeTasksMap.get(reportedTaskId);
+      if (activeTask == null) {
+        LOG.error(String.format("Received update for a task %d which is not being tracked as running task", reportedTaskId));
+      } else  {
+        hostRoleCommandWithReceivedStatus.add(hostRoleCommand);
+        StageEntityPK stageEntityPK = new StageEntityPK();
+        stageEntityPK.setRequestId(hostRoleCommand.getRequestId());
+        stageEntityPK.setStageId(hostRoleCommand.getStageId());
+        stagesWithReceivedTaskStatus.add(stageEntityPK);
+        requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
+      }
+    }
+
+    updateActiveTasksMap(hostRoleCommandWithReceivedStatus);
+    Boolean didAnyStageStatusUpdated = updateActiveStagesStatus(stagesWithReceivedTaskStatus, hostRoleCommandListAll);
+    // Presumption: if no running stage changed its status,
+    // then no running request needs its status updated either
+    if (didAnyStageStatusUpdated) {
+      updateActiveRequestsStatus(requestIdsWithReceivedTaskStatus, stagesWithReceivedTaskStatus);
+    }
+
+  }
+
+  /**
+   * On receiving a task create event, starts tracking every task of the event together with its stage and its request in the maps.
+   * @param event Consumes {@link TaskCreateEvent}.
+   */
+  @Subscribe
+  public void onTaskCreateEvent(TaskCreateEvent event) {
+    LOG.debug("Received task create event {}", event);
+    List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
+
+    for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
+      activeTasksMap.put(hostRoleCommand.getTaskId(), hostRoleCommand);
+      addStagePK(hostRoleCommand);
+      addRequestId(hostRoleCommand);
+    }
+  }
+
+
+  /**
+   * Replaces the tracked entry of each reported task in {@link #activeTasksMap} with the reported host role command.
+   * @param hostRoleCommandWithReceivedStatus list of host role commands reported
+   */
+  private void updateActiveTasksMap(List<HostRoleCommand> hostRoleCommandWithReceivedStatus) {
+    for (HostRoleCommand hostRoleCommand : hostRoleCommandWithReceivedStatus) {
+      Long taskId = hostRoleCommand.getTaskId();
+      activeTasksMap.put(taskId , hostRoleCommand);
+    }
+  }
+
+
+  /**
+   * Tracks the stage of the given task in {@link #activeStageMap}: adds the task id to an already tracked stage, otherwise loads the stage entity and starts tracking it.
+   * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+   */
+  private void addStagePK(HostRoleCommand hostRoleCommand) {
+    StageEntityPK stageEntityPK = new StageEntityPK();
+    stageEntityPK.setRequestId(hostRoleCommand.getRequestId());
+    stageEntityPK.setStageId(hostRoleCommand.getStageId());
+    if (activeStageMap.containsKey(stageEntityPK)) {
+      activeStageMap.get(stageEntityPK).addTaskId(hostRoleCommand.getTaskId());
+    } else {
+      StageEntity stageEntity = stageDAO.findByPK(stageEntityPK);
+      // The stage entity should be persisted before the task create event is published (this assert is only evaluated with -ea)
+      assert stageEntity != null;
+      Map<Role, Float> successFactors = new HashMap<>();
+      Collection<RoleSuccessCriteriaEntity> roleSuccessCriteriaEntities = stageEntity.getRoleSuccessCriterias();
+      for (RoleSuccessCriteriaEntity successCriteriaEntity : roleSuccessCriteriaEntities) {
+        successFactors.put(successCriteriaEntity.getRole(), successCriteriaEntity.getSuccessFactor().floatValue());
+      }
+      Set<Long> taskIdSet =  Sets.newHashSet(hostRoleCommand.getTaskId());
+
+      ActiveStage reportedStage = new ActiveStage(stageEntity.getStatus(), stageEntity.getDisplayStatus(),
+          successFactors, stageEntity.isSkippable(), taskIdSet);
+      activeStageMap.put(stageEntityPK, reportedStage);
+    }
+  }
+
+  /**
+   * Updates every tracked stage that received a task status and persists the ones whose status changed; untracked stages are logged as errors.
+   * @param stagesWithReceivedTaskStatus set of stages that have received a task status
+   * @param hostRoleCommandListAll list of all task updates received from agent
+   * @return  <code>true</code> if any of the stages has changed its existing status;
+   *          <code>false</code> otherwise
+   */
+  private Boolean updateActiveStagesStatus(final Set<StageEntityPK> stagesWithReceivedTaskStatus, List<HostRoleCommand> hostRoleCommandListAll) {
+    Boolean didAnyStageStatusUpdated = Boolean.FALSE;
+    for (StageEntityPK reportedStagePK : stagesWithReceivedTaskStatus) {
+      if (activeStageMap.containsKey(reportedStagePK)) {
+        Boolean didStatusChange = updateStageStatus(reportedStagePK, hostRoleCommandListAll);
+        if (didStatusChange) {
+          ActiveStage reportedStage = activeStageMap.get(reportedStagePK);
+          stageDAO.updateStatus(reportedStagePK, reportedStage.getStatus(), reportedStage.getDisplayStatus());
+          didAnyStageStatusUpdated = Boolean.TRUE;
+        }
+      } else {
+        LOG.error(String.format("Received update for a task whose stage is not being tracked as running stage: %s", reportedStagePK.toString()));
+      }
+
+    }
+    return didAnyStageStatusUpdated;
+  }
+
+  /**
+   * Tracks the request of the given task in {@link #activeRequestMap}: adds the stage key to an already tracked request, otherwise loads the request entity and starts tracking it.
+   * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+   */
+  private void addRequestId(HostRoleCommand hostRoleCommand) {
+    Long requestId = hostRoleCommand.getRequestId();
+    StageEntityPK stageEntityPK = new StageEntityPK();
+    stageEntityPK.setRequestId(hostRoleCommand.getRequestId());
+    stageEntityPK.setStageId(hostRoleCommand.getStageId());
+    if (activeRequestMap.containsKey(requestId)) {
+      activeRequestMap.get(requestId).addStageEntityPK(stageEntityPK);
+    } else {
+      RequestEntity requestEntity = requestDAO.findByPK(requestId);
+      // The request entity should be persisted before the task create event is published (this assert is only evaluated with -ea)
+      assert requestEntity != null;
+      Set<StageEntityPK> stageEntityPKs =  Sets.newHashSet(stageEntityPK);
+      ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), stageEntityPKs);
+      activeRequestMap.put(requestId, request);
+    }
+  }
+
+
+  /**
+   * Updates and persists the changed status of every tracked request, and stops tracking requests that have finished.
+   * @param requestIdsWithReceivedTaskStatus set of request ids that have received task statuses
+   * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+   */
+  private void updateActiveRequestsStatus(final Set<Long> requestIdsWithReceivedTaskStatus, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+    for (Long reportedRequestId : requestIdsWithReceivedTaskStatus) {
+      if (activeRequestMap.containsKey(reportedRequestId)) {
+        ActiveRequest request =  activeRequestMap.get(reportedRequestId);
+        Boolean didStatusChange = updateRequestStatus(reportedRequestId, stagesWithChangedTaskStatus);
+        if (didStatusChange) {
+          requestDAO.updateStatus(reportedRequestId, request.getStatus(), request.getDisplayStatus());
+        }
+        if (request.isCompleted() && isAllTasksCompleted(reportedRequestId)) {
+          // A request is considered to have finished once the request status and all of its task statuses are completed;
+          // in that case the request, its stages
+          // and its tasks should no longer be tracked as active (running)
+          removeRequestStageAndTasks(reportedRequestId);
+        }
+      } else {
+        LOG.error(String.format("Received update for a task whose request %d is not being tracked as running request", reportedRequestId));
+      }
+
+    }
+  }
+
+  /**
+   * Checks whether every task tracked in {@link #activeTasksMap} for the given request has reached a completed state.
+   * @param requestId request Id
+   * @return  <code>false</code> if any of the tasks belonging to requestId has an incomplete status
+   *          <code>true</code> otherwise
+   */
+  private Boolean isAllTasksCompleted(Long requestId) {
+    for (HostRoleCommand task : activeTasksMap.values()) {
+      // equals() compares ids by value; '==' against the boxed requestId would compare references if getRequestId() is boxed too.
+      if (requestId.equals(task.getRequestId()) && !task.getStatus().isCompletedState()) {
+        return Boolean.FALSE;
+      }
+    }
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Removes entries from {@link #activeTasksMap},{@link #activeStageMap} and {@link #activeRequestMap}
+   * @param requestId id of the request whose own entry, stage entries and task entries are to be removed
+   */
+  private void removeRequestStageAndTasks(Long requestId) {
+    removeTasks(requestId);
+    removeStages(requestId);
+    removeRequest(requestId);
+  }
+
+
+  /**
+   * Collects the keys of {@link #activeStageMap} that belong to the given request
+   * @param requestID requestId
+   * @return  list of StageEntityPK
+   */
+  private List<StageEntityPK> getAllStageEntityPKForRequest(final Long requestID) {
+    return FluentIterable.from(activeStageMap.keySet())
+        .filter(new Predicate<StageEntityPK>() {
+          @Override
+          public boolean apply(StageEntityPK candidate) {
+            // keep only the keys that carry the requested request id
+            return candidate.getRequestId().equals(requestID);
+          }
+        })
+        .toList();
+  }
+
+
+
+  /**
+   * Recomputes status and display status of a cached stage from the status of it's host role commands
+   * @param stagePK {@link StageEntityPK} primary key for the stage entity
+   * @param hostRoleCommandListAll list of all hrc received whose status has been received from agent
+   * @return {@link Boolean} <code>TRUE</code> if status of the given stage changed.
+   */
+  private Boolean updateStageStatus(final StageEntityPK stagePK, List<HostRoleCommand> hostRoleCommandListAll) {
+    Boolean didAnyStatusChanged = Boolean.FALSE;
+    ActiveStage reportedStage = activeStageMap.get(stagePK);
+    HostRoleStatus stageCurrentStatus = reportedStage.getStatus();
+    HostRoleStatus stageCurrentDisplayStatus = reportedStage.getDisplayStatus();
+
+    // new values are only written to the cached ActiveStage; this method itself persists nothing
+    // if stage is already marked to be completed then do not calculate reported status from host role commands
+    // Presumption: There will be no status transition of the host role command from one completed state to another
+    if (!stageCurrentDisplayStatus.isCompletedState() || !stageCurrentStatus.isCompletedState()) {
+      Map<HostRoleStatus, Integer> receivedTaskStatusCount = CalculatedStatus.calculateStatusCountsForTasks(hostRoleCommandListAll, stagePK);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, reportedStage.getSkippable());
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, Boolean.FALSE);
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        Function<Long,HostRoleCommand> transform = new Function<Long,HostRoleCommand>(){
+          @Override
+          public HostRoleCommand apply(Long taskId) {
+            return activeTasksMap.get(taskId);
+          }
+        };
+        // PENDING is treated as "not decidable from the reported tasks alone": look at every task tracked for the stage
+        List<HostRoleCommand> activeHostRoleCommandsOfStage = FluentIterable.from(reportedStage.getTaskIds())
+            .transform(transform).toList();
+        Map<HostRoleStatus, Integer> statusCount = CalculatedStatus.calculateStatusCountsForTasks(activeHostRoleCommandsOfStage);
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the stage as per the new status of received host role commands
+          HostRoleStatus display_status = CalculatedStatus.calculateSummaryDisplayStatus(statusCount, activeHostRoleCommandsOfStage.size(), reportedStage.getSkippable());
+          if (display_status != stageCurrentDisplayStatus) {
+            reportedStage.setDisplayStatus(display_status);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+
+        } else {
+          reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the stage as per the new status of received host role commands
+          HostRoleStatus status = CalculatedStatus.calculateStageStatus(activeHostRoleCommandsOfStage, statusCount, reportedStage.getSuccessFactors(), reportedStage.getSkippable());
+          if (status != stageCurrentStatus) {
+            reportedStage.setStatus(status);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          reportedStage.setStatus(statusFromPartialSet); // the decided value belongs in status; display status was handled above
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        reportedStage.setStatus(statusFromPartialSet);
+        reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+        didAnyStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didAnyStatusChanged;
+  }
+
+  /**
+   * Recomputes status and display status of a cached request from the cached status of it's stages
+   * @param requestId {@link Request} whose status is to be updated
+   * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+   * @return {Boolean} <code>TRUE</code> if request status has changed from existing
+   */
+  private Boolean updateRequestStatus (final Long requestId, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+    Boolean didStatusChanged = Boolean.FALSE;
+    ActiveRequest request = activeRequestMap.get(requestId);
+    HostRoleStatus requestCurrentStatus = request.getStatus();
+    HostRoleStatus requestCurrentDisplayStatus = request.getDisplayStatus();
+
+    if (!requestCurrentDisplayStatus.isCompletedState() || !requestCurrentStatus.isCompletedState()) {
+      List <ActiveStage>  activeStagesWithChangesTaskStatus = new ArrayList<>();
+      for (StageEntityPK stageEntityPK:stagesWithChangedTaskStatus) {
+        if (requestId.equals(stageEntityPK.getRequestId())) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          activeStagesWithChangesTaskStatus.add(activeStage);
+        }
+      }
+
+      // first try to decide from the changed stages only; PENDING below means that was not possible
+      Map<CalculatedStatus.StatusType,Map<HostRoleStatus, Integer>> stageStatusCountFromPartialSet = CalculatedStatus.calculateStatusCountsForStage(activeStagesWithChangesTaskStatus);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.STATUS), Boolean.FALSE);
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.DISPLAY_STATUS), Boolean.FALSE);
+
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        List <ActiveStage> allActiveStages = new ArrayList<>();
+        for (StageEntityPK stageEntityPK:request.getStageEntityPks()) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          allActiveStages.add(activeStage);
+        }
+        Map<CalculatedStatus.StatusType,Map<HostRoleStatus, Integer>> stageStatusCount = CalculatedStatus.calculateStatusCountsForStage(allActiveStages);
+
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the request as per the display status of all of it's stages
+
+          HostRoleStatus display_status = CalculatedStatus.calculateSummaryDisplayStatus(stageStatusCount.get(CalculatedStatus.StatusType.DISPLAY_STATUS), allActiveStages.size(), false);
+          if (display_status != requestCurrentDisplayStatus) {
+            request.setDisplayStatus(display_status);
+            didStatusChanged = Boolean.TRUE;
+          }
+
+        } else {
+          request.setDisplayStatus(displayStatusFromPartialSet);
+          didStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the request as per the status of all of it's stages
+          HostRoleStatus status = CalculatedStatus.calculateSummaryStatus(stageStatusCount.get(CalculatedStatus.StatusType.STATUS), allActiveStages.size(), false);
+          if (status != requestCurrentStatus) {
+            request.setStatus(status);
+            didStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          request.setStatus(statusFromPartialSet); // the decided value belongs in status; display status was handled above
+          didStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        request.setStatus(statusFromPartialSet);
+        request.setDisplayStatus(displayStatusFromPartialSet);
+        didStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didStatusChanged;
+  }
+
+
+  /**
+   * Removes list of {@link HostRoleCommand} entries from {@link #activeTasksMap}
+   * @param requestId request id
+   */
+  private void removeTasks(Long requestId) {
+    Iterator<Map.Entry<Long, HostRoleCommand>> iter = activeTasksMap.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Long, HostRoleCommand> entry = iter.next();
+      HostRoleCommand hrc = entry.getValue();
+      if (hrc.getRequestId() == requestId) {
+        if (!hrc.getStatus().isCompletedState()) {
+          LOG.error(String.format("Task %d should have been completed before being removed from running task cache(activeTasksMap)", hrc.getTaskId()));
+        }
+        iter.remove();
+      }
+    }
+  }
+
+
+  /**
+   * Drops every {@link #activeStageMap} entry whose key belongs to the given request
+   * @param requestId request Id
+   */
+  private void removeStages(Long requestId) {
+    // the matching keys are collected first, then removed one by one
+    for (StageEntityPK stagePK : getAllStageEntityPKForRequest(requestId)) {
+      activeStageMap.remove(stagePK);
+    }
+  }
+
+
+  /**
+   * Removes the entry stored under the given request id from {@link #activeRequestMap}
+   * @param requestId request Id
+   */
+  private void removeRequest(Long requestId) {
+    activeRequestMap.remove(requestId);
+  }
+
+
+  /**
+   * Holds the cached status, display status and the set of stage primary keys of one request.
+   * Instances are kept in {@link #activeRequestMap}, keyed by request id.
+   */
+  protected class ActiveRequest {
+    private HostRoleStatus status;
+    private HostRoleStatus displayStatus;
+    private Set <StageEntityPK> stageEntityPks;
+
+    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.stageEntityPks = stageEntityPks; // kept by reference, no copy is made
+    }
+
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    public Boolean isCompleted() {
+      return status.isCompletedState() && displayStatus.isCompletedState(); // both statuses must be completed
+    }
+
+    public Set <StageEntityPK> getStageEntityPks() {
+      return stageEntityPks; // the internal set itself, not a copy
+    }
+
+    public void addStageEntityPK(StageEntityPK stageEntityPK) {
+      stageEntityPks.add(stageEntityPK);
+    }
+
+  }
+
+  /**
+   * Holds what is needed to determine status and display status of one stage: both cached statuses,
+   * the skippable flag, the success factors and the task ids. Instances are kept in {@link #activeStageMap}
+   */
+  public class ActiveStage {
+    private HostRoleStatus status;
+    private HostRoleStatus displayStatus;
+    private Boolean skippable;
+    private Set <Long> taskIds;
+
+    //Map of roles to successFactors for this stage; always supplied through the constructor
+    private Map<Role, Float> successFactors;
+
+    public ActiveStage(HostRoleStatus status, HostRoleStatus displayStatus,
+                       Map<Role, Float> successFactors, Boolean skippable, Set<Long> taskIds) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.successFactors =  successFactors;
+      this.skippable = skippable;
+      this.taskIds = taskIds; // kept by reference, no copy is made
+    }
+
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    public Boolean getSkippable() {
+      return skippable;
+    }
+
+    public void setSkippable(Boolean skippable) {
+      this.skippable = skippable;
+    }
+
+    public Map<Role, Float> getSuccessFactors() {
+      return successFactors;
+    }
+
+    public void setSuccessFactors(Map<Role, Float> successFactors) {
+      this.successFactors = successFactors;
+    }
+
+    public Set <Long> getTaskIds() {
+      return taskIds; // the internal set itself, not a copy
+    }
+
+    public void addTaskId(Long taskId) {
+      taskIds.add(taskId);
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
new file mode 100644
index 0000000..fdc41e5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import org.apache.ambari.server.events.TaskEvent;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link TaskEventPublisher} is used to publish instances of
+ * {@link TaskEvent} to every listener registered through {@link #register(Object)}.
+ * It uses a single-threaded, serial {@link EventBus}.
+ */
+@Singleton
+public class TaskEventPublisher {
+
+  /**
+   * A single threaded, synchronous event bus for processing task events.
+   */
+  private final EventBus m_eventBus = new EventBus("ambari-task-report-event-bus");
+
+
+  /**
+   * Publishes the specified event to all registered listeners that
+   * {@link Subscribe} to  {@link TaskEvent} instances.
+   *
+   * @param event {@link TaskEvent} to post on the event bus
+   */
+  public void publish(TaskEvent event) {
+    m_eventBus.post(event);
+  }
+
+  /**
+   * Register a listener to receive events. The listener should use the
+   * {@link Subscribe} annotation.
+   *
+   * @param object
+   *          the listener to receive events.
+   */
+  public void register(Object object) {
+    m_eventBus.register(object);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 02c4091..e834045 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -40,6 +40,8 @@ import org.apache.ambari.annotations.TransactionalLock;
 import org.apache.ambari.annotations.TransactionalLock.LockArea;
 import org.apache.ambari.annotations.TransactionalLock.LockType;
 import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.api.query.JpaPredicateVisitor;
 import org.apache.ambari.server.api.query.JpaSortBuilder;
@@ -49,6 +51,9 @@ import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.utilities.PredicateHelper;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.TransactionalLocks;
 import org.apache.ambari.server.orm.entities.HostEntity;
@@ -58,9 +63,11 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -144,6 +151,13 @@ public class HostRoleCommandDAO {
   @Inject
   private Configuration configuration;
 
+
+  @Inject
+  HostRoleCommandFactory hostRoleCommandFactory;
+
+  @Inject
+  private TaskEventPublisher taskEventPublisher;
+
   /**
    * Used to ensure that methods which rely on the completion of
    * {@link Transactional} can detect when they are able to run.
@@ -629,11 +643,17 @@ public class HostRoleCommandDAO {
   @Transactional
   @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
   public HostRoleCommandEntity merge(HostRoleCommandEntity entity) {
+    entity = mergeWithoutPublishEvent(entity); // merge first, so the event below carries the merged entity
+    publishTaskUpdateEvent(Collections.singletonList(hostRoleCommandFactory.createExisting(entity))); // then notify task listeners
+    return entity;
+  }
+
+  @Transactional
+  @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
+  public HostRoleCommandEntity mergeWithoutPublishEvent(HostRoleCommandEntity entity) {
     EntityManager entityManager = entityManagerProvider.get();
     entity = entityManager.merge(entity);
-
     invalidateHostRoleCommandStatusSummaryCache(entity);
-
     return entity;
   }
 
@@ -667,10 +687,51 @@ public class HostRoleCommandDAO {
     }
 
     invalidateHostRoleCommandStatusSummaryCache(requestsToInvalidate);
-
+    publishTaskUpdateEvent(getHostRoleCommands(entities));
     return managedList;
   }
 
+  /**
+   * Converts entities to {@link HostRoleCommand} objects through the injected {@link HostRoleCommandFactory}
+   * @param entities host role command entities to convert
+   */
+  public List<HostRoleCommand> getHostRoleCommands(Collection<HostRoleCommandEntity> entities) {
+    FluentIterable<HostRoleCommandEntity> source = FluentIterable.from(entities);
+    // one command per entity, in the iteration order of the given collection
+    return source
+        .transform(new Function<HostRoleCommandEntity, HostRoleCommand>() {
+          @Override
+          public HostRoleCommand apply(HostRoleCommandEntity hrcEntity) {
+            return hostRoleCommandFactory.createExisting(hrcEntity);
+          }
+        })
+        .toList();
+  }
+
+  /**
+   * Publishes one {@link TaskUpdateEvent} carrying the given commands; an empty list publishes nothing
+   * @param hostRoleCommands host role commands to put into the update event
+   */
+  public void publishTaskUpdateEvent(List<HostRoleCommand> hostRoleCommands) {
+    if (hostRoleCommands.isEmpty()) {
+      return;
+    }
+    taskEventPublisher.publish(new TaskUpdateEvent(hostRoleCommands));
+  }
+
+  /**
+   * Publishes one {@link TaskCreateEvent} carrying the given commands; an empty list publishes nothing
+   * @param hostRoleCommands host role commands to put into the create event
+   */
+  public void publishTaskCreateEvent(List<HostRoleCommand> hostRoleCommands) {
+    if (hostRoleCommands.isEmpty()) {
+      return;
+    }
+    taskEventPublisher.publish(new TaskCreateEvent(hostRoleCommands));
+  }
+
+
+
   @Transactional
   @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
   public void remove(HostRoleCommandEntity entity) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 1c4d0a3..2696f66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -144,6 +144,14 @@ public class RequestDAO {
   }
 
   @Transactional
+  public void updateStatus(long requestId, HostRoleStatus status, HostRoleStatus displayStatus) {
+    RequestEntity requestEntity = findByPK(requestId); // NOTE(review): result is used without a null check - presumably the request always exists here; verify against callers
+    requestEntity.setStatus(status);
+    requestEntity.setDisplayStatus(displayStatus);
+    merge(requestEntity); // stores the entity with both status fields updated
+  }
+
+  @Transactional
   public void create(RequestEntity requestEntity) {
     entityManagerProvider.get().persist(requestEntity);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index d2f899f..126468a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.orm.dao;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -173,11 +174,15 @@ public class StageDAO {
     return daoUtils.selectList(query);
   }
 
+  /**
+   *
+   * @param statuses {@link HostRoleStatus}
+   * @return list of stage entities
+   */
   @RequiresSession
-  public List<StageEntity> findByCommandStatuses(
-      Collection<HostRoleStatus> statuses) {
+  public List<StageEntity> findByStatuses(Collection<HostRoleStatus> statuses) {
     TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
-        "StageEntity.findByCommandStatuses", StageEntity.class);
+        "StageEntity.findByStatuses", StageEntity.class);
 
     query.setParameter("statuses", statuses);
     return daoUtils.selectList(query);
@@ -280,8 +285,8 @@ public class StageDAO {
    *          the stage entity to update
    * @param desiredStatus
    *          the desired stage status
-   * @param controller
-   *          the ambari management controller
+   * @param actionManager
+   *          the action manager
    *
    * @throws java.lang.IllegalArgumentException
    *           if the transition to the desired status is not a legal transition
@@ -301,9 +306,11 @@ public class StageDAO {
     if (desiredStatus == HostRoleStatus.ABORTED) {
       actionManager.cancelRequest(stage.getRequestId(), "User aborted.");
     } else {
+      List <HostRoleCommandEntity> hrcWithChangedStatus = new ArrayList<HostRoleCommandEntity>();
       for (HostRoleCommandEntity hostRoleCommand : tasks) {
         HostRoleStatus hostRoleStatus = hostRoleCommand.getStatus();
         if (hostRoleStatus.equals(currentStatus)) {
+          hrcWithChangedStatus.add(hostRoleCommand);
           hostRoleCommand.setStatus(desiredStatus);
 
           if (desiredStatus == HostRoleStatus.PENDING) {
@@ -316,6 +323,21 @@ public class StageDAO {
   }
 
   /**
+   * Loads the stage by its primary key, sets both statuses on it and merges the entity
+   * @param stageEntityPK  {@link StageEntityPK}
+   * @param status {@link HostRoleStatus}
+   * @param displayStatus {@link HostRoleStatus}
+   */
+  @Transactional
+  public void updateStatus(StageEntityPK stageEntityPK, HostRoleStatus status, HostRoleStatus displayStatus) {
+    StageEntity stageEntity = findByPK(stageEntityPK);
+    stageEntity.setStatus(status);
+    stageEntity.setDisplayStatus(displayStatus);
+    merge(stageEntity);
+  }
+
+
+  /**
    * Determine whether or not it is valid to transition from this stage status
    * to the given status.
    *

http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 74271b9..a809295 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -105,9 +105,9 @@ public class HostRoleCommandEntity {
   @Basic
   private Integer exitcode = 0;
 
-  @Column(name = "status")
+  @Column(name = "status", nullable = false)
   @Enumerated(EnumType.STRING)
-  private HostRoleStatus status;
+  private HostRoleStatus status = HostRoleStatus.PENDING;
 
   @Column(name = "std_error")
   @Lob