You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ja...@apache.org on 2017/02/17 17:34:42 UTC
[1/2] ambari git commit: AMBARI-18868. Stage and Request status
should be persisted in the database. (jaimin)
Repository: ambari
Updated Branches:
refs/heads/trunk dd174f417 -> 0fc7a6671
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
index 7944d21..f19aa72 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
@@ -69,9 +69,28 @@ public class RequestEntity {
@Enumerated(value = EnumType.STRING)
private RequestType requestType;
- @Column(name = "status")
+ /**
+ * Logical status of the request: it represents whether the intent of the
+ * request has been accomplished or not.
+ * <p>
+ * It is derived from the {@link StageEntity#status} of every stage that
+ * belongs to this request. Defaults to {@link HostRoleStatus#PENDING}.
+ */
+ @Column(name = "status", nullable = false)
@Enumerated(value = EnumType.STRING)
- private HostRoleStatus status;
+ private HostRoleStatus status = HostRoleStatus.PENDING;
+
+ /**
+ * Display status of the request: it reports whether any of the underlying
+ * tasks has faced a failure of any kind (see {@link HostRoleStatus#isFailedState()}).
+ * <p>
+ * It is calculated only from the {@link HostRoleCommandEntity#status} values
+ * (or {@link StageEntity#status} values) that belong to this request.
+ * Defaults to {@link HostRoleStatus#PENDING}.
+ */
+ @Column(name = "display_status", nullable = false)
+ @Enumerated(value = EnumType.STRING)
+ private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
@Basic
@Column(name = "create_time", nullable = false)
@@ -89,7 +108,7 @@ public class RequestEntity {
@Column(name = "exclusive_execution", insertable = true, updatable = true, nullable = false)
private Integer exclusive = 0;
- @OneToMany(mappedBy = "request")
+ @OneToMany(mappedBy = "request", cascade = CascadeType.REMOVE)
private Collection<StageEntity> stages;
@OneToMany(mappedBy = "requestEntity", cascade = CascadeType.ALL)
@@ -207,14 +226,38 @@ public class RequestEntity {
this.commandName = commandName;
}
+ /**
+ * Gets the logical status of the request.
+ *
+ * @return the current {@link HostRoleStatus} of the request
+ */
public HostRoleStatus getStatus() {
return status;
}
+ /**
+ * Sets the logical status of the request.
+ *
+ * @param status the {@link HostRoleStatus} to store for the request
+ */
public void setStatus(HostRoleStatus status) {
this.status = status;
}
+ /**
+ * Gets the display status of the request.
+ *
+ * @return the current display {@link HostRoleStatus} of the request
+ */
+ public HostRoleStatus getDisplayStatus() {
+ return displayStatus;
+ }
+
+ /**
+ * Sets the display status of the request.
+ *
+ * @param displayStatus the display {@link HostRoleStatus} to store for the request
+ */
+ public void setDisplayStatus(HostRoleStatus displayStatus) {
+ this.displayStatus = displayStatus;
+ }
+
public RequestScheduleEntity getRequestScheduleEntity() {
return requestScheduleEntity;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index f9c8810..f68338f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -39,17 +39,21 @@ import javax.persistence.OneToMany;
import javax.persistence.Table;
import org.apache.ambari.server.actionmanager.CommandExecutionType;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
@Entity
@Table(name = "stage")
@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
@NamedQueries({
@NamedQuery(
- name = "StageEntity.findByCommandStatuses",
- query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.requestId, stage.stageId"),
+ name = "StageEntity.findByStatuses",
+ query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses ORDER BY stage.requestId, stage.stageId"),
+ @NamedQuery(
+ name = "StageEntity.findByPK",
+ query = "SELECT stage from StageEntity stage WHERE stage.requestId = :requestId AND stage.stageId = :stageId"),
@NamedQuery(
name = "StageEntity.findByRequestIdAndCommandStatuses",
- query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.stageId"),
+ query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId"),
@NamedQuery(
name = "StageEntity.findIdsByRequestId",
query = "SELECT stage.stageId FROM StageEntity stage WHERE stage.requestId = :requestId ORDER BY stage.stageId ASC") })
@@ -110,6 +114,32 @@ public class StageEntity {
@Basic
private byte[] hostParamsStage;
+ /**
+ * Logical status of the stage: it reports whether the success criteria
+ * established when the stage was created have been met or not.
+ * <p>
+ * The status is calculated by taking the following into account:
+ * a) {@link #roleSuccessCriterias}
+ * b) {@link #skippable}
+ * c) {@link HostRoleCommandEntity#autoSkipOnFailure}
+ * d) {@link HostRoleCommandEntity#status}
+ * <p>
+ * Defaults to {@link HostRoleStatus#PENDING}.
+ */
+ @Column(name = "status", nullable = false)
+ @Enumerated(EnumType.STRING)
+ private HostRoleStatus status = HostRoleStatus.PENDING;
+
+ /**
+ * Display status of the stage: it reports whether any of the underlying
+ * tasks has faced a failure of any kind (see {@link HostRoleStatus#isFailedState()}).
+ * <p>
+ * It is calculated only from {@link HostRoleCommandEntity#status}.
+ * Defaults to {@link HostRoleStatus#PENDING}.
+ */
+ @Column(name = "display_status", nullable = false)
+ @Enumerated(EnumType.STRING)
+ private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
+
@ManyToOne
@JoinColumn(name = "request_id", referencedColumnName = "request_id", nullable = false)
private RequestEntity request;
@@ -195,6 +225,40 @@ public class StageEntity {
this.commandExecutionType = commandExecutionType;
}
+ /**
+ * Gets the status of the stage.
+ *
+ * @return the current {@link HostRoleStatus} of the stage
+ */
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * Sets the status of the stage.
+ *
+ * @param status the {@link HostRoleStatus} to store for the stage
+ */
+ public void setStatus(HostRoleStatus status) {
+ this.status = status;
+ }
+
+ /**
+ * Gets the display status of the stage.
+ *
+ * @return the current display {@link HostRoleStatus} of the stage
+ */
+ public HostRoleStatus getDisplayStatus() {
+ return displayStatus;
+ }
+
+
+ /**
+ * Sets the display status of the stage.
+ *
+ * @param displayStatus the display {@link HostRoleStatus} to store for the stage
+ */
+ public void setDisplayStatus(HostRoleStatus displayStatus) {
+ this.displayStatus = displayStatus;
+ }
+
+
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
index 9ca0470..34d175c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
@@ -68,4 +68,16 @@ public class StageEntityPK implements Serializable {
result = 31 * result + (stageId != null ? stageId.hashCode() : 0);
return result;
}
+
+  /**
+   * Returns a human-readable form of this key, for example
+   * {@code StageEntityPK{stageId=1, requestId=2}}.
+   *
+   * @return the stage id and the request id rendered as comma-separated
+   *         {@code name=value} pairs
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("StageEntityPK{");
+    buffer.append("stageId=").append(getStageId());
+    // The ", " separator keeps the two ids from running together in the output.
+    buffer.append(", requestId=").append(getRequestId());
+    buffer.append("}");
+    return buffer.toString();
+  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
index 4f90ef3..0267a5e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
@@ -19,11 +19,25 @@ package org.apache.ambari.server.upgrade;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import javax.persistence.EntityManager;
+
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.StageFactory;
import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.orm.DBAccessor;
import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Config;
@@ -41,6 +55,12 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
*/
private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog300.class);
+ private static final String STAGE_TABLE = "stage";
+ private static final String STAGE_STATUS_COLUMN = "status";
+ private static final String STAGE_DISPLAY_STATUS_COLUMN = "display_status";
+ private static final String REQUEST_TABLE = "request";
+ private static final String REQUEST_DISPLAY_STATUS_COLUMN = "display_status";
+
@Inject
DaoUtils daoUtils;
@@ -83,6 +103,16 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
*/
@Override
protected void executeDDLUpdates() throws AmbariException, SQLException {
+ updateStageTable(); // adds the new status columns to the stage and request tables
+ }
+
+  /**
+   * Adds the status columns introduced by this catalog: {@code status} and
+   * {@code display_status} on the stage table, and {@code display_status} on the
+   * request table.
+   *
+   * @throws SQLException propagated from the underlying {@code dbAccessor} calls
+   */
+  protected void updateStageTable() throws SQLException {
+    addPendingStatusColumn(STAGE_TABLE, STAGE_STATUS_COLUMN);
+    addPendingStatusColumn(STAGE_TABLE, STAGE_DISPLAY_STATUS_COLUMN);
+    addPendingStatusColumn(REQUEST_TABLE, REQUEST_DISPLAY_STATUS_COLUMN);
   }
+
+  /**
+   * Adds one 255-character string column whose default value is
+   * {@link HostRoleStatus#PENDING}.
+   * NOTE(review): the trailing {@code false} is presumably the "nullable" flag,
+   * matching the NOT NULL columns in the CREATE scripts - confirm against DBColumnInfo.
+   *
+   * @param tableName  table that receives the column
+   * @param columnName name of the column to add
+   * @throws SQLException propagated from {@code dbAccessor.addColumn}
+   */
+  private void addPendingStatusColumn(String tableName, String columnName) throws SQLException {
+    dbAccessor.addColumn(tableName,
+        new DBAccessor.DBColumnInfo(columnName, String.class, 255, HostRoleStatus.PENDING, false));
+  }
/**
@@ -99,6 +129,7 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
protected void executeDMLUpdates() throws AmbariException, SQLException {
addNewConfigurationsFromXml();
showHcatDeletedUserMessage();
+ setStatusOfStagesAndRequests();
}
protected void showHcatDeletedUserMessage() {
@@ -122,4 +153,43 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
}
+  /**
+   * Back-fills the persisted status and display status of every existing stage
+   * and request.
+   * <p>
+   * For each request returned by the {@link RequestDAO}, every stage is rebuilt
+   * through the {@link StageFactory}, its two statuses are derived from its host
+   * role commands via {@link CalculatedStatus}, and the stage entity is merged.
+   * The request's own statuses are then aggregated from the per-stage values and
+   * the request entity is merged as well.
+   * <p>
+   * The work is handed to {@code executeInTransaction}. Any exception raised while
+   * processing is caught and logged at WARN level instead of being rethrown, so a
+   * failure also ends the processing of the remaining requests.
+   */
+  protected void setStatusOfStagesAndRequests() {
+    executeInTransaction(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          RequestDAO requestDao = injector.getInstance(RequestDAO.class);
+          StageFactory factory = injector.getInstance(StageFactory.class);
+          EntityManager entityManager = getEntityManagerProvider().get();
+
+          for (RequestEntity request : requestDao.findAll()) {
+            Collection<StageEntity> stagesOfRequest = request.getStages();
+            List<HostRoleStatus> displayStatuses = new ArrayList<>();
+            List<HostRoleStatus> logicalStatuses = new ArrayList<>();
+
+            for (StageEntity stageEntity : stagesOfRequest) {
+              Stage stage = factory.createExisting(stageEntity);
+              List<HostRoleCommand> commands = stage.getOrderedHostRoleCommands();
+              Map<HostRoleStatus, Integer> countsByStatus =
+                  CalculatedStatus.calculateStatusCountsForTasks(commands);
+
+              // Display status is derived from the task status counts; the logical
+              // status additionally receives the stage's success factors.
+              HostRoleStatus displayStatus = CalculatedStatus.calculateSummaryDisplayStatus(
+                  countsByStatus, commands.size(), stage.isSkippable());
+              HostRoleStatus logicalStatus = CalculatedStatus.calculateStageStatus(
+                  commands, countsByStatus, stage.getSuccessFactors(), stage.isSkippable());
+
+              stageEntity.setStatus(logicalStatus);
+              stageEntity.setDisplayStatus(displayStatus);
+              logicalStatuses.add(logicalStatus);
+              displayStatuses.add(displayStatus);
+              entityManager.merge(stageEntity);
+            }
+
+            // Roll the per-stage results up to the owning request.
+            request.setStatus(CalculatedStatus.getOverallStatusForRequest(logicalStatuses));
+            request.setDisplayStatus(
+                CalculatedStatus.getOverallDisplayStatusForRequest(displayStatuses));
+            entityManager.merge(request);
+          }
+        } catch (Exception e) {
+          LOG.warn("Setting status for stages and Requests threw exception. ", e);
+        }
+      }
+    });
+
+  }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
index f007b53..6c7cb09 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
@@ -345,7 +345,8 @@ CREATE TABLE request (
request_type VARCHAR(255),
request_schedule_id BIGINT,
start_time BIGINT NOT NULL,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_request PRIMARY KEY (request_id),
CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
@@ -361,6 +362,8 @@ CREATE TABLE stage (
command_params BLOB,
host_params BLOB,
command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
@@ -378,7 +381,7 @@ CREATE TABLE host_role_command (
start_time BIGINT NOT NULL,
original_start_time BIGINT NOT NULL,
end_time BIGINT,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
std_error BLOB,
std_out BLOB,
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index f6cb896..ebb0da0 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -356,7 +356,8 @@ CREATE TABLE request (
request_context VARCHAR(255),
request_type VARCHAR(255),
start_time BIGINT NOT NULL,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_request PRIMARY KEY (request_id),
CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
@@ -372,6 +373,8 @@ CREATE TABLE stage (
command_params LONGBLOB,
host_params LONGBLOB,
command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
@@ -390,7 +393,7 @@ CREATE TABLE host_role_command (
start_time BIGINT NOT NULL,
original_start_time BIGINT NOT NULL,
end_time BIGINT,
- status VARCHAR(100),
+ status VARCHAR(100) NOT NULL DEFAULT 'PENDING',
auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
std_error LONGBLOB,
std_out LONGBLOB,
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
index 19253e8..884eb06 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
@@ -336,7 +336,8 @@ CREATE TABLE request (
request_context VARCHAR(255),
request_type VARCHAR(255),
start_time NUMBER(19) NOT NULL,
- status VARCHAR(255),
+ status VARCHAR(255) DEFAULT 'PENDING' NOT NULL,
+ display_status VARCHAR(255) DEFAULT 'PENDING' NOT NULL,
CONSTRAINT PK_request PRIMARY KEY (request_id),
CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
@@ -352,6 +353,8 @@ CREATE TABLE stage (
command_params BLOB,
host_params BLOB,
command_execution_type VARCHAR2(32) DEFAULT 'STAGE' NOT NULL,
+ status VARCHAR(255) DEFAULT 'PENDING' NOT NULL,
+ display_status VARCHAR(255) DEFAULT 'PENDING' NOT NULL,
CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
@@ -370,7 +373,7 @@ CREATE TABLE host_role_command (
start_time NUMBER(19) NOT NULL,
original_start_time NUMBER(19) NOT NULL,
end_time NUMBER(19),
- status VARCHAR2(255) NULL,
+ status VARCHAR2(255) DEFAULT 'PENDING' NOT NULL,
auto_skip_on_failure NUMBER(1) DEFAULT 0 NOT NULL,
std_error BLOB NULL,
std_out BLOB NULL,
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index b13a9e3..7e57d9f 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -345,7 +345,8 @@ CREATE TABLE request (
request_type VARCHAR(255),
request_schedule_id BIGINT,
start_time BIGINT NOT NULL,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_request PRIMARY KEY (request_id),
CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
@@ -361,6 +362,8 @@ CREATE TABLE stage (
command_params BYTEA,
host_params BYTEA,
command_execution_type VARCHAR(32) DEFAULT 'STAGE' NOT NULL,
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
@@ -378,7 +381,7 @@ CREATE TABLE host_role_command (
start_time BIGINT NOT NULL,
original_start_time BIGINT NOT NULL,
end_time BIGINT,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
std_error BYTEA,
std_out BYTEA,
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
index cf2954a..2c4bd55 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
@@ -334,7 +334,8 @@ CREATE TABLE request (
request_context VARCHAR(255),
request_type VARCHAR(255),
start_time NUMERIC(19) NOT NULL,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_request PRIMARY KEY (request_id),
CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
@@ -350,6 +351,8 @@ CREATE TABLE stage (
command_params IMAGE,
host_params IMAGE,
command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_stage PRIMARY KEY (stage_id, request_id),
CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
@@ -368,7 +371,7 @@ CREATE TABLE host_role_command (
start_time NUMERIC(19) NOT NULL,
original_start_time NUMERIC(19) NOT NULL,
end_time NUMERIC(19),
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
std_error IMAGE,
std_out IMAGE,
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
index 16c269a..a86a767 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
@@ -350,7 +350,8 @@ CREATE TABLE request (
request_type VARCHAR(255),
request_schedule_id BIGINT,
start_time BIGINT NOT NULL,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_request PRIMARY KEY CLUSTERED (request_id),
CONSTRAINT FK_request_schedule_id FOREIGN KEY (request_schedule_id) REFERENCES requestschedule (schedule_id));
@@ -366,6 +367,8 @@ CREATE TABLE stage (
command_params VARBINARY(MAX),
host_params VARBINARY(MAX),
command_execution_type VARCHAR(32) NOT NULL DEFAULT 'STAGE',
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
+ display_status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
CONSTRAINT PK_stage PRIMARY KEY CLUSTERED (stage_id, request_id),
CONSTRAINT FK_stage_request_id FOREIGN KEY (request_id) REFERENCES request (request_id));
@@ -383,7 +386,7 @@ CREATE TABLE host_role_command (
start_time BIGINT NOT NULL,
original_start_time BIGINT NOT NULL,
end_time BIGINT,
- status VARCHAR(255),
+ status VARCHAR(255) NOT NULL DEFAULT 'PENDING',
auto_skip_on_failure SMALLINT DEFAULT 0 NOT NULL,
std_error VARBINARY(max),
std_out VARBINARY(max),
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 177ac70..edc5683 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -55,6 +55,7 @@ import org.apache.ambari.server.utils.CommandUtils;
import org.apache.ambari.server.utils.StageUtils;
import org.easymock.EasyMock;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -67,8 +68,6 @@ import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
-import junit.framework.Assert;
-
public class TestActionDBAccessorImpl {
private static final Logger log = LoggerFactory.getLogger(TestActionDBAccessorImpl.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 6519126..526ca7c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyCollectionOf;
+import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -100,9 +101,11 @@ import org.apache.ambari.server.utils.StageUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -119,8 +122,6 @@ import com.google.inject.Injector;
import com.google.inject.Provider;
import com.google.inject.persist.UnitOfWork;
-import junit.framework.Assert;
-
public class TestActionScheduler {
private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class);
@@ -207,6 +208,8 @@ public class TestActionScheduler {
when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
List<Stage> stages = new ArrayList<Stage>();
Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
@@ -222,7 +225,7 @@ public class TestActionScheduler {
//Keep large number of attempts so that the task is not expired finally
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
- 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null);
+ 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null);
scheduler.setTaskTimeoutAdjustment(false);
List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -314,6 +317,8 @@ public class TestActionScheduler {
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
@@ -335,7 +340,7 @@ public class TestActionScheduler {
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null);
+ new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null);
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
@@ -405,6 +410,8 @@ public class TestActionScheduler {
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
doAnswer(new Answer<Void>() {
@Override
@@ -508,6 +515,8 @@ public class TestActionScheduler {
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
doAnswer(new Answer<Collection<HostRoleCommandEntity>>() {
@Override
@@ -543,7 +552,7 @@ public class TestActionScheduler {
// Make sure the NN install doesn't timeout
ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
scheduler.setTaskTimeoutAdjustment(false);
int cycleCount=0;
@@ -606,6 +615,8 @@ public class TestActionScheduler {
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -658,7 +669,7 @@ public class TestActionScheduler {
ServerActionExecutor.init(injector);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -721,6 +732,8 @@ public class TestActionScheduler {
stages.add(stage12);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -735,7 +748,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf,
- entityManagerProviderMock, (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ entityManagerProviderMock, hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
scheduler.doWork();
@@ -763,6 +776,8 @@ public class TestActionScheduler {
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -816,7 +831,7 @@ public class TestActionScheduler {
ServerActionExecutor.init(injector);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState()
@@ -976,6 +991,8 @@ public class TestActionScheduler {
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -1028,7 +1045,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -1124,6 +1141,8 @@ public class TestActionScheduler {
RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -1136,7 +1155,7 @@ public class TestActionScheduler {
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
@@ -1214,6 +1233,8 @@ public class TestActionScheduler {
RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -1228,7 +1249,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
@@ -1289,6 +1310,8 @@ public class TestActionScheduler {
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -1303,7 +1326,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
@@ -1544,6 +1567,8 @@ public class TestActionScheduler {
stage.setLastAttemptTime(host2, Role.HBASE_CLIENT.toString(), now);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -1616,7 +1641,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
scheduler.doWork();
@@ -1729,6 +1754,8 @@ public class TestActionScheduler {
"host1", "cluster1", Role.HDFS_CLIENT, RoleCommand.UPGRADE, Service.Type.HDFS, 4, 2, 1));
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -1808,7 +1835,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
ActionManager am = new ActionManager(db, requestFactory, scheduler);
@@ -1976,6 +2003,8 @@ public class TestActionScheduler {
when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -1993,7 +2022,7 @@ public class TestActionScheduler {
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
scheduler.setTaskTimeoutAdjustment(false);
List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -2135,6 +2164,8 @@ public class TestActionScheduler {
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -2187,7 +2218,7 @@ public class TestActionScheduler {
ServerActionExecutor.init(injector);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -2467,6 +2498,8 @@ public class TestActionScheduler {
when(host3.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
when(db.getStagesInProgress()).thenReturn(stagesInProgress);
@@ -2542,7 +2575,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
@@ -2706,6 +2739,8 @@ public class TestActionScheduler {
command.setStatus(HostRoleStatus.FAILED);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -2776,7 +2811,7 @@ public class TestActionScheduler {
ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
- (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
+ hostRoleCommandDAOMock, (HostRoleCommandFactory)null));
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java b/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
index 7b1a5a2..facd802 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/alerts/AmbariPerformanceRunnableTest.java
@@ -18,12 +18,13 @@
package org.apache.ambari.server.alerts;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.Map;
import javax.persistence.EntityManager;
import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
import org.apache.ambari.server.alerts.AmbariPerformanceRunnable.PerformanceArea;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.controller.internal.ClusterResourceProvider;
@@ -287,6 +289,7 @@ public class AmbariPerformanceRunnableTest {
binder.bind(AlertsDAO.class).toInstance(createNiceMock(AlertsDAO.class));
binder.bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class));
binder.bind(ActionManager.class).toInstance(createNiceMock(ActionManager.class));
+ binder.bind(HostRoleCommandFactory.class).toInstance(createNiceMock(HostRoleCommandFactory.class));
binder.bind(HostRoleCommandDAO.class).toInstance(createNiceMock(HostRoleCommandDAO.class));
binder.bind(AmbariManagementController.class).toInstance(createNiceMock(AmbariManagementController.class));
binder.bind(AlertDefinitionFactory.class).toInstance(createNiceMock(AlertDefinitionFactory.class));
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
index a0701b6..f8b57e5 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
@@ -624,7 +624,6 @@ public class UpgradeResourceProviderTest {
RequestEntity requestEntity = new RequestEntity();
requestEntity.setRequestId(2L);
requestEntity.setClusterId(cluster.getClusterId());
- requestEntity.setStatus(HostRoleStatus.PENDING);
requestEntity.setStages(new ArrayList<StageEntity>());
requestDao.create(requestEntity);
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
index 619e367..f009767 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeSummaryResourceProviderTest.java
@@ -205,7 +205,6 @@ public class UpgradeSummaryResourceProviderTest {
RequestEntity requestEntity = new RequestEntity();
requestEntity.setRequestId(upgradeRequestId);
requestEntity.setClusterId(cluster.getClusterId());
- requestEntity.setStatus(HostRoleStatus.PENDING);
requestDAO.create(requestEntity);
// Create the stage and add it to the request
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
new file mode 100644
index 0000000..64a731b
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.listeners.tasks;
+
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapperFactory;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.inject.Inject;
+
+/** Tests how TaskStatusListener tracks tasks, stages and requests as task events arrive. */
+public class TaskStatusListenerTest extends EasyMockSupport {
+
+  private final TaskEventPublisher publisher = new TaskEventPublisher();
+
+  // NOTE(review): no injector is set up in this class, so these fields presumably stay null - confirm.
+  @Inject
+  private ExecutionCommandDAO executionCommandDAO;
+  @Inject
+  private ExecutionCommandWrapperFactory ecwFactory;
+
+  /** Registers six tasks across two stages, then checks the request status derived from task updates. */
+  @Test
+  public void testOnTaskUpdateEvent() {
+    List<HostRoleCommand> hostRoleCommands = new ArrayList<HostRoleCommand>();
+    ServiceComponentHostEvent serviceComponentHostEvent = createNiceMock(ServiceComponentHostEvent.class);
+    HostDAO hostDAO = createNiceMock(HostDAO.class);
+    replayAll();
+
+    int hostRoleCommandSize = 3;
+    int hrcCounter = 1;
+    for (int stageCounter = 0; stageCounter < 2; stageCounter++) {
+      for (int i = 1; i <= hostRoleCommandSize; i++, hrcCounter++) {
+        String hostname = "hostname-" + hrcCounter;
+        HostRoleCommand hostRoleCommand = new HostRoleCommand(hostname, Role.DATANODE,
+            serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+        hostRoleCommand.setStatus(HostRoleStatus.PENDING);
+        hostRoleCommand.setRequestId(1L);
+        hostRoleCommand.setStageId(stageCounter);
+        hostRoleCommand.setTaskId(hrcCounter);
+        hostRoleCommands.add(hostRoleCommand);
+      }
+    }
+
+    HostRoleStatus hostRoleStatus = HostRoleStatus.PENDING;
+    StageDAO stageDAO = createNiceMock(StageDAO.class);
+    RequestDAO requestDAO = createNiceMock(RequestDAO.class);
+    StageEntity stageEntity = createNiceMock(StageEntity.class);
+    RequestEntity requestEntity = createNiceMock(RequestEntity.class);
+    EasyMock.expect(stageEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(stageEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(stageEntity.isSkippable()).andReturn(Boolean.FALSE).anyTimes();
+    EasyMock.expect(stageEntity.getRoleSuccessCriterias()).andReturn(Collections.<RoleSuccessCriteriaEntity>emptyList()).anyTimes();
+    EasyMock.expect(stageDAO.findByPK(anyObject(StageEntityPK.class))).andReturn(stageEntity).anyTimes();
+    EasyMock.expect(requestEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(requestEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes();
+    EasyMock.expect(requestDAO.findByPK(anyLong())).andReturn(requestEntity).anyTimes();
+
+    // Expect exactly one persisted update for request 1: status COMPLETED, display status SKIPPED_FAILED.
+    requestDAO.updateStatus(1L, HostRoleStatus.COMPLETED, HostRoleStatus.SKIPPED_FAILED);
+    EasyMock.expectLastCall().times(1);
+
+    // These mocks were created after the replayAll() call above, so they are replayed individually.
+    EasyMock.replay(stageEntity);
+    EasyMock.replay(requestEntity);
+    EasyMock.replay(stageDAO);
+    EasyMock.replay(requestDAO);
+
+    TaskCreateEvent event = new TaskCreateEvent(hostRoleCommands);
+    TaskStatusListener listener = new TaskStatusListener(publisher, stageDAO, requestDAO);
+
+    Assert.assertTrue(listener.getActiveTasksMap().isEmpty());
+    Assert.assertTrue(listener.getActiveStageMap().isEmpty());
+    Assert.assertTrue(listener.getActiveRequestMap().isEmpty());
+
+    // Registering the created tasks should start tracking 6 tasks, 2 stages and 1 request.
+    listener.onTaskCreateEvent(event);
+    Assert.assertEquals(6, listener.getActiveTasksMap().size());
+    Assert.assertEquals(2, listener.getActiveStageMap().size());
+    Assert.assertEquals(1, listener.getActiveRequestMap().size());
+    Assert.assertEquals(hostRoleStatus, listener.getActiveRequestMap().get(1L).getStatus());
+
+    // Moving a single task to IN_PROGRESS should cascade into the request
+    // status becoming IN_PROGRESS as well.
+    String hostname = "hostname-1";
+    HostRoleCommand hostRoleCommand = new HostRoleCommand(hostname, Role.DATANODE,
+        serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+    hostRoleCommand.setStatus(HostRoleStatus.IN_PROGRESS);
+    hostRoleCommand.setRequestId(1L);
+    hostRoleCommand.setStageId(0);
+    hostRoleCommand.setTaskId(1L);
+    listener.onTaskUpdateEvent(new TaskUpdateEvent(Collections.singletonList(hostRoleCommand)));
+    Assert.assertEquals(HostRoleStatus.IN_PROGRESS, listener.getActiveRequestMap().get(1L).getStatus());
+
+    // Finish every task: stage 0 tasks become COMPLETED and stage 1 tasks become SKIPPED_FAILED.
+    // This should cascade into request status COMPLETED and request display status SKIPPED_FAILED.
+    hrcCounter = 1;
+    List<HostRoleCommand> finalHostRoleCommands = new ArrayList<HostRoleCommand>();
+    HostRoleStatus finalHostRoleStatus = HostRoleStatus.COMPLETED;
+    for (int stageCounter = 0; stageCounter < 2; stageCounter++) {
+      for (int i = 1; i <= hostRoleCommandSize; i++, hrcCounter++) {
+        String finalHostname = "hostname-" + hrcCounter;
+        HostRoleCommand finalHostRoleCommand = new HostRoleCommand(finalHostname, Role.DATANODE,
+            serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory);
+        finalHostRoleCommand.setStatus(finalHostRoleStatus);
+        finalHostRoleCommand.setRequestId(1L);
+        finalHostRoleCommand.setStageId(stageCounter);
+        finalHostRoleCommand.setTaskId(hrcCounter);
+        finalHostRoleCommands.add(finalHostRoleCommand);
+      }
+      finalHostRoleStatus = HostRoleStatus.SKIPPED_FAILED;
+    }
+
+    listener.onTaskUpdateEvent(new TaskUpdateEvent(finalHostRoleCommands));
+
+    // Once the request's status and display status are both in a completed state, TaskStatusListener should stop tracking it.
+    Assert.assertNull(listener.getActiveRequestMap().get(1L));
+
+    // Verifies the recorded updateStatus expectation: status = COMPLETED, display status = SKIPPED_FAILED.
+    verifyAll();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
index b1c10f5..1709da8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigHelperTest.java
@@ -38,6 +38,7 @@ import javax.persistence.EntityManager;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.H2DatabaseCleaner;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
import org.apache.ambari.server.actionmanager.RequestFactory;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.controller.AmbariCustomCommandExecutionHelper;
@@ -980,6 +981,7 @@ public class ConfigHelperTest {
bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
bind(ClusterController.class).toInstance(clusterController);
bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class));
+ bind(HostRoleCommandFactory.class).toInstance(createNiceMock(HostRoleCommandFactory.class));
bind(HostRoleCommandDAO.class).toInstance(createNiceMock(HostRoleCommandDAO.class));
}
});
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
index 9d339e2..d3c8acf 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterEffectiveVersionTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import javax.persistence.EntityManager;
import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
import org.apache.ambari.server.actionmanager.RequestFactory;
import org.apache.ambari.server.actionmanager.StageFactory;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -68,6 +69,7 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.eclipse.jetty.server.SessionManager;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -83,8 +85,6 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import junit.framework.Assert;
-
/**
* Tests that cluster effective version is calcualted correctly during upgrades.
*/
@@ -256,6 +256,7 @@ public class ClusterEffectiveVersionTest extends EasyMockSupport {
binder.bind(DBAccessor.class).toInstance(EasyMock.createNiceMock(DBAccessor.class));
binder.bind(EntityManager.class).toInstance(EasyMock.createNiceMock(EntityManager.class));
binder.bind(ActionManager.class).toInstance(EasyMock.createNiceMock(ActionManager.class));
+ binder.bind(HostRoleCommandFactory.class).toInstance(EasyMock.createNiceMock(HostRoleCommandFactory.class));
binder.bind(HostRoleCommandDAO.class).toInstance(EasyMock.createNiceMock(HostRoleCommandDAO.class));
binder.bind(AmbariManagementController.class).toInstance(EasyMock.createNiceMock(AmbariManagementController.class));
binder.bind(ClusterController.class).toInstance(EasyMock.createNiceMock(ClusterController.class));
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
index ed95b0b..e699e49 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
@@ -251,7 +251,6 @@ public class RetryUpgradeActionServiceTest {
RequestEntity requestEntity = new RequestEntity();
requestEntity.setRequestId(upgradeRequestId);
requestEntity.setClusterId(cluster.getClusterId());
- requestEntity.setStatus(HostRoleStatus.PENDING);
requestDAO.create(requestEntity);
// Create the stage and add it to the request
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
index d7979e8..ec001ec 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
@@ -31,15 +31,18 @@ public class UpgradeCatalog300Test {
public void testExecuteDMLUpdates() throws Exception {
Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
Method showHcatDeletedUserMessage = UpgradeCatalog300.class.getDeclaredMethod("showHcatDeletedUserMessage");
+ Method setStatusOfStagesAndRequests = UpgradeCatalog300.class.getDeclaredMethod("setStatusOfStagesAndRequests");
UpgradeCatalog300 upgradeCatalog300 = createMockBuilder(UpgradeCatalog300.class)
.addMockedMethod(showHcatDeletedUserMessage)
.addMockedMethod(addNewConfigurationsFromXml)
+ .addMockedMethod(setStatusOfStagesAndRequests)
.createMock();
upgradeCatalog300.addNewConfigurationsFromXml();
upgradeCatalog300.showHcatDeletedUserMessage();
+ upgradeCatalog300.setStatusOfStagesAndRequests();
replay(upgradeCatalog300);
@@ -49,4 +52,21 @@ public class UpgradeCatalog300Test {
verify(upgradeCatalog300);
}
+  @Test
+  public void testExecuteDDLUpdates() throws Exception {
+    // Partial mock: only updateStageTable() is intercepted; executeDDLUpdates() runs for real.
+    UpgradeCatalog300 catalog = createMockBuilder(UpgradeCatalog300.class)
+        .addMockedMethod(UpgradeCatalog300.class.getDeclaredMethod("updateStageTable"))
+        .createMock();
+
+    // Record the single expected call, then switch the mock to replay mode.
+    catalog.updateStageTable();
+    replay(catalog);
+
+    catalog.executeDDLUpdates();
+
+    verify(catalog);
+  }
+
+
}
[2/2] ambari git commit: AMBARI-18868. Stage and Request status
should be persisted in the database. (jaimin)
Posted by ja...@apache.org.
AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0fc7a667
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0fc7a667
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0fc7a667
Branch: refs/heads/trunk
Commit: 0fc7a6671feb10dc0e8475dc4878942cf19f46cc
Parents: dd174f4
Author: Jaimin Jetly <ja...@hortonworks.com>
Authored: Fri Feb 17 09:31:10 2017 -0800
Committer: Jaimin Jetly <ja...@hortonworks.com>
Committed: Fri Feb 17 09:31:10 2017 -0800
----------------------------------------------------------------------
.../actionmanager/ActionDBAccessorImpl.java | 108 ++--
.../server/actionmanager/ActionScheduler.java | 31 +
.../ambari/server/actionmanager/Request.java | 8 +-
.../ambari/server/actionmanager/Stage.java | 25 +
.../controller/internal/CalculatedStatus.java | 390 +++++++++++-
.../ambari/server/events/TaskCreateEvent.java | 48 ++
.../apache/ambari/server/events/TaskEvent.java | 66 ++
.../ambari/server/events/TaskUpdateEvent.java | 35 ++
.../listeners/tasks/TaskStatusListener.java | 609 +++++++++++++++++++
.../events/publishers/TaskEventPublisher.java | 62 ++
.../server/orm/dao/HostRoleCommandDAO.java | 67 +-
.../ambari/server/orm/dao/RequestDAO.java | 8 +
.../apache/ambari/server/orm/dao/StageDAO.java | 32 +-
.../orm/entities/HostRoleCommandEntity.java | 4 +-
.../server/orm/entities/RequestEntity.java | 49 +-
.../ambari/server/orm/entities/StageEntity.java | 70 ++-
.../server/orm/entities/StageEntityPK.java | 12 +
.../server/upgrade/UpgradeCatalog300.java | 70 +++
.../main/resources/Ambari-DDL-Derby-CREATE.sql | 7 +-
.../main/resources/Ambari-DDL-MySQL-CREATE.sql | 7 +-
.../main/resources/Ambari-DDL-Oracle-CREATE.sql | 7 +-
.../resources/Ambari-DDL-Postgres-CREATE.sql | 7 +-
.../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 7 +-
.../resources/Ambari-DDL-SQLServer-CREATE.sql | 7 +-
.../actionmanager/TestActionDBAccessorImpl.java | 3 +-
.../actionmanager/TestActionScheduler.java | 71 ++-
.../alerts/AmbariPerformanceRunnableTest.java | 7 +-
.../internal/UpgradeResourceProviderTest.java | 1 -
.../UpgradeSummaryResourceProviderTest.java | 1 -
.../listeners/tasks/TaskStatusListenerTest.java | 164 +++++
.../ambari/server/state/ConfigHelperTest.java | 2 +
.../cluster/ClusterEffectiveVersionTest.java | 5 +-
.../services/RetryUpgradeActionServiceTest.java | 1 -
.../server/upgrade/UpgradeCatalog300Test.java | 20 +
34 files changed, 1892 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 7881a4b..b813fe6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -45,7 +45,9 @@ import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.events.HostsRemovedEvent;
import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -130,6 +132,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
AmbariEventPublisher ambariEventPublisher;
@Inject
+ TaskEventPublisher taskEventPublisher;
+
+ @Inject
AuditLogger auditLogger;
/**
@@ -205,8 +210,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
public Collection<HostRoleCommandEntity> abortOperation(long requestId) {
long now = System.currentTimeMillis();
- endRequest(requestId);
-
// only request commands which actually need to be aborted; requesting all
// commands here can cause OOM problems during large requests like upgrades
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
@@ -228,7 +231,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (!commands.isEmpty()) {
return hostRoleCommandDAO.mergeAll(commands);
}
-
+ endRequest(requestId);
return Collections.emptyList();
}
@@ -283,7 +286,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
@Override
@Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
public List<Stage> getStagesInProgress() {
- List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
+ List<StageEntity> stageEntities = stageDAO.findByStatuses(
HostRoleStatus.IN_PROGRESS_STATUSES);
return getStagesForEntities(stageEntities);
}
@@ -343,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
RequestEntity requestEntity = request.constructNewPersistenceEntity();
Long clusterId = -1L;
+ Long requestId = requestEntity.getRequestId();
ClusterEntity clusterEntity = clusterDAO.findById(request.getClusterId());
if (clusterEntity != null) {
clusterId = clusterEntity.getClusterId();
@@ -356,8 +360,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
addRequestToAuditlogCache(request);
+ List<HostRoleCommand> hostRoleCommands = new ArrayList<>();
+
for (Stage stage : request.getStages()) {
StageEntity stageEntity = stage.constructNewPersistenceEntity();
+ Long stageId = stageEntity.getStageId();
stageEntities.add(stageEntity);
stageEntity.setClusterId(clusterId);
stageEntity.setRequest(requestEntity);
@@ -366,6 +373,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
List<HostRoleCommand> orderedHostRoleCommands = stage.getOrderedHostRoleCommands();
for (HostRoleCommand hostRoleCommand : orderedHostRoleCommands) {
+ hostRoleCommand.setRequestId(requestId);
+ hostRoleCommand.setStageId(stageId);
HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
hostRoleCommandEntity.setStage(stageEntity);
hostRoleCommandDAO.create(hostRoleCommandEntity);
@@ -415,11 +424,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
executionCommandDAO.create(hostRoleCommandEntity.getExecutionCommand());
- hostRoleCommandEntity = hostRoleCommandDAO.merge(hostRoleCommandEntity);
+ hostRoleCommandEntity = hostRoleCommandDAO.mergeWithoutPublishEvent(hostRoleCommandEntity);
if (null != hostEntity) {
hostEntity = hostDAO.merge(hostEntity);
}
+ hostRoleCommands.add(hostRoleCommand);
}
for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
@@ -431,6 +441,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
requestEntity.setStages(stageEntities);
requestDAO.merge(requestEntity);
+
+ TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
+ taskEventPublisher.publish(taskCreateEvent);
}
@Override
@@ -497,66 +510,55 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
long now = System.currentTimeMillis();
List<Long> requestsToCheck = new ArrayList<Long>();
- List<Long> abortedCommandUpdates = new ArrayList<Long>();
List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
+ List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<HostRoleCommandEntity>();
for (HostRoleCommandEntity commandEntity : commandEntities) {
CommandReport report = taskReports.get(commandEntity.getTaskId());
-
- boolean statusChanged = false;
-
- switch (commandEntity.getStatus()) {
- case ABORTED:
- // We don't want to overwrite statuses for ABORTED tasks with
- // statuses that have been received from the agent after aborting task
- abortedCommandUpdates.add(commandEntity.getTaskId());
- break;
- default:
- HostRoleStatus status = HostRoleStatus.valueOf(report.getStatus());
- // if FAILED and marked for holding then set status = HOLDING_FAILED
- if (status == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
- status = HostRoleStatus.HOLDING_FAILED;
-
- // tasks can be marked as skipped when they fail
- if (commandEntity.isFailureAutoSkipped()) {
- status = HostRoleStatus.SKIPPED_FAILED;
- }
+ HostRoleStatus existingTaskStatus = commandEntity.getStatus();
+ HostRoleStatus reportedTaskStatus = HostRoleStatus.valueOf(report.getStatus());
+ if (!existingTaskStatus.isCompletedState() || existingTaskStatus == HostRoleStatus.ABORTED) {
+ // if FAILED and marked for holding then set reportedTaskStatus = HOLDING_FAILED
+ if (reportedTaskStatus == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
+ reportedTaskStatus = HostRoleStatus.HOLDING_FAILED;
+
+ // tasks can be marked as skipped when they fail
+ if (commandEntity.isFailureAutoSkipped()) {
+ reportedTaskStatus = HostRoleStatus.SKIPPED_FAILED;
}
-
- commandEntity.setStatus(status);
- statusChanged = true;
- break;
- }
-
- commandEntity.setStdOut(report.getStdOut().getBytes());
- commandEntity.setStdError(report.getStdErr().getBytes());
- commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
- report.getStructuredOut().getBytes());
- commandEntity.setExitcode(report.getExitCode());
-
- if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
- commandEntity.setEndTime(now);
-
- String actionId = report.getActionId();
- long[] requestStageIds = StageUtils.getRequestStage(actionId);
- long requestId = requestStageIds[0];
- long stageId = requestStageIds[1];
- if(statusChanged) {
- auditLog(commandEntity, requestId);
}
- if (requestDAO.getLastStageId(requestId).equals(stageId)) {
- requestsToCheck.add(requestId);
+ if (!existingTaskStatus.isCompletedState()) {
+ commandEntity.setStatus(reportedTaskStatus);
}
+ commandEntity.setStdOut(report.getStdOut().getBytes());
+ commandEntity.setStdError(report.getStdErr().getBytes());
+ commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
+ report.getStructuredOut().getBytes());
+ commandEntity.setExitcode(report.getExitCode());
+ if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
+ commandEntity.setEndTime(now);
+
+ String actionId = report.getActionId();
+ long[] requestStageIds = StageUtils.getRequestStage(actionId);
+ long requestId = requestStageIds[0];
+ long stageId = requestStageIds[1];
+ auditLog(commandEntity, requestId);
+ if (requestDAO.getLastStageId(requestId).equals(stageId)) {
+ requestsToCheck.add(requestId);
+ }
+ }
+ commandEntitiesToMerge.add(commandEntity);
+ } else {
+ LOG.warn(String.format("Request for invalid transition of host role command status received for task id %d from " +
+ "agent: %s -> %s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus));
}
}
// no need to merge if there's nothing to merge
- if (!commandEntities.isEmpty()) {
- hostRoleCommandDAO.mergeAll(commandEntities);
+ if (!commandEntitiesToMerge.isEmpty()) {
+ hostRoleCommandDAO.mergeAll(commandEntitiesToMerge);
}
- // Invalidate cache because of updates to ABORTED commands
- hostRoleCommandCache.invalidateAll(abortedCommandUpdates);
for (Long requestId : requestsToCheck) {
endRequestIfCompleted(requestId);
@@ -923,7 +925,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
return HostRoleStatus.QUEUED;
}
Collection<HostRoleStatus> taskStatuses = details.getTaskStatuses();
- return CalculatedStatus.calculateSummaryStatusOfStage(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
+ return CalculatedStatus.calculateSummaryStatus(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false);
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 680c0a6..a92c03c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.persistence.EntityManager;
@@ -49,6 +50,7 @@ import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.JPAEventPublisher;
import org.apache.ambari.server.metadata.RoleCommandOrder;
@@ -75,10 +77,13 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.eventbus.Subscribe;
import com.google.common.reflect.TypeToken;
@@ -179,6 +184,9 @@ class ActionScheduler implements Runnable {
* we receive awake() request during running a scheduler iteration.
*/
private boolean activeAwakeRequest = false;
+
+ private AtomicBoolean taskStatusLoaded = new AtomicBoolean();
+
//Cache for clusterHostinfo, key - stageId-requestId
private Cache<String, Map<String, Set<String>>> clusterHostInfoCache;
private Cache<String, Map<String, String>> commandParamsStageCache;
@@ -353,6 +361,8 @@ class ActionScheduler implements Runnable {
LOG.debug("Processing {} in progress stages ", stages.size());
}
+ publishInProgressTasks(stages);
+
if (stages.isEmpty()) {
// Nothing to do
if (LOG.isDebugEnabled()) {
@@ -532,6 +542,27 @@ class ActionScheduler implements Runnable {
}
}
+ /**
+ * Publishes an event to load {@link TaskStatusListener#activeTasksMap}, {@link TaskStatusListener#activeStageMap}
+ * and {@link TaskStatusListener#activeRequestMap} for all running requests, once during server startup.
+ * This is required because some tasks may have been in progress when the server was last stopped.
+ * @param stages list of in-progress stages
+ */
+ private void publishInProgressTasks(List<Stage> stages) {
+ if (taskStatusLoaded.compareAndSet(false, true)) {
+ if (!stages.isEmpty()) {
+ Function<Stage, Long> transform = new Function<Stage, Long>() {
+ @Override
+ public Long apply(Stage stage) {
+ return stage.getRequestId();
+ }
+ };
+ Set<Long> runningRequestID = ImmutableSet.copyOf(Lists.transform(stages, transform));
+ List<HostRoleCommand> hostRoleCommands = db.getAllTasksByRequestIds(runningRequestID);
+ hostRoleCommandDAO.publishTaskCreateEvent(hostRoleCommands);
+ }
+ }
+ }
/**
* Returns the list of hosts that have a task assigned
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
index 31e11c1..502c016 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -71,7 +71,8 @@ public class Request {
* As of now, this field is not used. Request status is
* calculated at RequestResourceProvider on the fly.
*/
- private HostRoleStatus status; // not persisted yet
+ private HostRoleStatus status = HostRoleStatus.PENDING;
+ private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
private String inputs;
private List<RequestResourceFilter> resourceFilters;
private RequestOperationLevel operationLevel;
@@ -186,6 +187,7 @@ public class Request {
this.requestType = entity.getRequestType();
this.commandName = entity.getCommandName();
this.status = entity.getStatus();
+ this.displayStatus = entity.getDisplayStatus();
if (entity.getRequestScheduleEntity() != null) {
this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId();
}
@@ -241,6 +243,8 @@ public class Request {
requestEntity.setInputs(inputs);
requestEntity.setRequestType(requestType);
requestEntity.setRequestScheduleId(requestScheduleId);
+ requestEntity.setStatus(status);
+ requestEntity.setDisplayStatus(displayStatus);
//TODO set all fields
if (resourceFilters != null) {
@@ -381,6 +385,8 @@ public class Request {
", startTime=" + startTime +
", endTime=" + endTime +
", inputs='" + inputs + '\'' +
+ ", status='" + status + '\'' +
+ ", displayStatus='" + displayStatus + '\'' +
", resourceFilters='" + resourceFilters + '\'' +
", operationLevel='" + operationLevel + '\'' +
", requestType=" + requestType +
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 4a05b32..f7ceca2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -74,6 +74,8 @@ public class Stage {
private long stageId = -1;
private final String logDir;
private final String requestContext;
+ private HostRoleStatus status = HostRoleStatus.PENDING;
+ private HostRoleStatus displayStatus = HostRoleStatus.PENDING;
private String clusterHostInfo;
private String commandParamsStage;
private String hostParamsStage;
@@ -157,6 +159,8 @@ public class Stage {
commandParamsStage = stageEntity.getCommandParamsStage();
hostParamsStage = stageEntity.getHostParamsStage();
commandExecutionType = stageEntity.getCommandExecutionType();
+ status = stageEntity.getStatus();
+ displayStatus = stageEntity.getDisplayStatus();
List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(requestId, stageId);
Collection<HostRoleCommand> commands = dbAccessor.getTasks(taskIds);
@@ -197,6 +201,8 @@ public class Stage {
stageEntity.setCommandParamsStage(commandParamsStage);
stageEntity.setHostParamsStage(hostParamsStage);
stageEntity.setCommandExecutionType(commandExecutionType);
+ stageEntity.setStatus(status);
+ stageEntity.setDisplayStatus(displayStatus);
for (Role role : successFactors.keySet()) {
RoleSuccessCriteriaEntity roleSuccessCriteriaEntity = new RoleSuccessCriteriaEntity();
@@ -290,6 +296,23 @@ public class Stage {
this.commandExecutionType = commandExecutionType;
}
+ /**
+ * get current status of the stage
+ * @return {@link HostRoleStatus}
+ */
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * sets status of the stage
+ * @param status {@link HostRoleStatus}
+ */
+ public void setStatus(HostRoleStatus status) {
+ this.status = status;
+ }
+
+
public synchronized void setStageId(long stageId) {
if (this.stageId != -1) {
throw new RuntimeException("Attempt to set stageId again! Not allowed.");
@@ -915,6 +938,8 @@ public class Stage {
builder.append("clusterHostInfo="+clusterHostInfo+"\n");
builder.append("commandParamsStage="+commandParamsStage+"\n");
builder.append("hostParamsStage="+hostParamsStage+"\n");
+ builder.append("status="+status+"\n");
+ builder.append("displayStatus="+displayStatus+"\n");
builder.append("Success Factors:\n");
for (Role r : successFactors.keySet()) {
builder.append(" role: "+r+", factor: "+successFactors.get(r)+"\n");
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
index 3c415df..32dd03d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
@@ -26,12 +26,20 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.ambari.server.Role;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
/**
* Status of a request resource, calculated from a set of tasks or stages.
@@ -142,7 +150,7 @@ public class CalculatedStatus {
Map<HostRoleStatus, Integer> taskStatusCounts = CalculatedStatus.calculateTaskEntityStatusCounts(tasks);
- HostRoleStatus status = calculateSummaryStatusOfStage(taskStatusCounts, size, skippable);
+ HostRoleStatus status = calculateSummaryStatus(taskStatusCounts, size, skippable);
double progressPercent = calculateProgressPercent(taskStatusCounts, size);
@@ -167,7 +175,7 @@ public class CalculatedStatus {
// calculate the stage status from the task status counts
HostRoleStatus stageStatus =
- calculateSummaryStatusOfStage(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+ calculateSummaryStatus(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
stageStatuses.add(stageStatus);
@@ -203,7 +211,7 @@ public class CalculatedStatus {
// calculate the stage status from the task status counts
HostRoleStatus stageStatus =
- calculateSummaryStatusOfStage(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
+ calculateSummaryStatus(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable());
stageStatuses.add(stageStatus);
@@ -256,6 +264,126 @@ public class CalculatedStatus {
}
/**
+ * Returns counts of tasks that are in various states.
+ *
+ * @param hostRoleCommands collection of beans {@link HostRoleCommand}
+ *
+ * @return a map of counts of tasks keyed by the task status
+ */
+ public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands) {
+ Map<HostRoleStatus, Integer> counters = new HashMap<>();
+ // initialize
+ for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+ counters.put(hostRoleStatus, 0);
+ }
+ // calculate counts
+ for (HostRoleCommand hrc : hostRoleCommands) {
+ // count tasks where isCompletedState() == true as COMPLETED
+ // but don't count tasks with COMPLETED status twice
+ if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+ // Increase total number of completed tasks;
+ counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+ }
+ // Increment counter for particular status
+ counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+ }
+
+ // We overwrite the value to have the sum converged
+ counters.put(HostRoleStatus.IN_PROGRESS,
+ hostRoleCommands.size() -
+ counters.get(HostRoleStatus.COMPLETED) -
+ counters.get(HostRoleStatus.QUEUED) -
+ counters.get(HostRoleStatus.PENDING));
+
+ return counters;
+ }
+
+ /**
+ * Returns, for each {@link StatusType}, counts of stages that are in various states.
+ *
+ * @param stages collection of beans {@link org.apache.ambari.server.events.listeners.tasks.TaskStatusListener.ActiveStage}
+ *
+ * @return a map keyed by status type; each value is a map of counts of stages keyed by the stage status
+ */
+ public static Map<StatusType,Map<HostRoleStatus, Integer>> calculateStatusCountsForStage(Collection<TaskStatusListener.ActiveStage> stages) {
+
+ Map<StatusType,Map<HostRoleStatus, Integer>> counters = new HashMap<>();
+ for (StatusType statusType : StatusType.values()) {
+ Map <HostRoleStatus, Integer> statusMap = new HashMap<HostRoleStatus, Integer>();
+ counters.put(statusType,statusMap);
+ // initialize
+ for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+ statusMap.put(hostRoleStatus, 0);
+ }
+ for (TaskStatusListener.ActiveStage stage : stages) {
+ // count stages where isCompletedState() == true as COMPLETED
+ // but don't count stages with COMPLETED status twice
+ HostRoleStatus status;
+ if (statusType == StatusType.DISPLAY_STATUS) {
+ status = stage.getDisplayStatus();
+ } else {
+ status = stage.getStatus();
+ }
+ if (status.isCompletedState() && status != HostRoleStatus.COMPLETED) {
+ // Increase total number of completed stages;
+ statusMap.put(HostRoleStatus.COMPLETED, statusMap.get(HostRoleStatus.COMPLETED) + 1);
+ }
+
+ // Increment counter for particular status
+ statusMap.put(status, statusMap.get(status) + 1);
+ }
+ statusMap.put(HostRoleStatus.IN_PROGRESS,
+ stages.size() -
+ statusMap.get(HostRoleStatus.COMPLETED) -
+ statusMap.get(HostRoleStatus.QUEUED) -
+ statusMap.get(HostRoleStatus.PENDING));
+ }
+ return counters;
+ }
+
+
+ /**
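+ * Returns counts of tasks of the given stage that are in various states.
+ * Tasks whose request id and stage id do not match the given stage are ignored.
+ *
+ * @param hostRoleCommands collection of beans {@link HostRoleCommand}
+ * @param stage primary key (request id and stage id) of the stage whose tasks are counted
+ *
+ * @return a map of counts of tasks keyed by the task status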
+ */
+ public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands, StageEntityPK stage) {
+ Map<HostRoleStatus, Integer> counters = new HashMap<>();
+ List<HostRoleCommand> hostRoleCommandsOfStage = new ArrayList<>();
+ // initialize
+ for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) {
+ counters.put(hostRoleStatus, 0);
+ }
+ // calculate counts
+ for (HostRoleCommand hrc : hostRoleCommands) {
+ if (stage.getStageId() == hrc.getStageId() && stage.getRequestId() == hrc.getRequestId()) {
+ // count tasks where isCompletedState() == true as COMPLETED
+ // but don't count tasks with COMPLETED status twice
+ if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) {
+ // Increase total number of completed tasks;
+ counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1);
+ }
+
+ // Increment counter for particular status
+ counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1);
+
+ hostRoleCommandsOfStage.add(hrc);
+ }
+ }
+
+ // We overwrite the value to have the sum converged
+ counters.put(HostRoleStatus.IN_PROGRESS,
+ hostRoleCommandsOfStage.size() -
+ counters.get(HostRoleStatus.COMPLETED) -
+ counters.get(HostRoleStatus.QUEUED) -
+ counters.get(HostRoleStatus.PENDING));
+
+ return counters;
+ }
+
+ /**
* Returns counts of task entities that are in various states.
*
* @param tasks the collection of task entities
@@ -329,7 +457,7 @@ public class CalculatedStatus {
int total = summary.getTaskTotal();
boolean skip = summary.isStageSkippable();
Map<HostRoleStatus, Integer> counts = calculateStatusCounts(summary.getTaskStatuses());
- HostRoleStatus stageStatus = calculateSummaryStatusOfStage(counts, total, skip);
+ HostRoleStatus stageStatus = calculateSummaryStatus(counts, total, skip);
HostRoleStatus stageDisplayStatus = calculateSummaryDisplayStatus(counts, total, skip);
stageStatuses.add(stageStatus);
@@ -392,7 +520,7 @@ public class CalculatedStatus {
*
* @return summary request status based on statuses of tasks in different states.
*/
- public static HostRoleStatus calculateSummaryStatusOfStage(Map<HostRoleStatus, Integer> counters,
+ public static HostRoleStatus calculateSummaryStatus(Map<HostRoleStatus, Integer> counters,
int total, boolean skippable) {
// when there are 0 tasks, return COMPLETED
@@ -435,6 +563,230 @@ public class CalculatedStatus {
}
/**
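+ * Calculates a summary status from status counts alone, without a total count.
+ * Later checks take precedence: IN_PROGRESS if anything is QUEUED or IN_PROGRESS; otherwise
+ * TIMEDOUT, then FAILED (both only when not skippable); otherwise a holding status; otherwise PENDING.
+ *
+ * @param counters counts of resources that are in various states
+ * @param skippable {Boolean} <code>TRUE</code> if failure of any of the task should not fail the stage
+ * @return {@link HostRoleStatus}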
+ */
+ public static HostRoleStatus calculateSummaryStatusFromPartialSet(Map<HostRoleStatus, Integer> counters,
+ boolean skippable) {
+
+ HostRoleStatus status = HostRoleStatus.PENDING;
+ // Any holding task yields a holding status: HOLDING first, then HOLDING_FAILED, then HOLDING_TIMEDOUT.
+ if (counters.get(HostRoleStatus.HOLDING) > 0 || counters.get(HostRoleStatus.HOLDING_FAILED) > 0 || counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+ status = counters.get(HostRoleStatus.HOLDING) > 0 ? HostRoleStatus.HOLDING :
+ counters.get(HostRoleStatus.HOLDING_FAILED) > 0 ? HostRoleStatus.HOLDING_FAILED :
+ HostRoleStatus.HOLDING_TIMEDOUT;
+ }
+
+ // Because tasks are not skippable, guaranteed to be FAILED
+ if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+ status = HostRoleStatus.FAILED;
+ }
+
+ // Because tasks are not skippable, guaranteed to be TIMEDOUT
+ if (counters.get(HostRoleStatus.TIMEDOUT) > 0 && !skippable) {
+ status = HostRoleStatus.TIMEDOUT;
+ }
+
+ int inProgressTasks = counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+ if (inProgressTasks > 0) {
+ status = HostRoleStatus.IN_PROGRESS;
+ }
+
+ return status;
+ }
+
+
+  /**
+   * Calculates the status of a stage from its tasks, honouring per-role success factors.
+   * <p>
+   * Checks are applied in this order and the first match wins: no tasks gives COMPLETED;
+   * all tasks PENDING gives PENDING; any holding task gives the corresponding holding status;
+   * FAILED or TIMEDOUT tasks (when not skippable) give FAILED or TIMEDOUT if a role with such
+   * tasks misses its success factor; any PENDING, QUEUED or IN_PROGRESS task gives IN_PROGRESS;
+   * ABORTED tasks give ABORTED if a role with such tasks misses its success factor;
+   * otherwise COMPLETED.
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param counters counts of resources that are in various states
+   * @param successFactors Map of roles to their success factor for a stage
+   * @param skippable {Boolean} <code>TRUE</code> if failure of any of the task should not fail the stage
+   * @return {@link HostRoleStatus} based on success factor
+   */
+  public static HostRoleStatus calculateStageStatus(List <HostRoleCommand> hostRoleCommands, Map<HostRoleStatus, Integer> counters, Map<Role, Float> successFactors,
+                                                    boolean skippable) {
+
+    // when there are 0 tasks, return COMPLETED
+    int total = hostRoleCommands.size();
+    if (total == 0) {
+      return HostRoleStatus.COMPLETED;
+    }
+
+    if (counters.get(HostRoleStatus.PENDING) == total) {
+      return HostRoleStatus.PENDING;
+    }
+
+    // Any holding task puts the stage into a holding status: HOLDING first, then HOLDING_FAILED, then HOLDING_TIMEDOUT.
+    if (counters.get(HostRoleStatus.HOLDING) > 0 || counters.get(HostRoleStatus.HOLDING_FAILED) > 0 || counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+      return counters.get(HostRoleStatus.HOLDING) > 0 ? HostRoleStatus.HOLDING :
+          counters.get(HostRoleStatus.HOLDING_FAILED) > 0 ? HostRoleStatus.HOLDING_FAILED :
+              HostRoleStatus.HOLDING_TIMEDOUT;
+    }
+
+    // FAILED tasks fail the stage only when a role that owns them misses its success factor
+    if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+      Set<Role> rolesWithFailedTasks = getRolesOfFailedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithFailedTasks, successFactors)) {
+        return HostRoleStatus.FAILED;
+      }
+    }
+
+    // same rule for TIMEDOUT tasks
+    if (counters.get(HostRoleStatus.TIMEDOUT) > 0 && !skippable) {
+      Set<Role> rolesWithTimedOutTasks = getRolesOfTimedOutTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithTimedOutTasks, successFactors)) {
+        return HostRoleStatus.TIMEDOUT;
+      }
+    }
+
+    int numActiveTasks = counters.get(HostRoleStatus.PENDING) + counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+
+    if (numActiveTasks > 0) {
+      return HostRoleStatus.IN_PROGRESS;
+    } else if (counters.get(HostRoleStatus.ABORTED) > 0) {
+      // ABORTED is only considered once nothing is active any more
+      Set<Role> rolesWithAbortedTasks = getRolesOfAbortedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithAbortedTasks, successFactors)) {
+        return HostRoleStatus.ABORTED;
+      }
+    }
+
+    return HostRoleStatus.COMPLETED;
+  }
+
+ /**
+ * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#FAILED}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfFailedTasks(List <HostRoleCommand> hostRoleCommands) {
+ return getRolesOfTasks(hostRoleCommands, HostRoleStatus.FAILED);
+ }
+
+ /**
+ * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#TIMEDOUT}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfTimedOutTasks(List <HostRoleCommand> hostRoleCommands) {
+ return getRolesOfTasks(hostRoleCommands, HostRoleStatus.TIMEDOUT);
+ }
+
+ /**
+ * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#ABORTED}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfAbortedTasks(List <HostRoleCommand> hostRoleCommands) {
+ return getRolesOfTasks(hostRoleCommands, HostRoleStatus.ABORTED);
+ }
+
+ /**
+ * Get all {@link Role} any of whose tasks are in given {@code status}
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @param status {@link HostRoleStatus}
+ * @return Set of {@link Role}
+ */
+ protected static Set<Role> getRolesOfTasks(List <HostRoleCommand> hostRoleCommands, final HostRoleStatus status) {
+
+ Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+ @Override
+ public boolean apply(HostRoleCommand hrc) {
+ return hrc.getStatus() == status;
+ }
+ };
+
+ Function<HostRoleCommand, Role> transform = new Function<HostRoleCommand, Role>() {
+ @Override
+ public Role apply(HostRoleCommand hrc) {
+ return hrc.getRole();
+ }
+ };
+ return FluentIterable.from(hostRoleCommands)
+ .filter(predicate)
+ .transform(transform)
+ .toSet();
+ }
+
+  /**
+   * Determines whether a stage failed because the tasks of at least one of the
+   * given roles did not meet that role's success factor.
+   * <p>
+   * The success ratio of a role is the fraction of its tasks that are not returned by
+   * {@link #getFailedHostRoleCommands(List)}. A role with no entry in {@code successFactors}
+   * is held to a success factor of 1.0.
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param roles set of roles to be checked for meeting success criteria
+   * @param successFactors map of role to its success factor
+   * @return {Boolean} <code>TRUE</code> if stage failed due to hostRoleCommands of any role not meeting success criteria
+   */
+  protected static Boolean didStageFailed(List<HostRoleCommand> hostRoleCommands, Set<Role> roles, Map<Role, Float> successFactors) {
+    for (Role role : roles) {
+      List<HostRoleCommand> hostRoleCommandsOfRole = getHostRoleCommandsOfRole(hostRoleCommands, role);
+      if (hostRoleCommandsOfRole.isEmpty()) {
+        // a role without tasks cannot miss its success factor; also avoids dividing by zero
+        continue;
+      }
+      List<HostRoleCommand> failedHostRoleCommands = getFailedHostRoleCommands(hostRoleCommandsOfRole);
+      int totalForRole = hostRoleCommandsOfRole.size();
+      // divide as float: integer division would truncate every partial ratio to 0
+      float successRatioForRole = (float) (totalForRole - failedHostRoleCommands.size()) / totalForRole;
+      Float configuredFactor = successFactors.get(role);
+      float successFactorForRole = configuredFactor == null ? 1.0f : configuredFactor;
+      if (successRatioForRole < successFactorForRole) {
+        return Boolean.TRUE;
+      }
+    }
+    return Boolean.FALSE;
+  }
+
+ /**
+ *
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @param role {@link Role}
+ * @return list of {@link HostRoleCommand} that belongs to {@link Role}
+ */
+ protected static List<HostRoleCommand> getHostRoleCommandsOfRole(List <HostRoleCommand> hostRoleCommands, final Role role) {
+ Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+ @Override
+ public boolean apply(HostRoleCommand hrc) {
+ return hrc.getRole() == role;
+ }
+ };
+ return FluentIterable.from(hostRoleCommands)
+ .filter(predicate)
+ .toList();
+ }
+
+ /**
+ *
+ * @param hostRoleCommands list of {@link HostRoleCommand}
+ * @return list of {@link HostRoleCommand} with failed status
+ */
+ protected static List<HostRoleCommand> getFailedHostRoleCommands(List <HostRoleCommand> hostRoleCommands) {
+ Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+ @Override
+ public boolean apply(HostRoleCommand hrc) {
+ return hrc.getStatus().isFailedAndNotSkippableState();
+ }
+ };
+ return FluentIterable.from(hostRoleCommands)
+ .filter(predicate)
+ .toList();
+ }
+
+
+ /**
+ * Calculate overall status from collection of statuses
+ * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+ * @return overall status of a request
+ */
+ public static HostRoleStatus getOverallStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+ Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+ return calculateSummaryStatus(statusCount, hostRoleStatuses.size(), false);
+ }
+
+ /**
+ * Calculate overall display status from collection of statuses
+ * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+ * @return overall display status of a request
+ */
+ public static HostRoleStatus getOverallDisplayStatusForRequest (Collection<HostRoleStatus> hostRoleStatuses) {
+ Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+ return calculateSummaryDisplayStatus(statusCount, hostRoleStatuses.size(), false);
+ }
+
+
+ /**
* Calculate overall status of an upgrade.
*
* @param counters counts of resources that are in various states
@@ -444,7 +796,7 @@ public class CalculatedStatus {
*/
protected static HostRoleStatus calculateSummaryStatusOfUpgrade(
Map<HostRoleStatus, Integer> counters, int total) {
- return calculateSummaryStatusOfStage(counters, total, false);
+ return calculateSummaryStatus(counters, total, false);
}
/**
@@ -456,10 +808,28 @@ public class CalculatedStatus {
*
* @return summary request status based on statuses of tasks in different states.
*/
- protected static HostRoleStatus calculateSummaryDisplayStatus(
+ public static HostRoleStatus calculateSummaryDisplayStatus(
Map<HostRoleStatus, Integer> counters, int total, boolean skippable) {
- return counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
- counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
- calculateSummaryStatusOfStage(counters, total, skippable);
+ return counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
+ counters.get(HostRoleStatus.TIMEDOUT) > 0 ? HostRoleStatus.TIMEDOUT:
+ counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
+ calculateSummaryStatus(counters, total, skippable);
+ }
+
+ /**
+ * Kinds of {@link HostRoleStatus} persisted by {@link Stage} and {@link Request}.
+ * Each constant carries the string returned by {@link #getValue()}.
+ */
+ public enum StatusType {
+ STATUS("status"),
+ DISPLAY_STATUS("display_status");
+
+ // Assigned once in the constructor and never modified afterwards.
+ private final String value;
+
+ StatusType(String value) {
+ this.value = value;
+ }
+
+ /**
+ * @return the string associated with this status kind
+ */
+ public String getValue() {
+ return value;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
new file mode 100644
index 0000000..9d73122
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+
+/**
+ * Event carrying the {@link HostRoleCommand}s of requests that are to be tracked
+ * as running requests by {@link TaskStatusListener}.
+ * It is meant to be fired when a new request is created by a user action, or when
+ * ambari-server starts while some stages are still in a non-completed state.
+ */
+public class TaskCreateEvent extends TaskEvent {
+
+ /**
+ * Creates the event.
+ *
+ * @param hostRoleCommandList
+ * the host role commands handed to the base {@link TaskEvent}
+ * (documented by the author as all hostRoleCommands for all requests)
+ */
+ public TaskCreateEvent(List<HostRoleCommand> hostRoleCommandList) {
+ super(hostRoleCommandList);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
new file mode 100644
index 0000000..ca351d7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * {@link TaskEvent} is the base for all events related to create/update of tasks
+ * that might result in update of stage/request status.
+ */
+public class TaskEvent {
+ /**
+ * The {@link HostRoleCommand}s this event was created with; assigned once in the constructor.
+ */
+ private final List<HostRoleCommand> hostRoleCommands;
+
+ /**
+ * Constructor.
+ *
+ * @param hostRoleCommands
+ * list of {@link HostRoleCommand}s carried by this event; the list is
+ * stored as passed in, it is not copied
+ */
+ public TaskEvent(List<HostRoleCommand> hostRoleCommands) {
+ this.hostRoleCommands = hostRoleCommands;
+ }
+
+ /**
+ * Gets the host role commands this event was created with.
+ *
+ * @return the same list instance that was passed to the constructor
+ */
+ public List<HostRoleCommand> getHostRoleCommands() {
+ return hostRoleCommands;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ // A distinct local name avoids shadowing the field of the same name.
+ String joinedCommands = StringUtils.join(hostRoleCommands, ", ");
+ return "TaskEvent{hostRoleCommands=" + joinedCommands + "}";
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
new file mode 100644
index 0000000..84f67f5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+
+/**
+ * The {@link TaskUpdateEvent} is to be fired every time
+ * host role commands are merged to the database.
+ */
+public class TaskUpdateEvent extends TaskEvent {
+
+ /**
+ * Creates the event.
+ *
+ * @param hostRoleCommandList
+ * the host role commands handed to the base {@link TaskEvent}
+ */
+ public TaskUpdateEvent(List<HostRoleCommand> hostRoleCommandList) {
+ super(hostRoleCommandList);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
new file mode 100644
index 0000000..bc146ef
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -0,0 +1,609 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.listeners.tasks;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * The {@link TaskStatusListener} is used to constantly update the status of running Stages and Requests.
+ * It listens for {@link TaskCreateEvent} and {@link TaskUpdateEvent}, which are fired when host role commands are created or updated.
+ * This listener maintains maps of all running tasks, stages and requests.
+ */
+@Singleton
+@EagerSingleton
+public class TaskStatusListener {
+ /**
+ * Logger.
+ */
+ private final static Logger LOG = LoggerFactory.getLogger(TaskStatusListener.class);
+
+ // The three maps below are created with the fully qualified JDK class on purpose:
+ // the simple name ConcurrentHashMap is bound by this file's imports to Netty's
+ // internal copy (org.jboss.netty.util.internal), which is not a public API.
+
+ /**
+ * Maps task id to its {@link HostRoleCommand} Object.
+ * Map has entries of all tasks of all active(ongoing) requests
+ * NOTE: Partial loading of tasks for any request may lead to incorrect update of the request status
+ */
+ private final Map<Long,HostRoleCommand> activeTasksMap = new java.util.concurrent.ConcurrentHashMap<>();
+
+ /**
+ * Maps all ongoing request id to its {@link ActiveRequest}
+ */
+ private final Map<Long, ActiveRequest> activeRequestMap = new java.util.concurrent.ConcurrentHashMap<>();
+
+ /**
+ * Maps {@link StageEntityPK} of all ongoing requests to its {@link ActiveStage}
+ * with updated {@link ActiveStage#status} and {@link ActiveStage#displayStatus}.
+ */
+ private final Map<StageEntityPK, ActiveStage> activeStageMap = new java.util.concurrent.ConcurrentHashMap<>();
+
+ /**
+ * DAO used to look up stage entities and to persist stage status changes.
+ */
+ private final StageDAO stageDAO;
+
+ /**
+ * DAO used to look up request entities and to persist request status changes.
+ */
+ private final RequestDAO requestDAO;
+
+
+ /**
+ * Stores the DAOs and registers this listener with the given publisher.
+ *
+ * @param taskEventPublisher publisher this listener registers itself with
+ * @param stageDAO DAO used for stage lookups and stage status updates
+ * @param requestDAO DAO used for request lookups and request status updates
+ */
+ @Inject
+ public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO) {
+ this.requestDAO = requestDAO;
+ this.stageDAO = stageDAO;
+ // Registration stays last so both DAO fields are set before any event can be delivered.
+ taskEventPublisher.register(this);
+ }
+
+ /**
+ * @return the map of tracked tasks keyed by task id; this is the listener's own map, not a copy
+ */
+ public Map<Long, HostRoleCommand> getActiveTasksMap() {
+ return activeTasksMap;
+ }
+
+ /**
+ * @return the map of tracked requests keyed by request id; this is the listener's own map, not a copy
+ */
+ public Map<Long, ActiveRequest> getActiveRequestMap() {
+ return activeRequestMap;
+ }
+
+ /**
+ * @return the map of tracked stages keyed by {@link StageEntityPK}; this is the listener's own map, not a copy
+ */
+ public Map<StageEntityPK, ActiveStage> getActiveStageMap() {
+ return activeStageMap;
+ }
+
+ /**
+ * Handles a {@link TaskUpdateEvent}: refreshes the tracked tasks, then the stages and,
+ * if any stage status changed, the requests those tasks belong to.
+ * Commands whose task id is not present in {@link #activeTasksMap} are logged and left
+ * out of the task refresh and of the stage/request sets built here.
+ * Event containing newly created tasks is expected to contain complete set of all tasks for a request
+ * @param event Consumes {@link TaskUpdateEvent}.
+ */
+ @Subscribe
+ public void onTaskUpdateEvent(TaskUpdateEvent event) {
+ LOG.debug("Received task update event {}", event);
+ List<HostRoleCommand> reportedCommands = event.getHostRoleCommands();
+ List<HostRoleCommand> trackedCommands = new ArrayList<>();
+ Set<StageEntityPK> touchedStages = new HashSet<>();
+ Set<Long> touchedRequestIds = new HashSet<>();
+ for (HostRoleCommand reported : reportedCommands) {
+ Long reportedTaskId = reported.getTaskId();
+ if (activeTasksMap.get(reportedTaskId) == null) {
+ LOG.error(String.format("Received update for a task %d which is not being tracked as running task", reportedTaskId));
+ continue;
+ }
+ trackedCommands.add(reported);
+ StageEntityPK stagePK = new StageEntityPK();
+ stagePK.setRequestId(reported.getRequestId());
+ stagePK.setStageId(reported.getStageId());
+ touchedStages.add(stagePK);
+ touchedRequestIds.add(reported.getRequestId());
+ }
+
+ updateActiveTasksMap(trackedCommands);
+ // Presumption: If there is no update in any of the running stage's status
+ // then none of the running request status needs to be updated
+ if (updateActiveStagesStatus(touchedStages, reportedCommands)) {
+ updateActiveRequestsStatus(touchedRequestIds, touchedStages);
+ }
+
+ }
+
+ /**
+ * Handles a {@link TaskCreateEvent}: every command in the event is stored in
+ * {@link #activeTasksMap}, and its stage and request are added to the tracked
+ * stages and requests.
+ * @param event Consumes {@link TaskCreateEvent}.
+ */
+ @Subscribe
+ public void onTaskCreateEvent(TaskCreateEvent event) {
+ LOG.debug("Received task create event {}", event);
+ for (HostRoleCommand created : event.getHostRoleCommands()) {
+ activeTasksMap.put(created.getTaskId(), created);
+ addStagePK(created);
+ addRequestId(created);
+ }
+ }
+
+
+ /**
+ * Replaces the tracked entry of every reported command with the reported instance.
+ * @param hostRoleCommandWithReceivedStatus list of host role commands reported
+ */
+ private void updateActiveTasksMap(List<HostRoleCommand> hostRoleCommandWithReceivedStatus) {
+ for (HostRoleCommand reported : hostRoleCommandWithReceivedStatus) {
+ activeTasksMap.put(reported.getTaskId(), reported);
+ }
+ }
+
+
+ /**
+ * Starts tracking the stage of a newly created command in {@link #activeStageMap}.
+ * If the stage is already tracked only the task id is added to it; otherwise the stage
+ * entity is loaded through the DAO and a new {@link ActiveStage} is cached.
+ * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+ */
+ private void addStagePK(HostRoleCommand hostRoleCommand) {
+ StageEntityPK stagePK = new StageEntityPK();
+ stagePK.setRequestId(hostRoleCommand.getRequestId());
+ stagePK.setStageId(hostRoleCommand.getStageId());
+
+ if (activeStageMap.containsKey(stagePK)) {
+ // Stage already tracked: just attach the task to it.
+ activeStageMap.get(stagePK).addTaskId(hostRoleCommand.getTaskId());
+ return;
+ }
+
+ StageEntity stageEntity = stageDAO.findByPK(stagePK);
+ // Stage entity of the hostrolecommand should be persisted before publishing task create event
+ assert stageEntity != null;
+
+ // Copy the persisted per-role success criteria into a Role -> factor map.
+ Map<Role, Float> successFactors = new HashMap<>();
+ for (RoleSuccessCriteriaEntity criteria : stageEntity.getRoleSuccessCriterias()) {
+ successFactors.put(criteria.getRole(), criteria.getSuccessFactor().floatValue());
+ }
+ Set<Long> taskIdSet = Sets.newHashSet(hostRoleCommand.getTaskId());
+
+ activeStageMap.put(stagePK, new ActiveStage(stageEntity.getStatus(), stageEntity.getDisplayStatus(),
+ successFactors, stageEntity.isSkippable(), taskIdSet));
+ }
+
+ /**
+ * Recomputes the status of every reported stage that is tracked and persists the
+ * stages whose status changed. Stages that are not tracked are only logged.
+ * @param stagesWithReceivedTaskStatus set of stages that has received task status
+ * @param hostRoleCommandListAll list of all task updates received from agent
+ * @return <code>true</code> if any of the stage has changed it's existing status;
+ * <code>false</code> otherwise
+ */
+ private Boolean updateActiveStagesStatus(final Set<StageEntityPK> stagesWithReceivedTaskStatus, List<HostRoleCommand> hostRoleCommandListAll) {
+ Boolean anyStageChanged = Boolean.FALSE;
+ for (StageEntityPK stagePK : stagesWithReceivedTaskStatus) {
+ if (!activeStageMap.containsKey(stagePK)) {
+ LOG.error(String.format("Received update for a task whose stage is not being tracked as running stage: %s", stagePK.toString()));
+ continue;
+ }
+ if (updateStageStatus(stagePK, hostRoleCommandListAll)) {
+ ActiveStage changedStage = activeStageMap.get(stagePK);
+ stageDAO.updateStatus(stagePK, changedStage.getStatus(), changedStage.getDisplayStatus());
+ anyStageChanged = Boolean.TRUE;
+ }
+ }
+ return anyStageChanged;
+ }
+
+ /**
+ * Starts tracking the request of a newly created command in {@link #activeRequestMap}.
+ * If the request is already tracked only the stage key is added to it; otherwise the
+ * request entity is loaded through the DAO and a new {@link ActiveRequest} is cached.
+ * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+ */
+ private void addRequestId(HostRoleCommand hostRoleCommand) {
+ Long requestId = hostRoleCommand.getRequestId();
+ StageEntityPK stagePK = new StageEntityPK();
+ stagePK.setRequestId(hostRoleCommand.getRequestId());
+ stagePK.setStageId(hostRoleCommand.getStageId());
+
+ if (activeRequestMap.containsKey(requestId)) {
+ // Request already tracked: just attach the stage to it.
+ activeRequestMap.get(requestId).addStageEntityPK(stagePK);
+ return;
+ }
+
+ RequestEntity requestEntity = requestDAO.findByPK(requestId);
+ // Request entity of the hostrolecommand should be persisted before publishing task create event
+ assert requestEntity != null;
+ Set<StageEntityPK> stageEntityPKs = Sets.newHashSet(stagePK);
+ activeRequestMap.put(requestId,
+ new ActiveRequest(requestEntity.getStatus(), requestEntity.getDisplayStatus(), stageEntityPKs));
+ }
+
+
+ /**
+ * Recomputes and persists the status of every reported request that is tracked, and
+ * stops tracking requests that are completed. Requests that are not tracked are only logged.
+ * @param requestIdsWithReceivedTaskStatus set of request ids that has received tasks status
+ * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+ */
+ private void updateActiveRequestsStatus(final Set<Long> requestIdsWithReceivedTaskStatus, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+ for (Long requestId : requestIdsWithReceivedTaskStatus) {
+ if (!activeRequestMap.containsKey(requestId)) {
+ LOG.error(String.format("Received update for a task whose request %d is not being tracked as running request", requestId));
+ continue;
+ }
+ ActiveRequest trackedRequest = activeRequestMap.get(requestId);
+ if (updateRequestStatus(requestId, stagesWithChangedTaskStatus)) {
+ requestDAO.updateStatus(requestId, trackedRequest.getStatus(), trackedRequest.getDisplayStatus());
+ }
+ // A request is considered finished once its own status and the status of all of its
+ // tasks are completed; from then on the request, its stages and its tasks
+ // should no longer be tracked as active(running).
+ if (trackedRequest.isCompleted() && isAllTasksCompleted(requestId)) {
+ removeRequestStageAndTasks(requestId);
+ }
+ }
+ }
+
+ /**
+ * Checks whether every tracked task of a request is in a completed state.
+ * @param requestId request Id
+ * @return <code>false</code> if any of the task belonging to requestId has incomplete status
+ * <code>true</code> otherwise
+ */
+ private Boolean isAllTasksCompleted(Long requestId) {
+ for (HostRoleCommand task : activeTasksMap.values()) {
+ // equals() compares by value whether getRequestId() yields a primitive long or a
+ // boxed Long; == would compare references in the boxed case.
+ if (requestId.equals(task.getRequestId()) && !task.getStatus().isCompletedState()) {
+ // One incomplete task settles the answer; no need to scan the rest.
+ return Boolean.FALSE;
+ }
+ }
+ return Boolean.TRUE;
+ }
+
+ /**
+ * Stops tracking a request: its tasks are removed from {@link #activeTasksMap}, its stages
+ * from {@link #activeStageMap} and finally the request itself from {@link #activeRequestMap}.
+ * @param requestId request id whose entry and it's stage and task entries is to be removed
+ */
+ private void removeRequestStageAndTasks(Long requestId) {
+ removeTasks(requestId);
+ removeStages(requestId);
+ activeRequestMap.remove(requestId);
+ }
+
+
+ /**
+ * Collects the keys of {@link #activeStageMap} that belong to the given request.
+ * @param requestID requestId
+ * @return list of StageEntityPK whose request id equals requestID
+ */
+ private List<StageEntityPK> getAllStageEntityPKForRequest(final Long requestID) {
+ return FluentIterable.from(activeStageMap.keySet())
+ .filter(new Predicate<StageEntityPK>() {
+ @Override
+ public boolean apply(StageEntityPK candidate) {
+ return candidate.getRequestId().equals(requestID);
+ }
+ })
+ .toList();
+ }
+
+
+
+ /**
+ * Recomputes the status and display status of a tracked stage from its host role commands
+ * and stores the result in the cached {@link ActiveStage}.
+ * A summary is first attempted from the received commands alone; whenever that partial
+ * summary is {@link HostRoleStatus#PENDING} (inconclusive), the value is recomputed from
+ * all tracked tasks of the stage. Nothing is recomputed once both the status and the
+ * display status of the stage are completed.
+ * @param stagePK {@link StageEntityPK} primary key for the stage entity
+ * @param hostRoleCommandListAll list of all hrc received whose status has been received from agent
+ * @return {@link Boolean} <code>TRUE</code> if status of the given stage changed.
+ */
+ private Boolean updateStageStatus(final StageEntityPK stagePK, List<HostRoleCommand> hostRoleCommandListAll) {
+ Boolean didAnyStatusChanged = Boolean.FALSE;
+ ActiveStage reportedStage = activeStageMap.get(stagePK);
+ HostRoleStatus stageCurrentStatus = reportedStage.getStatus();
+ HostRoleStatus stageCurrentDisplayStatus = reportedStage.getDisplayStatus();
+
+
+ // if stage is already marked to be completed then do not calculate reported status from host role commands
+ // Presumption: There will be no status transition of the host role command from one completed state to another
+ if (!stageCurrentDisplayStatus.isCompletedState() || !stageCurrentStatus.isCompletedState()) {
+ Map<HostRoleStatus, Integer> receivedTaskStatusCount = CalculatedStatus.calculateStatusCountsForTasks(hostRoleCommandListAll, stagePK);
+ HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, reportedStage.getSkippable());
+ HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, Boolean.FALSE);
+ if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+ // At least one value is inconclusive: fall back to all tracked tasks of the stage.
+ Function<Long,HostRoleCommand> transform = new Function<Long,HostRoleCommand>(){
+ @Override
+ public HostRoleCommand apply(Long taskId) {
+ return activeTasksMap.get(taskId);
+ }
+ };
+
+ List<HostRoleCommand> activeHostRoleCommandsOfStage = FluentIterable.from(reportedStage.getTaskIds())
+ .transform(transform).toList();
+ Map<HostRoleStatus, Integer> statusCount = CalculatedStatus.calculateStatusCountsForTasks(activeHostRoleCommandsOfStage);
+ if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+ // calculate and get new display status of the stage as per the new status of received host role commands
+ HostRoleStatus display_status = CalculatedStatus.calculateSummaryDisplayStatus(statusCount, activeHostRoleCommandsOfStage.size(), reportedStage.getSkippable());
+ if (display_status != stageCurrentDisplayStatus) {
+ reportedStage.setDisplayStatus(display_status);
+ didAnyStatusChanged = Boolean.TRUE;
+ }
+
+ } else {
+ reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+ didAnyStatusChanged = Boolean.TRUE;
+ }
+
+ if (statusFromPartialSet == HostRoleStatus.PENDING) {
+ // calculate status of the stage as per the new status of received host role commands
+ HostRoleStatus status = CalculatedStatus.calculateStageStatus(activeHostRoleCommandsOfStage, statusCount, reportedStage.getSuccessFactors(), reportedStage.getSkippable());
+ if (status != stageCurrentStatus) {
+ reportedStage.setStatus(status);
+ didAnyStatusChanged = Boolean.TRUE;
+ }
+ } else {
+ // The partial status is conclusive, so it becomes the stage status. The display
+ // status must not be touched here: its partial value is PENDING in this branch
+ // and would overwrite the display status computed just above.
+ reportedStage.setStatus(statusFromPartialSet);
+ didAnyStatusChanged = Boolean.TRUE;
+ }
+ } else {
+ // Both partial summaries are conclusive: take them as they are.
+ reportedStage.setStatus(statusFromPartialSet);
+ reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+ didAnyStatusChanged = Boolean.TRUE;
+ }
+ }
+
+ return didAnyStatusChanged;
+ }
+
+ /**
+ * Recomputes the status and display status of a tracked request from its stages and
+ * stores the result in the cached {@link ActiveRequest}.
+ * A summary is first attempted from the changed stages of this request alone; whenever
+ * that partial summary is {@link HostRoleStatus#PENDING} (inconclusive), the value is
+ * recomputed from all tracked stages of the request. Nothing is recomputed once both the
+ * status and the display status of the request are completed.
+ * @param requestId {@link Request} whose status is to be updated
+ * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+ * @return {Boolean} <code>TRUE</code> if request status has changed from existing
+ */
+ private Boolean updateRequestStatus (final Long requestId, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+ Boolean didStatusChanged = Boolean.FALSE;
+ ActiveRequest request = activeRequestMap.get(requestId);
+ HostRoleStatus requestCurrentStatus = request.getStatus();
+ HostRoleStatus requestCurrentDisplayStatus = request.getDisplayStatus();
+
+ if (!requestCurrentDisplayStatus.isCompletedState() || !requestCurrentStatus.isCompletedState()) {
+ // Only the changed stages that belong to this request take part in the partial summary.
+ List <ActiveStage> activeStagesWithChangesTaskStatus = new ArrayList<>();
+ for (StageEntityPK stageEntityPK:stagesWithChangedTaskStatus) {
+ if (requestId.equals(stageEntityPK.getRequestId())) {
+ ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+ activeStagesWithChangesTaskStatus.add(activeStage);
+ }
+ }
+
+
+ Map<CalculatedStatus.StatusType,Map<HostRoleStatus, Integer>> stageStatusCountFromPartialSet = CalculatedStatus.calculateStatusCountsForStage(activeStagesWithChangesTaskStatus);
+ HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.STATUS), Boolean.FALSE);
+ HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.DISPLAY_STATUS), Boolean.FALSE);
+
+ if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+ // At least one value is inconclusive: fall back to all tracked stages of the request.
+ List <ActiveStage> allActiveStages = new ArrayList<>();
+ for (StageEntityPK stageEntityPK:request.getStageEntityPks()) {
+ ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+ allActiveStages.add(activeStage);
+ }
+ Map<CalculatedStatus.StatusType,Map<HostRoleStatus, Integer>> stageStatusCount = CalculatedStatus.calculateStatusCountsForStage(allActiveStages);
+
+ if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+ // calculate and get new display status of the request as per the current status of all its stages
+
+ HostRoleStatus display_status = CalculatedStatus.calculateSummaryDisplayStatus(stageStatusCount.get(CalculatedStatus.StatusType.DISPLAY_STATUS), allActiveStages.size(), false);
+ if (display_status != requestCurrentDisplayStatus) {
+ request.setDisplayStatus(display_status);
+ didStatusChanged = Boolean.TRUE;
+ }
+
+ } else {
+ request.setDisplayStatus(displayStatusFromPartialSet);
+ didStatusChanged = Boolean.TRUE;
+ }
+
+ if (statusFromPartialSet == HostRoleStatus.PENDING) {
+ // calculate status of the request as per the current status of all its stages
+ HostRoleStatus status = CalculatedStatus.calculateSummaryStatus(stageStatusCount.get(CalculatedStatus.StatusType.STATUS), allActiveStages.size(), false);
+ if (status != requestCurrentStatus) {
+ request.setStatus(status);
+ didStatusChanged = Boolean.TRUE;
+ }
+ } else {
+ // The partial status is conclusive, so it becomes the request status. The display
+ // status must not be touched here: its partial value is PENDING in this branch
+ // and would overwrite the display status computed just above.
+ request.setStatus(statusFromPartialSet);
+ didStatusChanged = Boolean.TRUE;
+ }
+ } else {
+ // Both partial summaries are conclusive: take them as they are.
+ request.setStatus(statusFromPartialSet);
+ request.setDisplayStatus(displayStatusFromPartialSet);
+ didStatusChanged = Boolean.TRUE;
+ }
+ }
+
+ return didStatusChanged;
+ }
+
+
+ /**
+ * Removes every {@link HostRoleCommand} of the given request from {@link #activeTasksMap}.
+ * A task that is not in a completed state is still removed, but an error is logged for it.
+ * @param requestId request id
+ */
+ private void removeTasks(Long requestId) {
+ Iterator<Map.Entry<Long, HostRoleCommand>> iter = activeTasksMap.entrySet().iterator();
+ while (iter.hasNext()) {
+ HostRoleCommand hrc = iter.next().getValue();
+ // equals() compares by value whether getRequestId() yields a primitive long or a
+ // boxed Long; == would compare references in the boxed case.
+ if (requestId.equals(hrc.getRequestId())) {
+ if (!hrc.getStatus().isCompletedState()) {
+ LOG.error(String.format("Task %d should have been completed before being removed from running task cache(activeTasksMap)", hrc.getTaskId()));
+ }
+ // Remove through the iterator so the map is not modified behind its back.
+ iter.remove();
+ }
+ }
+ }
+
+
+ /**
+ * Removes every stage of the given request from {@link #activeStageMap}.
+ * The keys to remove are collected first and removed afterwards.
+ * @param requestId request Id
+ */
+ private void removeStages(Long requestId) {
+ for (StageEntityPK stagePK : getAllStageEntityPKForRequest(requestId)) {
+ activeStageMap.remove(stagePK);
+ }
+ }
+
+
+ /**
+ * Stops tracking a request by removing its id from {@link #activeRequestMap}.
+ * Nothing happens when the id is not in the map.
+ * @param requestId request Id
+ */
+ private void removeRequest(Long requestId) {
+ activeRequestMap.remove(requestId);
+ }
+
+
+ /**
+ * Cached status, display status and stage keys of a request.
+ * Instances are kept for all running {@link Request} in {@link #activeRequestMap}.
+ */
+ protected class ActiveRequest {
+ private HostRoleStatus status;
+ private HostRoleStatus displayStatus;
+ // Keys of the stages of this request; the set passed to the constructor is kept as-is.
+ private Set<StageEntityPK> stageEntityPks;
+
+ /**
+ * @param status initial status of the request
+ * @param displayStatus initial display status of the request
+ * @param stageEntityPks keys of the stages of the request known so far
+ */
+ public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
+ this.status = status;
+ this.displayStatus = displayStatus;
+ this.stageEntityPks = stageEntityPks;
+ }
+
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(HostRoleStatus status) {
+ this.status = status;
+ }
+
+ public HostRoleStatus getDisplayStatus() {
+ return displayStatus;
+ }
+
+ public void setDisplayStatus(HostRoleStatus displayStatus) {
+ this.displayStatus = displayStatus;
+ }
+
+ /**
+ * @return true only when both the status and the display status are in a completed state
+ */
+ public Boolean isCompleted() {
+ return status.isCompletedState() && displayStatus.isCompletedState();
+ }
+
+ public Set<StageEntityPK> getStageEntityPks() {
+ return stageEntityPks;
+ }
+
+ /**
+ * Adds one more stage key to this request.
+ */
+ public void addStageEntityPK(StageEntityPK stageEntityPK) {
+ stageEntityPks.add(stageEntityPK);
+ }
+
+ }
+
+ /**
+ * Cached information needed to determine the status and display status of a stage.
+ * Instances are kept for all {@link Stage} of all running {@link Request} in {@link #activeStageMap}.
+ */
+ public class ActiveStage {
+ private HostRoleStatus status;
+ private HostRoleStatus displayStatus;
+ private Boolean skippable;
+ // Ids of the tasks of this stage; the set passed to the constructor is kept as-is.
+ private Set<Long> taskIds;
+
+ // Map of roles to successFactors for this stage; always supplied by the constructor
+ // or the setter (original note: default is 1, i.e. 100%).
+ private Map<Role, Float> successFactors;
+
+ /**
+ * @param status initial status of the stage
+ * @param displayStatus initial display status of the stage
+ * @param successFactors map of roles to success factors of the stage
+ * @param skippable whether the stage is skippable
+ * @param taskIds ids of the tasks of the stage known so far
+ */
+ public ActiveStage(HostRoleStatus status, HostRoleStatus displayStatus,
+ Map<Role, Float> successFactors, Boolean skippable, Set<Long> taskIds) {
+ this.status = status;
+ this.displayStatus = displayStatus;
+ this.successFactors = successFactors;
+ this.skippable = skippable;
+ this.taskIds = taskIds;
+ }
+
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(HostRoleStatus status) {
+ this.status = status;
+ }
+
+ public HostRoleStatus getDisplayStatus() {
+ return displayStatus;
+ }
+
+ public void setDisplayStatus(HostRoleStatus displayStatus) {
+ this.displayStatus = displayStatus;
+ }
+
+ public Boolean getSkippable() {
+ return skippable;
+ }
+
+ public void setSkippable(Boolean skippable) {
+ this.skippable = skippable;
+ }
+
+ public Map<Role, Float> getSuccessFactors() {
+ return successFactors;
+ }
+
+ public void setSuccessFactors(Map<Role, Float> successFactors) {
+ this.successFactors = successFactors;
+ }
+
+ public Set<Long> getTaskIds() {
+ return taskIds;
+ }
+
+ /**
+ * Adds one more task id to this stage.
+ */
+ public void addTaskId(Long taskId) {
+ taskIds.add(taskId);
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
new file mode 100644
index 0000000..fdc41e5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import org.apache.ambari.server.events.TaskEvent;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link TaskEventPublisher} is used to publish instances of
+ * {@link TaskEvent} to any registered listener that has {@link Subscribe} methods.
+ * It wraps a synchronous {@link EventBus}: listeners run on the publishing thread.
+ */
+@Singleton
+public class TaskEventPublisher {
+
+ /**
+ * A synchronous event bus for task events; it dispatches on the calling thread.
+ */
+ private final EventBus m_eventBus = new EventBus("ambari-task-report-event-bus");
+
+
+ /**
+ * Publishes the specified event to all registered listeners that
+ * {@link Subscribe} to {@link TaskEvent} instances.
+ *
+ * @param event {@link TaskEvent}
+ */
+ public void publish(TaskEvent event) {
+ m_eventBus.post(event);
+ }
+
+ /**
+ * Register a listener to receive events. The listener should use the
+ * {@link Subscribe} annotation.
+ *
+ * @param object
+ * the listener to receive events.
+ */
+ public void register(Object object) {
+ m_eventBus.register(object);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 02c4091..e834045 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -40,6 +40,8 @@ import org.apache.ambari.annotations.TransactionalLock;
import org.apache.ambari.annotations.TransactionalLock.LockArea;
import org.apache.ambari.annotations.TransactionalLock.LockType;
import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.api.query.JpaPredicateVisitor;
import org.apache.ambari.server.api.query.JpaSortBuilder;
@@ -49,6 +51,9 @@ import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.SortRequest;
import org.apache.ambari.server.controller.utilities.PredicateHelper;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.TransactionalLocks;
import org.apache.ambari.server.orm.entities.HostEntity;
@@ -58,9 +63,11 @@ import org.apache.ambari.server.orm.entities.StageEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -144,6 +151,13 @@ public class HostRoleCommandDAO {
@Inject
private Configuration configuration;
+
+ @Inject
+ HostRoleCommandFactory hostRoleCommandFactory;
+
+ @Inject
+ private TaskEventPublisher taskEventPublisher;
+
/**
* Used to ensure that methods which rely on the completion of
* {@link Transactional} can detect when they are able to run.
@@ -629,11 +643,17 @@ public class HostRoleCommandDAO {
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public HostRoleCommandEntity merge(HostRoleCommandEntity entity) {
+ // Merge first (this also invalidates the status summary cache) so that the
+ // event below is built from the entity instance returned by the merge.
+ entity = mergeWithoutPublishEvent(entity);
+ // NOTE(review): the update event is published from inside this @Transactional
+ // method, i.e. before it returns -- confirm listeners do not rely on the
+ // change already being committed.
+ publishTaskUpdateEvent(Collections.singletonList(hostRoleCommandFactory.createExisting(entity)));
+ return entity;
+ }
+
+ /**
+ * Merges the given entity through the entity manager and invalidates the
+ * host role command status summary cache for it, without publishing a
+ * {@link TaskUpdateEvent}. {@link #merge(HostRoleCommandEntity)} delegates
+ * here and publishes the event itself.
+ *
+ * @param entity the entity to merge
+ * @return the entity instance returned by {@code EntityManager.merge}
+ */
+ @Transactional
+ @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
+ public HostRoleCommandEntity mergeWithoutPublishEvent(HostRoleCommandEntity entity) {
EntityManager entityManager = entityManagerProvider.get();
entity = entityManager.merge(entity);
-
invalidateHostRoleCommandStatusSummaryCache(entity);
-
return entity;
}
@@ -667,10 +687,51 @@ public class HostRoleCommandDAO {
}
invalidateHostRoleCommandStatusSummaryCache(requestsToInvalidate);
-
+ publishTaskUpdateEvent(getHostRoleCommands(entities));
return managedList;
}
+ /**
+ * Converts each entity into a {@link HostRoleCommand} by calling
+ * {@code hostRoleCommandFactory.createExisting(entity)}.
+ *
+ * @param entities the entities to convert; must not be {@code null}
+ * @return an immutable list holding one command per entity, in iteration order
+ */
+ public List<HostRoleCommand> getHostRoleCommands(Collection<HostRoleCommandEntity> entities) {
+ Function<HostRoleCommandEntity, HostRoleCommand> transform = new Function<HostRoleCommandEntity, HostRoleCommand> () {
+ @Override
+ public HostRoleCommand apply(HostRoleCommandEntity entity) {
+ return hostRoleCommandFactory.createExisting(entity);
+ }
+ };
+ return FluentIterable.from(entities)
+ .transform(transform)
+ .toList();
+
+ }
+
+ /**
+ * Publishes a single {@link TaskUpdateEvent} wrapping the given commands
+ * through the task event publisher. Does nothing when the list is empty.
+ *
+ * @param hostRoleCommands the commands to report; must not be {@code null}
+ */
+ public void publishTaskUpdateEvent(List<HostRoleCommand> hostRoleCommands) {
+ if (!hostRoleCommands.isEmpty()) {
+ TaskUpdateEvent taskUpdateEvent = new TaskUpdateEvent(hostRoleCommands);
+ taskEventPublisher.publish(taskUpdateEvent);
+ }
+ }
+
+ /**
+ * Publishes a single {@link TaskCreateEvent} wrapping the given commands
+ * through the task event publisher. Does nothing when the list is empty.
+ *
+ * @param hostRoleCommands the commands to report; must not be {@code null}
+ */
+ public void publishTaskCreateEvent(List<HostRoleCommand> hostRoleCommands) {
+ if (!hostRoleCommands.isEmpty()) {
+ TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
+ taskEventPublisher.publish(taskCreateEvent);
+ }
+ }
+
+
+
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public void remove(HostRoleCommandEntity entity) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 1c4d0a3..2696f66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -144,6 +144,14 @@ public class RequestDAO {
}
+ /**
+ * Loads the request with the given id, sets its status and display status,
+ * and merges it.
+ * NOTE(review): the result of {@code findByPK} is dereferenced without a null
+ * check -- confirm callers only pass ids of existing requests.
+ *
+ * @param requestId id of the request to update
+ * @param status the status to set
+ * @param displayStatus the display status to set
+ */
@Transactional
+ public void updateStatus(long requestId, HostRoleStatus status, HostRoleStatus displayStatus) {
+ RequestEntity requestEntity = findByPK(requestId);
+ requestEntity.setStatus(status);
+ requestEntity.setDisplayStatus(displayStatus);
+ merge(requestEntity);
+ }
+
+ @Transactional
public void create(RequestEntity requestEntity) {
entityManagerProvider.get().persist(requestEntity);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index d2f899f..126468a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.orm.dao;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -173,11 +174,15 @@ public class StageDAO {
return daoUtils.selectList(query);
}
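+ /**
+ * Runs the {@code StageEntity.findByStatuses} named query for the given statuses.
+ * @param statuses the {@link HostRoleStatus} values passed to the query
+ * @return list of stage entities
+ */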
@RequiresSession
- public List<StageEntity> findByCommandStatuses(
- Collection<HostRoleStatus> statuses) {
+ public List<StageEntity> findByStatuses(Collection<HostRoleStatus> statuses) {
TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
- "StageEntity.findByCommandStatuses", StageEntity.class);
+ "StageEntity.findByStatuses", StageEntity.class);
query.setParameter("statuses", statuses);
return daoUtils.selectList(query);
@@ -280,8 +285,8 @@ public class StageDAO {
* the stage entity to update
* @param desiredStatus
* the desired stage status
- * @param controller
- * the ambari management controller
+ * @param actionManager
+ * the action manager
*
* @throws java.lang.IllegalArgumentException
* if the transition to the desired status is not a legal transition
@@ -301,9 +306,11 @@ public class StageDAO {
if (desiredStatus == HostRoleStatus.ABORTED) {
actionManager.cancelRequest(stage.getRequestId(), "User aborted.");
} else {
+ List <HostRoleCommandEntity> hrcWithChangedStatus = new ArrayList<HostRoleCommandEntity>();
for (HostRoleCommandEntity hostRoleCommand : tasks) {
HostRoleStatus hostRoleStatus = hostRoleCommand.getStatus();
if (hostRoleStatus.equals(currentStatus)) {
+ hrcWithChangedStatus.add(hostRoleCommand);
hostRoleCommand.setStatus(desiredStatus);
if (desiredStatus == HostRoleStatus.PENDING) {
@@ -316,6 +323,21 @@ public class StageDAO {
}
/**
+ * Loads the stage with the given key, sets its status and display status,
+ * and merges it.
+ * NOTE(review): the result of {@code findByPK} is dereferenced without a null
+ * check -- confirm callers only pass keys of existing stages.
+ *
+ * @param stageEntityPK {@link StageEntityPK}
+ * @param status {@link HostRoleStatus}
+ * @param displayStatus {@link HostRoleStatus}
+ */
+ @Transactional
+ public void updateStatus(StageEntityPK stageEntityPK, HostRoleStatus status, HostRoleStatus displayStatus) {
+ StageEntity stageEntity = findByPK(stageEntityPK);
+ stageEntity.setStatus(status);
+ stageEntity.setDisplayStatus(displayStatus);
+ merge(stageEntity);
+ }
+
+
+ /**
* Determine whether or not it is valid to transition from this stage status
* to the given status.
*
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 74271b9..a809295 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -105,9 +105,9 @@ public class HostRoleCommandEntity {
@Basic
private Integer exitcode = 0;
- @Column(name = "status")
+ @Column(name = "status", nullable = false)
@Enumerated(EnumType.STRING)
- private HostRoleStatus status;
+ private HostRoleStatus status = HostRoleStatus.PENDING;
@Column(name = "std_error")
@Lob