You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2017/04/18 18:26:59 UTC
ambari git commit: AMBARI-18576. When multiple requests are running,
aborting any will incorrectly abort all requests instead the desired
one (alejandro)
Repository: ambari
Updated Branches:
refs/heads/branch-2.4 f4d7a3fdb -> 36226da09
AMBARI-18576. When multiple requests are running, aborting any will incorrectly abort all requests instead the desired one (alejandro)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/36226da0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/36226da0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/36226da0
Branch: refs/heads/branch-2.4
Commit: 36226da099b1e6311595fcc732d0d06681ce105b
Parents: f4d7a3f
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Tue Apr 18 11:30:16 2017 -0700
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Tue Apr 18 11:30:28 2017 -0700
----------------------------------------------------------------------
.../ambari/server/actionmanager/ActionDBAccessor.java | 13 +++++++++++++
.../server/actionmanager/ActionDBAccessorImpl.java | 14 ++++++++++++++
.../ambari/server/actionmanager/ActionScheduler.java | 4 ++--
.../org/apache/ambari/server/orm/dao/StageDAO.java | 10 ++++++++++
.../ambari/server/orm/entities/StageEntity.java | 5 ++++-
.../server/actionmanager/TestActionScheduler.java | 12 ++++++------
6 files changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 0e78cbc..8aef70d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -86,6 +86,19 @@ public interface ActionDBAccessor {
public List<Stage> getStagesInProgress();
/**
+ * Returns all the pending stages in a request, including queued and not-queued. A stage is
+ * considered in progress if it is in progress for any host.
+ * <p/>
+ * The results will be sorted by stage ID making this call
+ * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in
+ * order to determine if there are stages that are in progress before getting
+ * the stages from this method.
+ *
+ * @see HostRoleStatus#IN_PROGRESS_STATUSES
+ */
+ public List<Stage> getStagesInProgressForRequest(Long requestId);
+
+ /**
* Gets the number of commands in progress.
*
* @return the number of commands in progress.
http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index c31ca7e..2c87583 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -270,10 +270,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
*/
@Override
@Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
+ public List<Stage> getStagesInProgressForRequest(Long requestId) {
+ List<StageEntity> stageEntities = stageDAO.findByRequestIdAndCommandStatuses(requestId, HostRoleStatus.IN_PROGRESS_STATUSES);
+ return getStagesForEntities(stageEntities);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
public List<Stage> getStagesInProgress() {
List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
HostRoleStatus.IN_PROGRESS_STATUSES);
+ return getStagesForEntities(stageEntities);
+ }
+ @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
+ private List<Stage> getStagesForEntities(List<StageEntity> stageEntities) {
// experimentally enable parallel stage processing
@Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
boolean useConcurrentStageProcessing = configuration.isExperimentalConcurrentStageProcessingEnabled();
http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e380ae4..8cbfb1e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -1139,10 +1139,10 @@ class ActionScheduler implements Runnable {
cancelHostRoleCommands(tasksToDequeue, reason);
}
- // abort any stages in progress; don't execute this for all stages since
+ // abort any stages in progress that belong to this request; don't execute this for all stages since
// that could lead to OOM errors on large requests, like those for
// upgrades
- List<Stage> stagesInProgress = db.getStagesInProgress();
+ List<Stage> stagesInProgress = db.getStagesInProgressForRequest(requestId);
for (Stage stageInProgress : stagesInProgress) {
abortOperationsForStage(stageInProgress);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 8ef4a1b..d2f899f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -164,6 +164,16 @@ public class StageDAO {
}
@RequiresSession
+ public List<StageEntity> findByRequestIdAndCommandStatuses(Long requestId, Collection<HostRoleStatus> statuses) {
+ TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
+ "StageEntity.findByRequestIdAndCommandStatuses", StageEntity.class);
+
+ query.setParameter("requestId", requestId);
+ query.setParameter("statuses", statuses);
+ return daoUtils.selectList(query);
+ }
+
+ @RequiresSession
public List<StageEntity> findByCommandStatuses(
Collection<HostRoleStatus> statuses) {
TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index 7659a23..eaea913 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -42,7 +42,10 @@ import javax.persistence.Table;
@NamedQueries({
@NamedQuery(
name = "StageEntity.findByCommandStatuses",
- query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId"),
+ query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.requestId, stage.stageId"),
+ @NamedQuery(
+ name = "StageEntity.findByRequestIdAndCommandStatuses",
+ query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.stageId"),
@NamedQuery(
name = "StageEntity.findIdsByRequestId",
query = "SELECT stage.stageId FROM StageEntity stage WHERE stage.requestId = :requestId ORDER BY stage.stageId ASC") })
http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 9f12a94..38a32b4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -2203,10 +2203,9 @@ public class TestActionScheduler {
hosts.put(hostname, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
- long requestId = 1;
-
- // create 3 stages, each with a single task - the first stage will be completed and should not
+ // Create a single request with 3 stages, each with a single task - the first stage will be completed and should not
// be included when cancelling the unfinished tasks of the request
+ long requestId = 1;
final List<Stage> allStages = new ArrayList<Stage>();
final List<Stage> stagesInProgress = new ArrayList<Stage>();
final List<HostRoleCommand> tasksInProgress = new ArrayList<>();
@@ -2218,7 +2217,7 @@ public class TestActionScheduler {
Stage stageWithTask = getStageWithSingleTask(
hostname, "cluster1", Role.SECONDARY_NAMENODE, RoleCommand.START,
- Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int)requestId);
+ Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int) requestId);
// complete the first stage
stageWithTask.getOrderedHostRoleCommands().get(0).setStatus(HostRoleStatus.COMPLETED);
@@ -2226,7 +2225,7 @@ public class TestActionScheduler {
stageWithTask = getStageWithSingleTask(
hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
- Service.Type.HDFS, namenodeCmdTaskId, 2, (int)requestId);
+ Service.Type.HDFS, namenodeCmdTaskId, 2, (int) requestId);
tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
stagesInProgress.add(stageWithTask);
@@ -2234,7 +2233,7 @@ public class TestActionScheduler {
stageWithTask = getStageWithSingleTask(
hostname, "cluster1", Role.DATANODE, RoleCommand.START,
- Service.Type.HDFS, datanodeCmdTaskId, 3, (int)requestId);
+ Service.Type.HDFS, datanodeCmdTaskId, 3, (int) requestId);
tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
stagesInProgress.add(stageWithTask);
@@ -2270,6 +2269,7 @@ public class TestActionScheduler {
when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
when(db.getStagesInProgress()).thenReturn(stagesInProgress);
+ when(db.getStagesInProgressForRequest(requestId)).thenReturn(stagesInProgress);
when(db.getAllStages(anyLong())).thenReturn(allStages);
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();