You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/01 07:07:46 UTC
[02/17] ambari git commit: AMBARI-20646 - Large Long Running Requests
Can Slow Down the ActionScheduler (jonathanhurley)
AMBARI-20646 - Large Long Running Requests Can Slow Down the ActionScheduler (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/aba473e8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/aba473e8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/aba473e8
Branch: refs/heads/branch-3.0-perf
Commit: aba473e84a5a24d12b29a2bf9e858019c023f6fd
Parents: 805af82
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Fri Mar 31 12:35:25 2017 -0400
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Sat Apr 1 10:07:36 2017 +0300
----------------------------------------------------------------------
.../server/actionmanager/ActionDBAccessor.java | 16 ++-
.../actionmanager/ActionDBAccessorImpl.java | 13 +-
.../server/actionmanager/ActionScheduler.java | 2 +-
.../apache/ambari/server/orm/dao/StageDAO.java | 68 ++++-----
.../ambari/server/orm/entities/StageEntity.java | 9 +-
.../serveraction/ServerActionExecutor.java | 114 +++++++--------
.../actionmanager/TestActionDBAccessorImpl.java | 27 ++--
.../actionmanager/TestActionScheduler.java | 139 +++++++++++--------
.../ambari/server/orm/dao/RequestDAOTest.java | 21 ++-
.../serveraction/ServerActionExecutorTest.java | 2 +-
.../services/RetryUpgradeActionServiceTest.java | 12 +-
11 files changed, 227 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 9325d03..b0550c0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -76,17 +76,19 @@ public interface ActionDBAccessor {
boolean skipSupported, boolean hostUnknownState);
/**
- * Returns all the pending stages, including queued and not-queued. A stage is
- * considered in progress if it is in progress for any host.
+ * Returns the next stage which is in-progress for every in-progress request
+ * in the system. Since stages are always synchronous, there is no reason to
+ * return more than the most recent stage per request. Returning every single
+ * stage in the requesrt would be extremely inffecient and wasteful. However,
+ * since requests can run in parallel, this method must return the most recent
+ * stage for every request. The results will be sorted by request ID.
* <p/>
- * The results will be sorted by request ID and then stage ID making this call
- * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in
- * order to determine if there are stages that are in progress before getting
- * the stages from this method.
+ * Use {@link #getCommandsInProgressCount()} in order to determine if there
+ * are stages that are in progress before getting the stages from this method.
*
* @see HostRoleStatus#IN_PROGRESS_STATUSES
*/
- public List<Stage> getStagesInProgress();
+ public List<Stage> getFirstStageInProgressPerRequest();
/**
* Returns all the pending stages in a request, including queued and not-queued. A stage is
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index ab4feaa..8c4eae8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -285,11 +285,16 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
* {@inheritDoc}
*/
@Override
- @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
- public List<Stage> getStagesInProgress() {
- List<StageEntity> stageEntities = stageDAO.findByStatuses(
+ public List<Stage> getFirstStageInProgressPerRequest() {
+ List<StageEntity> stageEntities = stageDAO.findFirstStageByStatus(
HostRoleStatus.IN_PROGRESS_STATUSES);
- return getStagesForEntities(stageEntities);
+
+ List<Stage> stages = new ArrayList<>(stageEntities.size());
+ for (StageEntity stageEntity : stageEntities) {
+ stages.add(stageFactory.createExisting(stageEntity));
+ }
+
+ return stages;
}
@Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 0984c5c..758db35 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -355,7 +355,7 @@ class ActionScheduler implements Runnable {
}
Set<Long> runningRequestIds = new HashSet<>();
- List<Stage> stages = db.getStagesInProgress();
+ List<Stage> stages = db.getFirstStageInProgressPerRequest();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduler wakes up");
LOG.debug("Processing {} in progress stages ", stages.size());
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 5151fb3..c2919b2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -22,10 +22,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
@@ -129,41 +127,6 @@ public class StageDAO {
return daoUtils.selectList(query, requestId);
}
- /**
- * Gets all of the stage IDs associated with a request.
- *
- * @param requestId
- * @return the list of stage IDs.
- */
- @RequiresSession
- public List<Long> findIdsByRequestId(long requestId) {
- TypedQuery<Long> query = entityManagerProvider.get().createNamedQuery(
- "StageEntity.findIdsByRequestId", Long.class);
-
- query.setParameter("requestId", requestId);
- return daoUtils.selectList(query);
- }
-
- /**
- * Get the list of stage entities for the given request id and stage ids.
- *
- * @param requestId the request ids
- * @param stageIds the set of stage ids
- *
- * @return the set of entities for the given ids
- */
- @RequiresSession
- public List<StageEntity> findByStageIds(Long requestId, Set<Long> stageIds) {
- List<StageEntity> stageEntities = new LinkedList<>();
-
- for (StageEntity stage : findByRequestId(requestId)) {
- if (stageIds.contains(stage.getStageId())) {
- stageEntities.add(stage);
- }
- }
- return stageEntities;
- }
-
@RequiresSession
public List<StageEntity> findByRequestIdAndCommandStatuses(Long requestId, Collection<HostRoleStatus> statuses) {
TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
@@ -175,17 +138,36 @@ public class StageDAO {
}
/**
+ * Finds the first stage matching any of the specified statuses for every
+ * request. For example, to find the first {@link HostRoleStatus#IN_PROGRESS}
+ * stage for every request, pass in
+ * {@link HostRoleStatus#IN_PROGRESS_STATUSES}.
*
- * @param statuses {@link HostRoleStatus}
- * @return list of stage entities
+ * @param statuses
+ * {@link HostRoleStatus}
+ * @return the list of the first matching stage for the given statuses for
+ * every request.
*/
@RequiresSession
- public List<StageEntity> findByStatuses(Collection<HostRoleStatus> statuses) {
- TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
- "StageEntity.findByStatuses", StageEntity.class);
+ public List<StageEntity> findFirstStageByStatus(Collection<HostRoleStatus> statuses) {
+ TypedQuery<Object[]> query = entityManagerProvider.get().createNamedQuery(
+ "StageEntity.findFirstStageByStatus", Object[].class);
query.setParameter("statuses", statuses);
- return daoUtils.selectList(query);
+
+ List<Object[]> results = daoUtils.selectList(query);
+ List<StageEntity> stages = new ArrayList<>();
+
+ for (Object[] result : results) {
+ StageEntityPK stagePK = new StageEntityPK();
+ stagePK.setRequestId((Long) result[0]);
+ stagePK.setStageId((Long) result[1]);
+
+ StageEntity stage = findByPK(stagePK);
+ stages.add(stage);
+ }
+
+ return stages;
}
@RequiresSession
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index f68338f..49c1594 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -46,17 +46,14 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus;
@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
@NamedQueries({
@NamedQuery(
- name = "StageEntity.findByStatuses",
- query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses ORDER BY stage.requestId, stage.stageId"),
+ name = "StageEntity.findFirstStageByStatus",
+ query = "SELECT stage.requestId, MIN(stage.stageId) from StageEntity stage, HostRoleCommandEntity hrc WHERE hrc.status IN :statuses AND hrc.stageId = stage.stageId AND hrc.requestId = stage.requestId GROUP by stage.requestId ORDER BY stage.requestId"),
@NamedQuery(
name = "StageEntity.findByPK",
query = "SELECT stage from StageEntity stage WHERE stage.requestId = :requestId AND stage.stageId = :stageId"),
@NamedQuery(
name = "StageEntity.findByRequestIdAndCommandStatuses",
- query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId"),
- @NamedQuery(
- name = "StageEntity.findIdsByRequestId",
- query = "SELECT stage.stageId FROM StageEntity stage WHERE stage.requestId = :requestId ORDER BY stage.stageId ASC") })
+ query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId") })
public class StageEntity {
@Column(name = "cluster_id", updatable = false, nullable = false)
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
index b0be6b3..68124fc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
@@ -19,11 +19,15 @@
package org.apache.ambari.server.serveraction;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
@@ -32,13 +36,11 @@ import org.apache.ambari.server.actionmanager.ActionDBAccessor;
import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
-import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken;
-import org.apache.ambari.server.utils.StageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -74,6 +76,7 @@ public class ServerActionExecutor {
@Inject
private static Configuration configuration;
+
/**
* Maps request IDs to "blackboards" of shared data.
* <p/>
@@ -84,13 +87,6 @@ public class ServerActionExecutor {
new HashMap<>();
/**
- * The hostname of the (Ambari) server.
- * <p/>
- * This hostname is cached so that cycles are spent querying for it more than once.
- */
- private final String serverHostName;
-
- /**
* Database accessor to query and update the database of action commands.
*/
private final ActionDBAccessor db;
@@ -117,6 +113,13 @@ public class ServerActionExecutor {
private Thread executorThread = null;
/**
+ * A timer used to clear out {@link #requestSharedDataMap}. Since this "cache"
+ * isn't timer- or access-based, then we must periodically check it in order
+ * to clear out any stale data.
+ */
+ private final Timer cacheTimer = new Timer("server-action-executor-cache-timer", true);
+
+ /**
* Statically initialize the Injector
* <p/>
* This should only be used for unit tests.
@@ -134,9 +137,12 @@ public class ServerActionExecutor {
* @param sleepTimeMS the time (in milliseconds) to wait between polling the database for more tasks
*/
public ServerActionExecutor(ActionDBAccessor db, long sleepTimeMS) {
- serverHostName = StageUtils.getHostName();
this.db = db;
this.sleepTimeMS = (sleepTimeMS < 1) ? POLLING_TIMEOUT_MS : sleepTimeMS;
+
+ // start in 1 hour, run every hour
+ cacheTimer.schedule(new ServerActionSharedRequestEvictor(), TimeUnit.HOURS.toMillis(1),
+ TimeUnit.HOURS.toMillis(1));
}
/**
@@ -242,48 +248,6 @@ public class ServerActionExecutor {
}
/**
- * Cleans up orphaned shared data Maps due to completed or failed request
- * contexts. We are unable to use {@link Request#getStatus()} since this field
- * is not populated in the database but, instead, calculated in realtime.
- */
- private void cleanRequestShareDataContexts() {
- // if the cache is empty, do nothing
- if (requestSharedDataMap.isEmpty()) {
- return;
- }
-
- try {
- // for every item in the map, get the request and check its status
- synchronized (requestSharedDataMap) {
- Set<Long> requestIds = requestSharedDataMap.keySet();
- List<Request> requests = db.getRequests(requestIds);
- for (Request request : requests) {
- // calcuate the status from the stages and then remove from the map if
- // necessary
- CalculatedStatus calculatedStatus = CalculatedStatus.statusFromStages(
- request.getStages());
-
- // calcuate the status of the request
- HostRoleStatus status = calculatedStatus.getStatus();
-
- // remove the request from the map if the request is COMPLETED or
- // FAILED
- switch (status) {
- case FAILED:
- case COMPLETED:
- requestSharedDataMap.remove(request.getRequestId());
- break;
- default:
- break;
- }
- }
- }
- } catch (Exception exception) {
- LOG.warn("Unable to clear the server-side action request cache", exception);
- }
- }
-
- /**
* A helper method to create CommandReports indicating the action/task is in progress
*
* @return a new CommandReport
@@ -450,8 +414,6 @@ public class ServerActionExecutor {
}
}
}
-
- cleanRequestShareDataContexts();
}
/**
@@ -599,4 +561,46 @@ public class ServerActionExecutor {
this.executionCommand = executionCommand;
}
}
+
+ /**
+ * The {@link ServerActionSharedRequestEvictor} is used to clear the shared
+ * request cache periodically. This service will only run periodically and,
+ * when it does, it will try to make the least expensive call to determine if
+ * entries need to be evicted.
+ */
+ private class ServerActionSharedRequestEvictor extends TimerTask {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ // if the cache is empty, do nothing
+ if (requestSharedDataMap.isEmpty()) {
+ return;
+ }
+
+ // if the cache has requests, see if any are still in progress
+ try {
+ // find the requests in progress; there's no need to get the request
+ // itself since that could be a massive object; we just need the ID
+ Set<Long> requestsInProgress = new HashSet<>();
+ List<Stage> currentStageInProgressPerRequest = db.getFirstStageInProgressPerRequest();
+ for (Stage stage : currentStageInProgressPerRequest) {
+ requestsInProgress.add(stage.getRequestId());
+ }
+
+ // for every item in the map, get the request and check its status
+ synchronized (requestSharedDataMap) {
+ Set<Long> cachedRequestIds = requestSharedDataMap.keySet();
+ for (long cachedRequestId : cachedRequestIds) {
+ if (!requestsInProgress.contains(cachedRequestId)) {
+ requestSharedDataMap.remove(cachedRequestId);
+ }
+ }
+ }
+ } catch (Exception exception) {
+ LOG.warn("Unable to clear the server-side action request cache", exception);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 81eef3b..c1056dd 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -202,11 +202,11 @@ public class TestActionDBAccessorImpl {
public void testGetStagesInProgressWithFailures() throws AmbariException {
populateActionDB(db, hostName, requestId, stageId);
populateActionDB(db, hostName, requestId + 1, stageId);
- List<Stage> stages = db.getStagesInProgress();
+ List<Stage> stages = db.getFirstStageInProgressPerRequest();
assertEquals(2, stages.size());
db.abortOperation(requestId);
- stages = db.getStagesInProgress();
+ stages = db.getFirstStageInProgressPerRequest();
assertEquals(1, stages.size());
assertEquals(requestId+1, stages.get(0).getRequestId());
}
@@ -220,9 +220,9 @@ public class TestActionDBAccessorImpl {
// verify stages and proper ordering
int commandsInProgressCount = db.getCommandsInProgressCount();
- List<Stage> stages = db.getStagesInProgress();
+ List<Stage> stages = db.getFirstStageInProgressPerRequest();
assertEquals(18, commandsInProgressCount);
- assertEquals(9, stages.size());
+ assertEquals(3, stages.size());
long lastRequestId = Integer.MIN_VALUE;
for (Stage stage : stages) {
@@ -235,9 +235,9 @@ public class TestActionDBAccessorImpl {
// verify stages and proper ordering
commandsInProgressCount = db.getCommandsInProgressCount();
- stages = db.getStagesInProgress();
+ stages = db.getFirstStageInProgressPerRequest();
assertEquals(12, commandsInProgressCount);
- assertEquals(6, stages.size());
+ assertEquals(2, stages.size());
// find the first stage, and change one command to COMPLETED
stages.get(0).setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(),
@@ -247,9 +247,9 @@ public class TestActionDBAccessorImpl {
// the first stage still has at least 1 command IN_PROGRESS
commandsInProgressCount = db.getCommandsInProgressCount();
- stages = db.getStagesInProgress();
+ stages = db.getFirstStageInProgressPerRequest();
assertEquals(11, commandsInProgressCount);
- assertEquals(6, stages.size());
+ assertEquals(2, stages.size());
// find the first stage, and change the other command to COMPLETED
stages.get(0).setHostRoleStatus(hostName,
@@ -260,9 +260,9 @@ public class TestActionDBAccessorImpl {
// verify stages and proper ordering
commandsInProgressCount = db.getCommandsInProgressCount();
- stages = db.getStagesInProgress();
+ stages = db.getFirstStageInProgressPerRequest();
assertEquals(10, commandsInProgressCount);
- assertEquals(5, stages.size());
+ assertEquals(2, stages.size());
}
@Test
@@ -274,15 +274,16 @@ public class TestActionDBAccessorImpl {
}
// create 1 request, 3 stages per host, each with 2 commands
- for (int i = 0; i < 1000; i++) {
+ int requestCount = 1000;
+ for (int i = 0; i < requestCount; i++) {
String hostName = "c64-" + i;
populateActionDBMultipleStages(3, db, hostName, requestId + i, stageId);
}
int commandsInProgressCount = db.getCommandsInProgressCount();
- List<Stage> stages = db.getStagesInProgress();
+ List<Stage> stages = db.getFirstStageInProgressPerRequest();
assertEquals(6000, commandsInProgressCount);
- assertEquals(3000, stages.size());
+ assertEquals(requestCount, stages.size());
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 2b5d2f3..d7d3d40 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -210,13 +210,12 @@ public class TestActionScheduler {
ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
- List<Stage> stages = new ArrayList<>();
Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
- stages.add(s);
+ List<Stage> stages = Collections.singletonList(s);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -307,20 +306,19 @@ public class TestActionScheduler {
hostEntity.setHostName(hostname);
hostDAO.create(hostEntity);
- List<Stage> stages = new ArrayList<>();
final Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
s.addHostRoleExecutionCommand(hostname, Role.SECONDARY_NAMENODE, RoleCommand.INSTALL,
new ServiceComponentHostInstallEvent("SECONDARY_NAMENODE", hostname, System.currentTimeMillis(), "HDP-1.2.0"),
"cluster1", "HDFS", false, false);
s.setHostRoleStatus(hostname, "SECONDARY_NAMENODE", HostRoleStatus.IN_PROGRESS);
- stages.add(s);
+ List<Stage> stages = Collections.singletonList(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
@@ -397,10 +395,10 @@ public class TestActionScheduler {
when(host.getState()).thenReturn(HostState.HEARTBEAT_LOST);
when(host.getHostName()).thenReturn(hostname);
- final List<Stage> stages = new ArrayList<>();
final Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
- stages.add(s);
+
+ List<Stage> stages = Collections.singletonList(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -409,7 +407,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
@@ -484,14 +482,13 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- final List<Stage> stages = new ArrayList<>();
final Stage stage = stageFactory.createNew(1, "/tmp", "cluster1", 1L, "stageWith2Tasks",
CLUSTER_HOST_INFO, "{\"command_param\":\"param_value\"}", "{\"host_param\":\"param_value\"}");
addInstallTaskToStage(stage, hostname1, "cluster1", Role.DATANODE,
RoleCommand.INSTALL, Service.Type.HDFS, 1);
addInstallTaskToStage(stage, hostname2, "cluster1", Role.NAMENODE,
RoleCommand.INSTALL, Service.Type.HDFS, 2);
- stages.add(stage);
+ final List<Stage> stages = Collections.singletonList(stage);
ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -500,7 +497,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
@@ -606,10 +603,9 @@ public class TestActionScheduler {
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
- List<Stage> stages = new ArrayList<>();
Map<String, String> payload = new HashMap<>();
final Stage s = getStageWithServerAction(1, 977, payload, "test", 1200, false, false);
- stages.add(s);
+ List<Stage> stages = Collections.singletonList(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
@@ -620,7 +616,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -737,7 +733,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION.getKey(), "true");
@@ -766,11 +762,10 @@ public class TestActionScheduler {
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
- List<Stage> stages = new ArrayList<>();
Map<String, String> payload = new HashMap<>();
payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout");
final Stage s = getStageWithServerAction(1, 977, payload, "test", 2, false, false);
- stages.add(s);
+ List<Stage> stages = Collections.singletonList(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
@@ -781,7 +776,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -981,11 +976,10 @@ public class TestActionScheduler {
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
- List<Stage> stages = new ArrayList<>();
Map<String, String> payload = new HashMap<>();
payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception");
final Stage s = getStageWithServerAction(1, 977, payload, "test", 300, false, false);
- stages.add(s);
+ List<Stage> stages = Collections.singletonList(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
@@ -996,7 +990,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
@@ -1146,7 +1140,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
@@ -1238,7 +1232,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION.getKey(), "false");
@@ -1315,7 +1309,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION.getKey(), "true");
@@ -1357,9 +1351,13 @@ public class TestActionScheduler {
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
final List<Stage> stages = new ArrayList<>();
+
stages.add(
getStageWithSingleTask(
hostname, "cluster1", Role.NAMENODE, RoleCommand.UPGRADE, Service.Type.HDFS, 1, 1, 1));
+
+ List<Stage> firstStageInProgress = Collections.singletonList(stages.get(0));
+
stages.add(
getStageWithSingleTask(
hostname, "cluster1", Role.DATANODE, RoleCommand.UPGRADE, Service.Type.HDFS, 2, 2, 1));
@@ -1376,7 +1374,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(firstStageInProgress);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -1517,8 +1515,6 @@ public class TestActionScheduler {
hostDAO.create(hostEntity1);
hostDAO.create(hostEntity2);
- final List<Stage> stages = new ArrayList<>();
-
long now = System.currentTimeMillis();
Stage stage = stageFactory.createNew(1, "/tmp", "cluster1", 1L,
"testRequestFailureBasedOnSuccessFactor", CLUSTER_HOST_INFO, "", "");
@@ -1545,7 +1541,7 @@ public class TestActionScheduler {
addHostRoleExecutionCommand(now, stage, Role.GANGLIA_MONITOR, Service.Type.GANGLIA,
RoleCommand.INSTALL, host2, "cluster1");
- stages.add(stage);
+ final List<Stage> stages = Collections.singletonList(stage);
HostRoleStatus[] statusesAtIterOne = {HostRoleStatus.QUEUED, HostRoleStatus.QUEUED,
HostRoleStatus.QUEUED, HostRoleStatus.QUEUED, HostRoleStatus.FAILED,
@@ -1572,7 +1568,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -1740,7 +1736,9 @@ public class TestActionScheduler {
"cluster1", Service.Type.HDFS.toString(), false, false);
stage.getExecutionCommandWrapper("host3",
Role.DATANODE.toString()).getExecutionCommand();
+
stages.add(stage);
+ List<Stage> stageInProgress = Collections.singletonList(stage);
stage.getOrderedHostRoleCommands().get(0).setTaskId(1);
stage.getOrderedHostRoleCommands().get(1).setTaskId(2);
@@ -1758,8 +1756,8 @@ public class TestActionScheduler {
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
- when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getCommandsInProgressCount()).thenReturn(stageInProgress.size());
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stageInProgress);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -2013,7 +2011,7 @@ public class TestActionScheduler {
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
when(db.getCommandsInProgressCount()).thenReturn(1);
- when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s1));
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(Collections.singletonList(s1));
//Keep large number of attempts so that the task is not expired finally
//Small action timeout to test rescheduling
@@ -2030,7 +2028,7 @@ public class TestActionScheduler {
assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
when(db.getCommandsInProgressCount()).thenReturn(1);
- when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(Collections.singletonList(s2));
//Verify that ActionSheduler does not return cached value of cluster host info for new requestId
ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -2083,14 +2081,13 @@ public class TestActionScheduler {
"dummyService", "dummyComponent", "dummyHostname"));
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
- final List<Stage> stages = new ArrayList<>();
Stage stage1 = stageFactory.createNew(1, "/tmp", "cluster1", 1L, "stageWith2Tasks",
CLUSTER_HOST_INFO, "", "");
addInstallTaskToStage(stage1, hostname1, "cluster1", Role.HBASE_MASTER,
RoleCommand.INSTALL, Service.Type.HBASE, 1);
addInstallTaskToStage(stage1, hostname1, "cluster1", Role.HBASE_REGIONSERVER,
RoleCommand.INSTALL, Service.Type.HBASE, 2);
- stages.add(stage1);
+ final List<Stage> stages = Collections.singletonList(stage1);
ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -2099,7 +2096,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock,
@@ -2155,10 +2152,9 @@ public class TestActionScheduler {
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
- List<Stage> stages = new ArrayList<>();
Map<String, String> payload = new HashMap<>();
final Stage s = getStageWithServerAction(1, 977, payload, "test", 300, false, false);
- stages.add(s);
+ List<Stage> stages = Collections.singletonList(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
@@ -2169,7 +2165,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -2259,6 +2255,7 @@ public class TestActionScheduler {
long requestId = 1;
final List<Stage> allStages = new ArrayList<>();
final List<Stage> stagesInProgress = new ArrayList<>();
+ final List<Stage> firstStageInProgress = new ArrayList<>();
final List<HostRoleCommand> tasksInProgress = new ArrayList<>();
final List<HostRoleCommandEntity> hrcEntitiesInProgress = new ArrayList<>();
@@ -2279,6 +2276,7 @@ public class TestActionScheduler {
Service.Type.HDFS, namenodeCmdTaskId, 2, (int) requestId);
tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
+ firstStageInProgress.add(stageWithTask);
stagesInProgress.add(stageWithTask);
allStages.add(stageWithTask);
@@ -2319,7 +2317,7 @@ public class TestActionScheduler {
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
- when(db.getStagesInProgress()).thenReturn(stagesInProgress);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stagesInProgress);
when(db.getStagesInProgressForRequest(requestId)).thenReturn(stagesInProgress);
when(db.getAllStages(anyLong())).thenReturn(allStages);
@@ -2458,26 +2456,34 @@ public class TestActionScheduler {
long requestId2 = 2;
long requestId3 = 3;
+ final List<Stage> firstStageInProgressByRequest = new ArrayList<>();
final List<Stage> stagesInProgress = new ArrayList<>();
int namenodeCmdTaskId = 1;
- stagesInProgress.add(
- getStageWithSingleTask(
- hostname1, "cluster1", Role.NAMENODE, RoleCommand.START,
- Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1));
- stagesInProgress.add(
- getStageWithSingleTask(
- hostname1, "cluster1", Role.DATANODE, RoleCommand.START,
- Service.Type.HDFS, 2, 2, (int) requestId1));
- stagesInProgress.add(
- getStageWithSingleTask(
- hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive
- Service.Type.HDFS, 3, 3, (int) requestId2));
- stagesInProgress.add(
- getStageWithSingleTask(
- hostname3, "cluster1", Role.DATANODE, RoleCommand.START,
- Service.Type.HDFS, 4, 4, (int) requestId3));
+ Stage request1Stage1 = getStageWithSingleTask(hostname1, "cluster1", Role.NAMENODE,
+ RoleCommand.START,
+ Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1);
+
+ Stage request1Stage2 = getStageWithSingleTask(hostname1, "cluster1", Role.DATANODE,
+ RoleCommand.START,
+ Service.Type.HDFS, 2, 2, (int) requestId1);
+
+ Stage request2Stage1 = getStageWithSingleTask(hostname2, "cluster1", Role.DATANODE,
+ RoleCommand.STOP, // Exclusive
+ Service.Type.HDFS, 3, 3, (int) requestId2);
+
+ Stage request3Stage1 = getStageWithSingleTask(hostname3, "cluster1", Role.DATANODE,
+ RoleCommand.START,
+ Service.Type.HDFS, 4, 4, (int) requestId3);
+ firstStageInProgressByRequest.add(request1Stage1);
+ firstStageInProgressByRequest.add(request2Stage1);
+ firstStageInProgressByRequest.add(request3Stage1);
+
+ stagesInProgress.add(request1Stage1);
+ stagesInProgress.add(request1Stage2);
+ stagesInProgress.add(request2Stage1);
+ stagesInProgress.add(request3Stage1);
Host host1 = mock(Host.class);
when(fsm.getHost(anyString())).thenReturn(host1);
@@ -2498,7 +2504,7 @@ public class TestActionScheduler {
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
- when(db.getStagesInProgress()).thenReturn(stagesInProgress);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(firstStageInProgressByRequest);
List<HostRoleCommand> requestTasks = new ArrayList<>();
for (Stage stage : stagesInProgress) {
@@ -2585,6 +2591,10 @@ public class TestActionScheduler {
Assert.assertFalse(startedRequests.containsKey(requestId3));
stagesInProgress.remove(0);
+ firstStageInProgressByRequest.clear();
+ firstStageInProgressByRequest.add(request1Stage2);
+ firstStageInProgressByRequest.add(request2Stage1);
+ firstStageInProgressByRequest.add(request3Stage1);
scheduler.doWork();
@@ -2595,6 +2605,9 @@ public class TestActionScheduler {
// Execution of request 2
stagesInProgress.remove(0);
+ firstStageInProgressByRequest.clear();
+ firstStageInProgressByRequest.add(request2Stage1);
+ firstStageInProgressByRequest.add(request3Stage1);
scheduler.doWork();
@@ -2605,6 +2618,8 @@ public class TestActionScheduler {
// Execution of request 3
stagesInProgress.remove(0);
+ firstStageInProgressByRequest.clear();
+ firstStageInProgressByRequest.add(request3Stage1);
scheduler.doWork();
@@ -2715,6 +2730,7 @@ public class TestActionScheduler {
Stage stage = null;
Stage stage2 = null;
final List<Stage> stages = new ArrayList<>();
+ final List<Stage> firstStageInProgress = new ArrayList<>();
stages.add(stage = getStageWithSingleTask(hostname1, "cluster1", Role.NAMENODE,
RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
@@ -2735,6 +2751,9 @@ public class TestActionScheduler {
HostRoleCommand command = stage.getOrderedHostRoleCommands().iterator().next();
command.setStatus(HostRoleStatus.FAILED);
+ // still in progress even though 1 task has been failed
+ firstStageInProgress.add(stage);
+
ActionDBAccessor db = mock(ActionDBAccessor.class);
HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
Mockito.doNothing().when(hostRoleCommandDAOMock).publishTaskCreateEvent(anyListOf(HostRoleCommand.class));
@@ -2743,8 +2762,8 @@ public class TestActionScheduler {
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
- when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getCommandsInProgressCount()).thenReturn(firstStageInProgress.size());
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(firstStageInProgress);
doAnswer(new Answer<Void>() {
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/RequestDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/RequestDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/RequestDAOTest.java
index 9b62671..17cebc3 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/RequestDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/RequestDAOTest.java
@@ -42,6 +42,7 @@ import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.orm.entities.ResourceEntity;
import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
import org.apache.ambari.server.security.authorization.ResourceType;
import org.junit.After;
import org.junit.Assert;
@@ -124,7 +125,25 @@ public class RequestDAOTest {
group.add(4L);
// !!! accepted
- List<StageEntity> stages = stageDAO.findByStageIds(requestEntity.getRequestId(), group);
+ List<StageEntity> stages = new ArrayList<>();
+ StageEntityPK primaryKey = new StageEntityPK();
+ primaryKey.setRequestId(requestEntity.getRequestId());
+ primaryKey.setStageId(2L);
+
+ StageEntity stage = stageDAO.findByPK(primaryKey);
+ Assert.assertNotNull(stage);
+ stages.add(stage);
+
+ primaryKey.setStageId(3L);
+ stage = stageDAO.findByPK(primaryKey);
+ Assert.assertNotNull(stage);
+ stages.add(stage);
+
+ primaryKey.setStageId(4L);
+ stage = stageDAO.findByPK(primaryKey);
+ Assert.assertNotNull(stage);
+ stages.add(stage);
+
CalculatedStatus calc3 = CalculatedStatus.statusFromStageEntities(stages);
// !!! aggregated
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
index 44d5b63..2feef41 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java
@@ -268,7 +268,7 @@ public class ServerActionExecutorTest {
private ActionDBAccessor createMockActionDBAccessor(final Request request, final List<Stage> stages) {
ActionDBAccessor db = mock(ActionDBAccessor.class);
- when(db.getStagesInProgress()).thenReturn(stages);
+ when(db.getFirstStageInProgressPerRequest()).thenReturn(stages);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
http://git-wip-us.apache.org/repos/asf/ambari/blob/aba473e8/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
index e2ce6e7..2c0b507 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/RetryUpgradeActionServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.ambari.server.state.services;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import org.apache.ambari.server.AmbariException;
@@ -42,6 +41,7 @@ import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.orm.entities.StackEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
import org.apache.ambari.server.orm.entities.UpgradeEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
@@ -148,9 +148,11 @@ public class RetryUpgradeActionServiceTest {
}
// Case 4: Cluster with an active upgrade that contains a failed task in HOLDING_FAILED that does NOT meet conditions to be retried.
- List<StageEntity> stages = stageDAO.findByStageIds(upgradeRequestId, new HashSet<Long>(){{ add(stageId); }});
- Assert.assertTrue(!stages.isEmpty() && stages.size() == 1);
- StageEntity stageEntity = stages.get(0);
+ StageEntityPK primaryKey = new StageEntityPK();
+ primaryKey.setRequestId(upgradeRequestId);
+ primaryKey.setStageId(stageId);
+
+ StageEntity stageEntity = stageDAO.findByPK(primaryKey);
HostRoleCommandEntity hrc2 = new HostRoleCommandEntity();
hrc2.setStage(stageEntity);
@@ -202,7 +204,7 @@ public class RetryUpgradeActionServiceTest {
// Ensure that task 2 transitioned from HOLDING_TIMEDOUT to PENDING
Assert.assertEquals(HostRoleStatus.PENDING, hostRoleCommandDAO.findByPK(hrc2.getTaskId()).getStatus());
-
+
// Case 7: Cluster with an active upgrade that contains a failed task in HOLDING_FAILED that was already retried and has now expired.
now = System.currentTimeMillis();
hrc2.setOriginalStartTime(now - (timeoutMins * 60000) - 1);