You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2017/04/18 18:26:59 UTC

ambari git commit: AMBARI-18576. When multiple requests are running, aborting any will incorrectly abort all requests instead the desired one (alejandro)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 f4d7a3fdb -> 36226da09


AMBARI-18576. When multiple requests are running, aborting any will incorrectly abort all requests instead the desired one (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/36226da0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/36226da0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/36226da0

Branch: refs/heads/branch-2.4
Commit: 36226da099b1e6311595fcc732d0d06681ce105b
Parents: f4d7a3f
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Tue Apr 18 11:30:16 2017 -0700
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Tue Apr 18 11:30:28 2017 -0700

----------------------------------------------------------------------
 .../ambari/server/actionmanager/ActionDBAccessor.java | 13 +++++++++++++
 .../server/actionmanager/ActionDBAccessorImpl.java    | 14 ++++++++++++++
 .../ambari/server/actionmanager/ActionScheduler.java  |  4 ++--
 .../org/apache/ambari/server/orm/dao/StageDAO.java    | 10 ++++++++++
 .../ambari/server/orm/entities/StageEntity.java       |  5 ++++-
 .../server/actionmanager/TestActionScheduler.java     | 12 ++++++------
 6 files changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 0e78cbc..8aef70d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -86,6 +86,19 @@ public interface ActionDBAccessor {
   public List<Stage> getStagesInProgress();
 
   /**
+   * Returns all the pending stages in a request, including queued and not-queued. A stage is
+   * considered in progress if it is in progress for any host.
+   * <p/>
+   * The results will be sorted by stage ID making this call
+   * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in
+   * order to determine if there are stages that are in progress before getting
+   * the stages from this method.
+   *
+   * @see HostRoleStatus#IN_PROGRESS_STATUSES
+   */
+  public List<Stage> getStagesInProgressForRequest(Long requestId);
+
+  /**
    * Gets the number of commands in progress.
    *
    * @return the number of commands in progress.

http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index c31ca7e..2c87583 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -270,10 +270,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
+  public List<Stage> getStagesInProgressForRequest(Long requestId) {
+    List<StageEntity> stageEntities = stageDAO.findByRequestIdAndCommandStatuses(requestId, HostRoleStatus.IN_PROGRESS_STATUSES);
+    return getStagesForEntities(stageEntities);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
   public List<Stage> getStagesInProgress() {
     List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(
       HostRoleStatus.IN_PROGRESS_STATUSES);
+    return getStagesForEntities(stageEntities);
+  }
 
+  @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
+  private List<Stage> getStagesForEntities(List<StageEntity> stageEntities) {
     // experimentally enable parallel stage processing
     @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
     boolean useConcurrentStageProcessing = configuration.isExperimentalConcurrentStageProcessingEnabled();

http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e380ae4..8cbfb1e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -1139,10 +1139,10 @@ class ActionScheduler implements Runnable {
           cancelHostRoleCommands(tasksToDequeue, reason);
         }
 
-        // abort any stages in progress; don't execute this for all stages since
+        // abort any stages in progress that belong to this request; don't execute this for all stages since
         // that could lead to OOM errors on large requests, like those for
         // upgrades
-        List<Stage> stagesInProgress = db.getStagesInProgress();
+        List<Stage> stagesInProgress = db.getStagesInProgressForRequest(requestId);
         for (Stage stageInProgress : stagesInProgress) {
           abortOperationsForStage(stageInProgress);
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 8ef4a1b..d2f899f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -164,6 +164,16 @@ public class StageDAO {
   }
 
   @RequiresSession
+  public List<StageEntity> findByRequestIdAndCommandStatuses(Long requestId, Collection<HostRoleStatus> statuses) {
+    TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
+        "StageEntity.findByRequestIdAndCommandStatuses", StageEntity.class);
+
+    query.setParameter("requestId", requestId);
+    query.setParameter("statuses", statuses);
+    return daoUtils.selectList(query);
+  }
+
+  @RequiresSession
   public List<StageEntity> findByCommandStatuses(
       Collection<HostRoleStatus> statuses) {
     TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(

http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index 7659a23..eaea913 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -42,7 +42,10 @@ import javax.persistence.Table;
 @NamedQueries({
     @NamedQuery(
         name = "StageEntity.findByCommandStatuses",
-        query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId"),
+        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.requestId, stage.stageId"),
+    @NamedQuery(
+        name = "StageEntity.findByRequestIdAndCommandStatuses",
+        query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.stageId"),
     @NamedQuery(
         name = "StageEntity.findIdsByRequestId",
         query = "SELECT stage.stageId FROM StageEntity stage WHERE stage.requestId = :requestId ORDER BY stage.stageId ASC") })

http://git-wip-us.apache.org/repos/asf/ambari/blob/36226da0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 9f12a94..38a32b4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -2203,10 +2203,9 @@ public class TestActionScheduler {
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
-    long requestId = 1;
-
-    // create 3 stages, each with a single task - the first stage will be completed and should not
+    // Create a single request with 3 stages, each with a single task - the first stage will be completed and should not
     // be included when cancelling the unfinished tasks of the request
+    long requestId = 1;
     final List<Stage> allStages = new ArrayList<Stage>();
     final List<Stage> stagesInProgress = new ArrayList<Stage>();
     final List<HostRoleCommand> tasksInProgress = new ArrayList<>();
@@ -2218,7 +2217,7 @@ public class TestActionScheduler {
 
     Stage stageWithTask = getStageWithSingleTask(
         hostname, "cluster1", Role.SECONDARY_NAMENODE, RoleCommand.START,
-        Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int)requestId);
+        Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int) requestId);
 
     // complete the first stage
     stageWithTask.getOrderedHostRoleCommands().get(0).setStatus(HostRoleStatus.COMPLETED);
@@ -2226,7 +2225,7 @@ public class TestActionScheduler {
 
     stageWithTask = getStageWithSingleTask(
         hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
-        Service.Type.HDFS, namenodeCmdTaskId, 2, (int)requestId);
+        Service.Type.HDFS, namenodeCmdTaskId, 2, (int) requestId);
 
     tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
     stagesInProgress.add(stageWithTask);
@@ -2234,7 +2233,7 @@ public class TestActionScheduler {
 
     stageWithTask = getStageWithSingleTask(
         hostname, "cluster1", Role.DATANODE, RoleCommand.START,
-        Service.Type.HDFS, datanodeCmdTaskId, 3, (int)requestId);
+        Service.Type.HDFS, datanodeCmdTaskId, 3, (int) requestId);
 
     tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
     stagesInProgress.add(stageWithTask);
@@ -2270,6 +2269,7 @@ public class TestActionScheduler {
 
     when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
     when(db.getStagesInProgress()).thenReturn(stagesInProgress);
+    when(db.getStagesInProgressForRequest(requestId)).thenReturn(stagesInProgress);
     when(db.getAllStages(anyLong())).thenReturn(allStages);
 
     List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();