You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/03/04 22:43:04 UTC
ambari git commit: AMBARI-9334 - Ambari StageDAO.findByCommandStatuses causes Postgres HIGH CPU (jonathanhurley)
Repository: ambari
Updated Branches:
refs/heads/branch-1.7.0 3271d1d1f -> 7566e570a
AMBARI-9334 - Ambari StageDAO.findByCommandStatuses causes Postgres HIGH CPU (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7566e570
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7566e570
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7566e570
Branch: refs/heads/branch-1.7.0
Commit: 7566e570aff4622e4eee7023325583244564a3af
Parents: 3271d1d
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Wed Mar 4 16:06:02 2015 -0500
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Wed Mar 4 16:06:02 2015 -0500
----------------------------------------------------------------------
.../server/actionmanager/ActionDBAccessor.java | 33 ++--
.../actionmanager/ActionDBAccessorImpl.java | 85 ++++++----
.../server/actionmanager/ActionManager.java | 29 ++--
.../server/actionmanager/ActionScheduler.java | 47 ++++--
.../server/actionmanager/HostRoleStatus.java | 7 +
.../server/orm/dao/HostRoleCommandDAO.java | 66 ++++++--
.../apache/ambari/server/orm/dao/StageDAO.java | 42 ++---
.../orm/entities/HostRoleCommandEntity.java | 87 ++++++++---
.../ambari/server/orm/entities/StageEntity.java | 66 ++++++--
.../actionmanager/TestActionDBAccessorImpl.java | 156 ++++++++++++++++---
.../actionmanager/TestActionScheduler.java | 100 ++++++------
11 files changed, 497 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 1f99b4a..725fa69 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -17,15 +17,15 @@
*/
package org.apache.ambari.server.actionmanager;
-import com.google.inject.persist.Transactional;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.inject.persist.Transactional;
public interface ActionDBAccessor {
@@ -57,12 +57,26 @@ public interface ActionDBAccessor {
public void timeoutHostRole(String host, long requestId, long stageId, String role);
/**
- * Returns all the pending stages, including queued and not-queued.
- * A stage is considered in progress if it is in progress for any host.
+ * Returns all the pending stages, including queued and not-queued. A stage is
+ * considered in progress if it is in progress for any host.
+ * <p/>
+ * The results will be sorted by request ID and then stage ID making this call
+ * expensive in some scenarios. Use {@link #getCommandsInProgressCount()} in
+ * order to determine if there are stages that are in progress before getting
+ * the stages from this method.
+ *
+ * @see HostRoleStatus#IN_PROGRESS_STATUSES
*/
public List<Stage> getStagesInProgress();
/**
+ * Gets the number of commands in progress.
+ *
+ * @return the number of commands in progress.
+ */
+ public int getCommandsInProgressCount();
+
+ /**
* Persists all tasks for a given request
* @param request request object
*/
@@ -149,11 +163,6 @@ public interface ActionDBAccessor {
public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
/**
- * Get all stages that contain tasks with specified host role statuses
- */
- public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);
-
- /**
* Gets the host role command corresponding to the task id
*/
public HostRoleCommand getTask(long taskId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 5e879cc..220fb95 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -17,12 +17,18 @@
*/
package org.apache.ambari.server.actionmanager;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-import com.google.inject.persist.Transactional;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
@@ -48,49 +54,55 @@ import org.apache.ambari.server.utils.StageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.persist.Transactional;
@Singleton
public class ActionDBAccessorImpl implements ActionDBAccessor {
private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
+
private long requestId;
+
@Inject
ClusterDAO clusterDAO;
+
@Inject
HostDAO hostDAO;
+
@Inject
RequestDAO requestDAO;
+
@Inject
StageDAO stageDAO;
+
@Inject
HostRoleCommandDAO hostRoleCommandDAO;
+
@Inject
ExecutionCommandDAO executionCommandDAO;
+
@Inject
RoleSuccessCriteriaDAO roleSuccessCriteriaDAO;
+
@Inject
StageFactory stageFactory;
+
@Inject
RequestFactory requestFactory;
+
@Inject
HostRoleCommandFactory hostRoleCommandFactory;
+
@Inject
Clusters clusters;
+
@Inject
RequestScheduleDAO requestScheduleDAO;
-
-
private Cache<Long, HostRoleCommand> hostRoleCommandCache;
private long cacheLimit; //may be exceeded to store tasks from one request
@@ -186,21 +198,34 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
endRequestIfCompleted(requestId);
}
- /* (non-Javadoc)
- * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getPendingStages()
+ /**
+ * {@inheritDoc}
*/
@Override
public List<Stage> getStagesInProgress() {
List<Stage> stages = new ArrayList<Stage>();
- List<HostRoleStatus> statuses =
- Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS,
- HostRoleStatus.PENDING);
- for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
+ List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(HostRoleStatus.IN_PROGRESS_STATUSES);
+
+ for (StageEntity stageEntity : stageEntities) {
stages.add(stageFactory.createExisting(stageEntity));
}
+
return stages;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int getCommandsInProgressCount() {
+ Number count = hostRoleCommandDAO.getCountByStatus(HostRoleStatus.IN_PROGRESS_STATUSES);
+ if (null == count) {
+ return 0;
+ }
+
+ return count.intValue();
+ }
+
@Override
@Transactional
public void persistActions(Request request) throws AmbariException {
@@ -212,7 +237,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (clusterEntity != null) {
clusterId = clusterEntity.getClusterId();
}
-
+
requestEntity.setClusterId(clusterId);
requestDAO.create(requestEntity);
@@ -550,14 +575,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
@Override
- public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
- List<Stage> stages = new ArrayList<Stage>();
- for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
- stages.add(stageFactory.createExisting(stageEntity));
- }
- return stages;
- }
-
public HostRoleCommand getTask(long taskId) {
HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int) taskId);
if (commandEntity == null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index e2fad5f..fb0b3aa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -17,14 +17,16 @@
*/
package org.apache.ambari.server.actionmanager;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-import com.google.inject.persist.UnitOfWork;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.api.services.BaseRequest;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.ExecuteActionRequest;
import org.apache.ambari.server.controller.HostsMap;
@@ -34,13 +36,10 @@ import org.apache.ambari.server.utils.StageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.persist.UnitOfWork;
/**
@@ -62,7 +61,7 @@ public class ActionManager {
ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
ServerActionManager serverActionManager, UnitOfWork unitOfWork,
RequestFactory requestFactory, Configuration configuration) {
- this.actionQueue = aq;
+ actionQueue = aq;
this.db = db;
scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration);
@@ -204,10 +203,6 @@ public class ActionManager {
return db.getTasks(taskIds);
}
- public List<Stage> getRequestsByHostRoleStatus(Set<HostRoleStatus> statuses) {
- return db.getStagesByHostRoleStatus(statuses);
- }
-
/**
* Get first or last maxResults requests that are in the specified status
*
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 81fee75..e043d79 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -128,22 +128,22 @@ class ActionScheduler implements Runnable {
ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager,
UnitOfWork unitOfWork, Configuration configuration) {
- this.sleepTime = sleepTimeMilliSec;
+ sleepTime = sleepTimeMilliSec;
this.hostsMap = hostsMap;
- this.actionTimeout = actionTimeoutMilliSec;
+ actionTimeout = actionTimeoutMilliSec;
this.db = db;
this.actionQueue = actionQueue;
this.fsmObject = fsmObject;
this.maxAttempts = (short) maxAttempts;
this.serverActionManager = serverActionManager;
this.unitOfWork = unitOfWork;
- this.clusterHostInfoCache = CacheBuilder.newBuilder().
+ clusterHostInfoCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
- this.commandParamsStageCache = CacheBuilder.newBuilder().
+ commandParamsStageCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
- this.hostParamsStageCache = CacheBuilder.newBuilder().
+ hostParamsStageCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
this.configuration = configuration;
@@ -202,6 +202,18 @@ class ActionScheduler implements Runnable {
// The first thing to do is to abort requests that are cancelled
processCancelledRequestsList();
+ // !!! getting the stages in progress could be a very expensive call due
+ // to the join being used; there's no need to make it if there are
+ // no commands in progress
+ if (db.getCommandsInProgressCount() == 0) {
+ // Nothing to do
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There are no stages currently in progress.");
+ }
+
+ return;
+ }
+
Set<Long> runningRequestIds = new HashSet<Long>();
List<Stage> stages = db.getStagesInProgress();
if (LOG.isDebugEnabled()) {
@@ -209,14 +221,14 @@ class ActionScheduler implements Runnable {
LOG.debug("Processing {} in progress stages ", stages.size());
}
if (stages.isEmpty()) {
- //Nothing to do
+ // Nothing to do
if (LOG.isDebugEnabled()) {
- LOG.debug("No stage in progress..nothing to do");
+ LOG.debug("There are no stages currently in progress.");
}
return;
}
int i_stage = 0;
-
+
stages = filterParallelPerHostStages(stages);
boolean exclusiveRequestIsGoing = false;
@@ -420,7 +432,7 @@ class ActionScheduler implements Runnable {
s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
db.hostRoleScheduled(s, hostName, roleName);
String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
- this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
+ serverActionManager.executeAction(actionName, cmd.getCommandParams());
reportServerActionSuccess(s, cmd);
} catch (AmbariException e) {
@@ -569,12 +581,12 @@ class ActionScheduler implements Runnable {
// Check that service host component is not deleted
if (hostDeleted) {
-
+
String message = String.format(
"Host not found when trying to schedule an execution command. " +
"The most probable reason for that is that host or host component " +
"has been deleted recently. The command has been aborted and dequeued." +
- "Execution command details: " +
+ "Execution command details: " +
"cmdId: %s; taskId: %s; roleCommand: %s",
c.getCommandId(), c.getTaskId(), c.getRoleCommand());
LOG.warn("Host {} has been detected as non-available. {}", host, message);
@@ -609,7 +621,7 @@ class ActionScheduler implements Runnable {
LOG.trace("===>commandsToSchedule(first_time)=" + commandsToSchedule.size());
}
- this.updateRoleStats(status, roleStats.get(roleStr));
+ updateRoleStats(status, roleStats.get(roleStr));
}
}
LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size());
@@ -772,7 +784,7 @@ class ActionScheduler implements Runnable {
}
cmd.setClusterHostInfo(clusterHostInfo);
-
+
//Try to get commandParams from cache and merge them with command-level parameters
Map<String, String> commandParams = commandParamsStageCache.getIfPresent(stagePk);
@@ -888,10 +900,10 @@ class ActionScheduler implements Runnable {
LOG.error("Unknown status " + status.name());
}
}
-
-
+
+
public void setTaskTimeoutAdjustment(boolean val) {
- this.taskTimeoutAdjustment = val;
+ taskTimeoutAdjustment = val;
}
static class RoleStats {
@@ -906,7 +918,7 @@ class ActionScheduler implements Runnable {
final float successFactor;
RoleStats(int total, float successFactor) {
- this.totalHosts = total;
+ totalHosts = total;
this.successFactor = successFactor;
}
@@ -938,6 +950,7 @@ class ActionScheduler implements Runnable {
}
}
+ @Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("numQueued="+numQueued);
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
index 447aead..b0ebe83 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.actionmanager;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
public enum HostRoleStatus {
@@ -34,6 +35,12 @@ public enum HostRoleStatus {
private static List<HostRoleStatus> COMPLETED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED, COMPLETED);
private static List<HostRoleStatus> FAILED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED);
+ /**
+ * The {@link HostRoleStatus}s that represent any commands which are
+ * considered to be "In Progress".
+ */
+ public static final EnumSet<HostRoleStatus> IN_PROGRESS_STATUSES = EnumSet.of(
+ HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING);
private HostRoleStatus(int status) {
this.status = status;
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 6920a9e..dce8961 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -18,26 +18,30 @@
package org.apache.ambari.server.orm.dao;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.actionmanager.HostRoleStatus;
-import org.apache.ambari.server.orm.RequiresSession;
-import org.apache.ambari.server.orm.entities.HostEntity;
-import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
-import org.apache.ambari.server.orm.entities.StageEntity;
-import javax.persistence.EntityManager;
-import javax.persistence.TypedQuery;
+import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
+import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
-import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT;
+
+import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
+
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.orm.RequiresSession;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
@Singleton
public class HostRoleCommandDAO {
@@ -194,6 +198,40 @@ public class HostRoleCommandDAO {
return daoUtils.selectList(query, requestId);
}
+ /**
+ * Gets the commands in a particular status.
+ *
+ * @param statuses
+ * the statuses to include (not {@code null}).
+ * @return the commands in the given set of statuses.
+ */
+ @RequiresSession
+ public List<HostRoleCommandEntity> findByStatus(
+ Collection<HostRoleStatus> statuses) {
+ TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
+ "HostRoleCommandEntity.findByCommandStatuses",
+ HostRoleCommandEntity.class);
+
+ query.setParameter("statuses", statuses);
+ return daoUtils.selectList(query);
+ }
+
+ /**
+ * Gets the number of commands in a particular status.
+ *
+ * @param statuses
+ * the statuses to include (not {@code null}).
+ * @return the count of commands in the given set of statuses.
+ */
+ @RequiresSession
+ public Number getCountByStatus(Collection<HostRoleStatus> statuses) {
+ TypedQuery<Number> query = entityManagerProvider.get().createNamedQuery(
+ "HostRoleCommandEntity.findCountByCommandStatuses", Number.class);
+
+ query.setParameter("statuses", statuses);
+ return daoUtils.selectSingle(query);
+ }
+
@RequiresSession
public List<HostRoleCommandEntity> findAll() {
return daoUtils.selectAll(entityManagerProvider.get(), HostRoleCommandEntity.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 900dbeb..621ff1c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -18,21 +18,24 @@
package org.apache.ambari.server.orm.dao;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
+
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.orm.entities.StageEntityPK;
import org.apache.ambari.server.utils.StageUtils;
-import javax.persistence.EntityManager;
-import javax.persistence.TypedQuery;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Collection;
-import java.util.Map;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
@Singleton
public class StageDAO {
@@ -82,12 +85,13 @@ public class StageDAO {
}
@RequiresSession
- public List<StageEntity> findByCommandStatuses(Collection<HostRoleStatus> statuses) {
- TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
- "FROM StageEntity stage WHERE stage.stageId IN (SELECT hrce.stageId FROM " +
- "HostRoleCommandEntity hrce WHERE stage.requestId = hrce.requestId and hrce.status IN ?1 ) " +
- "ORDER BY stage.requestId, stage.stageId", StageEntity.class);
- return daoUtils.selectList(query, statuses);
+ public List<StageEntity> findByCommandStatuses(
+ Collection<HostRoleStatus> statuses) {
+ TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery(
+ "StageEntity.findByCommandStatuses", StageEntity.class);
+
+ query.setParameter("statuses", statuses);
+ return daoUtils.selectList(query);
}
@RequiresSession
@@ -114,10 +118,12 @@ public class StageDAO {
"SELECT stage.requestContext " + "FROM StageEntity stage " +
"WHERE stage.requestId=?1", String.class);
String result = daoUtils.selectOne(query, requestId);
- if (result != null)
+ if (result != null) {
return result;
- else
+ }
+ else {
return ""; // Since it is defined as empty string in the StageEntity
+ }
}
@Transactional
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 599156a..375d895 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -36,6 +36,8 @@ import javax.persistence.JoinColumn;
import javax.persistence.JoinColumns;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.persistence.TableGenerator;
@@ -45,15 +47,17 @@ import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.commons.lang.ArrayUtils;
-@Table(name = "host_role_command")
@Entity
+@Table(name = "host_role_command")
@TableGenerator(name = "host_role_command_id_generator",
table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value"
, pkColumnValue = "host_role_command_id_seq"
, initialValue = 1
, allocationSize = 50
)
-
+@NamedQueries({
+ @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"),
+ @NamedQuery(name = "HostRoleCommandEntity.findByCommandStatuses", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.status IN :statuses ORDER BY command.requestId, command.stageId") })
public class HostRoleCommandEntity {
private static int MAX_COMMAND_DETAIL_LENGTH = 250;
@@ -190,7 +194,7 @@ public class HostRoleCommandEntity {
}
public Role getRole() {
- return Role.valueOf(this.role);
+ return Role.valueOf(role);
}
public void setRole(Role role) {
@@ -317,29 +321,66 @@ public class HostRoleCommandEntity {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
HostRoleCommandEntity that = (HostRoleCommandEntity) o;
- if (attemptCount != null ? !attemptCount.equals(that.attemptCount) : that.attemptCount != null) return false;
- if (event != null ? !event.equals(that.event) : that.event != null) return false;
- if (exitcode != null ? !exitcode.equals(that.exitcode) : that.exitcode != null) return false;
- if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false;
- if (lastAttemptTime != null ? !lastAttemptTime.equals(that.lastAttemptTime) : that.lastAttemptTime != null)
+ if (attemptCount != null ? !attemptCount.equals(that.attemptCount) : that.attemptCount != null) {
+ return false;
+ }
+ if (event != null ? !event.equals(that.event) : that.event != null) {
+ return false;
+ }
+ if (exitcode != null ? !exitcode.equals(that.exitcode) : that.exitcode != null) {
+ return false;
+ }
+ if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) {
+ return false;
+ }
+ if (lastAttemptTime != null ? !lastAttemptTime.equals(that.lastAttemptTime) : that.lastAttemptTime != null) {
+ return false;
+ }
+ if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) {
+ return false;
+ }
+ if (role != null ? !role.equals(that.role) : that.role != null) {
+ return false;
+ }
+ if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) {
return false;
- if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
- if (role != null ? !role.equals(that.role) : that.role != null) return false;
- if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false;
- if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false;
- if (status != null ? !status.equals(that.status) : that.status != null) return false;
- if (stdError != null ? !Arrays.equals(stdError, that.stdError) : that.stdError != null) return false;
- if (stdOut != null ? !Arrays.equals(stdOut, that.stdOut) : that.stdOut != null) return false;
- if (outputLog != null ? !outputLog.equals(that.outputLog) : that.outputLog != null) return false;
- if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) return false;
- if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) return false;
- if (structuredOut != null ? !Arrays.equals(structuredOut, that.structuredOut) : that.structuredOut != null) return false;
- if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false;
+ }
+ if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) {
+ return false;
+ }
+ if (status != null ? !status.equals(that.status) : that.status != null) {
+ return false;
+ }
+ if (stdError != null ? !Arrays.equals(stdError, that.stdError) : that.stdError != null) {
+ return false;
+ }
+ if (stdOut != null ? !Arrays.equals(stdOut, that.stdOut) : that.stdOut != null) {
+ return false;
+ }
+ if (outputLog != null ? !outputLog.equals(that.outputLog) : that.outputLog != null) {
+ return false;
+ }
+ if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) {
+ return false;
+ }
+ if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) {
+ return false;
+ }
+ if (structuredOut != null ? !Arrays.equals(structuredOut, that.structuredOut) : that.structuredOut != null) {
+ return false;
+ }
+ if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) {
+ return false;
+ }
return true;
}
@@ -371,7 +412,7 @@ public class HostRoleCommandEntity {
}
public void setExecutionCommand(ExecutionCommandEntity executionCommandsByTaskId) {
- this.executionCommand = executionCommandsByTaskId;
+ executionCommand = executionCommandsByTaskId;
}
public StageEntity getStage() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index a7bc948..e87e28b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -18,14 +18,28 @@
package org.apache.ambari.server.orm.entities;
-import javax.persistence.*;
+import static org.apache.commons.lang.StringUtils.defaultString;
+
import java.util.Collection;
-import static org.apache.commons.lang.StringUtils.defaultString;
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.IdClass;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
-@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
-@Table(name = "stage")
@Entity
+@Table(name = "stage")
+@IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
+@NamedQueries({ @NamedQuery(name = "StageEntity.findByCommandStatuses", query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId") })
public class StageEntity {
@Column(name = "cluster_id", updatable = false, nullable = false)
@@ -47,11 +61,11 @@ public class StageEntity {
@Column(name = "request_context")
@Basic
private String requestContext = "";
-
+
@Column(name = "cluster_host_info")
@Basic
private byte[] clusterHostInfo;
-
+
@Column(name = "command_params")
@Basic
private byte[] commandParamsStage;
@@ -63,7 +77,7 @@ public class StageEntity {
@ManyToOne
@JoinColumn(name = "request_id", referencedColumnName = "request_id", nullable = false)
private RequestEntity request;
-
+
@OneToMany(mappedBy = "stage", cascade = CascadeType.REMOVE, fetch = FetchType.LAZY)
private Collection<HostRoleCommandEntity> hostRoleCommands;
@@ -114,7 +128,7 @@ public class StageEntity {
public void setClusterHostInfo(String clusterHostInfo) {
this.clusterHostInfo = clusterHostInfo.getBytes();
}
-
+
public String getCommandParamsStage() {
return commandParamsStage == null ? new String() : new String(commandParamsStage);
}
@@ -139,18 +153,36 @@ public class StageEntity {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
StageEntity that = (StageEntity) o;
- if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) return false;
- if (logInfo != null ? !logInfo.equals(that.logInfo) : that.logInfo != null) return false;
- if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
- if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false;
- if (clusterHostInfo != null ? !clusterHostInfo.equals(that.clusterHostInfo) : that.clusterHostInfo != null) return false;
- if (commandParamsStage != null ? !commandParamsStage.equals(that.commandParamsStage) : that.commandParamsStage != null) return false;
- if (hostParamsStage != null ? !hostParamsStage.equals(that.hostParamsStage) : that.hostParamsStage != null) return false;
+ if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) {
+ return false;
+ }
+ if (logInfo != null ? !logInfo.equals(that.logInfo) : that.logInfo != null) {
+ return false;
+ }
+ if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) {
+ return false;
+ }
+ if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) {
+ return false;
+ }
+ if (clusterHostInfo != null ? !clusterHostInfo.equals(that.clusterHostInfo) : that.clusterHostInfo != null) {
+ return false;
+ }
+ if (commandParamsStage != null ? !commandParamsStage.equals(that.commandParamsStage) : that.commandParamsStage != null) {
+ return false;
+ }
+ if (hostParamsStage != null ? !hostParamsStage.equals(that.hostParamsStage) : that.hostParamsStage != null) {
+ return false;
+ }
return !(requestContext != null ? !requestContext.equals(that.requestContext) : that.requestContext != null);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 36acbc2..d751f2d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -17,15 +17,19 @@
*/
package org.apache.ambari.server.actionmanager;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.persist.PersistService;
-import com.google.inject.persist.UnitOfWork;
-import com.google.inject.util.Modules;
+import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.persistence.EntityManager;
+
import junit.framework.Assert;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
@@ -40,6 +44,7 @@ import org.apache.ambari.server.orm.DBAccessor;
import org.apache.ambari.server.orm.DBAccessorImpl;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.DaoUtils;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
@@ -51,13 +56,16 @@ import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.UnitOfWork;
+import com.google.inject.util.Modules;
public class TestActionDBAccessorImpl {
private static final Logger log = LoggerFactory.getLogger(TestActionDBAccessorImpl.class);
@@ -73,11 +81,19 @@ public class TestActionDBAccessorImpl {
@Inject
private Clusters clusters;
+
@Inject
private ExecutionCommandDAO executionCommandDAO;
+
@Inject
private HostRoleCommandDAO hostRoleCommandDAO;
+ @Inject
+ private Provider<EntityManager> entityManagerProvider;
+
+ @Inject
+ private DaoUtils daoUtils;
+
@Before
public void setup() throws AmbariException {
InMemoryDefaultTestModule defaultTestModule = new InMemoryDefaultTestModule();
@@ -157,30 +173,106 @@ public class TestActionDBAccessorImpl {
"(command report status should be ignored)",
HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
}
-
+
@Test
public void testGetStagesInProgress() throws AmbariException {
- String hostname = "host1";
List<Stage> stages = new ArrayList<Stage>();
- stages.add(createStubStage(hostname, requestId, stageId));
- stages.add(createStubStage(hostname, requestId, stageId + 1));
+ stages.add(createStubStage(hostName, requestId, stageId));
+ stages.add(createStubStage(hostName, requestId, stageId + 1));
Request request = new Request(stages, clusters);
db.persistActions(request);
assertEquals(2, stages.size());
}
-
+
@Test
public void testGetStagesInProgressWithFailures() throws AmbariException {
- String hostname = "host1";
- populateActionDB(db, hostname, requestId, stageId);
- populateActionDB(db, hostname, requestId+1, stageId);
- db.abortOperation(requestId);
+ populateActionDB(db, hostName, requestId, stageId);
+ populateActionDB(db, hostName, requestId + 1, stageId);
List<Stage> stages = db.getStagesInProgress();
+ assertEquals(2, stages.size());
+
+ db.abortOperation(requestId);
+ stages = db.getStagesInProgress();
assertEquals(1, stages.size());
assertEquals(requestId+1, stages.get(0).getRequestId());
}
@Test
+ public void testGetStagesInProgressWithManyStages() throws AmbariException {
+ // create 3 requests; each request has 3 stages, and each stage has 2 commands
+ populateActionDBMultipleStages(3, db, hostName, requestId, stageId);
+ populateActionDBMultipleStages(3, db, hostName, requestId + 1, stageId + 3);
+ populateActionDBMultipleStages(3, db, hostName, requestId + 2, stageId + 3);
+
+ // verify stages and proper ordering
+ int commandsInProgressCount = db.getCommandsInProgressCount();
+ List<Stage> stages = db.getStagesInProgress();
+ assertEquals(18, commandsInProgressCount);
+ assertEquals(9, stages.size());
+
+ long lastRequestId = Integer.MIN_VALUE;
+ for (Stage stage : stages) {
+ assertTrue(stage.getRequestId() >= lastRequestId);
+ lastRequestId = stage.getRequestId();
+ }
+
+ // cancel the first one, removing 3 stages
+ db.abortOperation(requestId);
+
+ // verify stages and proper ordering
+ commandsInProgressCount = db.getCommandsInProgressCount();
+ stages = db.getStagesInProgress();
+ assertEquals(12, commandsInProgressCount);
+ assertEquals(6, stages.size());
+
+ // find the first stage, and change one command to COMPLETED
+ stages.get(0).setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(),
+ HostRoleStatus.COMPLETED);
+
+ db.hostRoleScheduled(stages.get(0), hostName, Role.HBASE_MASTER.toString());
+
+ // the first stage still has at least 1 command IN_PROGRESS
+ commandsInProgressCount = db.getCommandsInProgressCount();
+ stages = db.getStagesInProgress();
+ assertEquals(11, commandsInProgressCount);
+ assertEquals(6, stages.size());
+
+ // find the first stage, and change the other command to COMPLETED
+ stages.get(0).setHostRoleStatus(hostName,
+ Role.HBASE_REGIONSERVER.toString(), HostRoleStatus.COMPLETED);
+
+ db.hostRoleScheduled(stages.get(0), hostName,
+ Role.HBASE_REGIONSERVER.toString());
+
+ // verify stages and proper ordering
+ commandsInProgressCount = db.getCommandsInProgressCount();
+ stages = db.getStagesInProgress();
+ assertEquals(10, commandsInProgressCount);
+ assertEquals(5, stages.size());
+ }
+
+ @Test
+ public void testGetStagesInProgressWithManyCommands() throws AmbariException {
+ // 1000 hosts
+ for (int i = 0; i < 1000; i++) {
+ String hostName = "c64-" + i;
+ clusters.addHost(hostName);
+ clusters.getHost(hostName).persist();
+ }
+
+ // create 1 request per host; each request has 3 stages, each with 2 commands
+ for (int i = 0; i < 1000; i++) {
+ String hostName = "c64-" + i;
+ populateActionDBMultipleStages(3, db, hostName, requestId + i, stageId);
+ }
+
+ int commandsInProgressCount = db.getCommandsInProgressCount();
+ List<Stage> stages = db.getStagesInProgress();
+ assertEquals(6000, commandsInProgressCount);
+ assertEquals(3000, stages.size());
+ }
+
+ @Test
public void testPersistActions() throws AmbariException {
populateActionDB(db, hostName, requestId, stageId);
for (Stage stage : db.getAllStages(requestId)) {
@@ -310,7 +402,7 @@ public class TestActionDBAccessorImpl {
populateActionDB(db, hostName, requestId + 1, stageId);
List<Long> requestIdsResult =
db.getRequestsByStatus(null, BaseRequest.DEFAULT_PAGE_SIZE, false);
-
+
assertNotNull("List of request IDs is null", requestIdsResult);
assertEquals("Request IDs not matches", requestIds, requestIdsResult);
}
@@ -488,6 +580,20 @@ public class TestActionDBAccessorImpl {
db.persistActions(request);
}
+ private void populateActionDBMultipleStages(int numberOfStages,
+ ActionDBAccessor db, String hostname, long requestId, long stageId)
+ throws AmbariException {
+
+ List<Stage> stages = new ArrayList<Stage>();
+ for (int i = 0; i < numberOfStages; i++) {
+ Stage stage = createStubStage(hostname, requestId, stageId + i);
+ stages.add(stage);
+ }
+
+ Request request = new Request(stages, clusters);
+ db.persistActions(request);
+ }
+
private Stage createStubStage(String hostname, long requestId, long stageId) {
Stage s = new Stage(requestId, "/a/b", "cluster1", 1L, "action db accessor test",
"clusterHostInfo", "commandParamsStage", "hostParamsStage");
http://git-wip-us.apache.org/repos/asf/ambari/blob/7566e570/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 7224924..e1db9f8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -17,13 +17,21 @@
*/
package org.apache.ambari.server.actionmanager;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollectionOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,9 +44,8 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import com.google.common.reflect.TypeToken;
-import com.google.inject.persist.UnitOfWork;
import junit.framework.Assert;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
@@ -69,7 +76,6 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEve
import org.apache.ambari.server.utils.StageUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -77,6 +83,9 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.reflect.TypeToken;
+import com.google.inject.persist.UnitOfWork;
+
public class TestActionScheduler {
private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class);
@@ -96,7 +105,7 @@ public class TestActionScheduler {
*/
@Test
public void testActionSchedule() throws Exception {
-
+
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
@@ -116,7 +125,7 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-
+
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
@@ -132,6 +141,7 @@ public class TestActionScheduler {
Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
stages.add(s);
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Request request = mock(Request.class);
@@ -167,7 +177,7 @@ public class TestActionScheduler {
int expectedQueueSize, ActionScheduler scheduler) {
int cycleCount = 0;
while (cycleCount++ <= MAX_CYCLE_ITERATIONS) {
- List<AgentCommand> ac = aq.dequeueAll(hostname);
+ List<AgentCommand> ac = aq.dequeueAll(hostname);
if (ac != null) {
if (ac.size() == expectedQueueSize) {
return ac;
@@ -220,6 +230,7 @@ public class TestActionScheduler {
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
@@ -292,7 +303,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@Override
@@ -379,7 +390,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@@ -508,7 +519,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@Override
@@ -572,7 +583,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@Override
@@ -680,7 +691,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Properties properties = new Properties();
@@ -764,7 +775,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Properties properties = new Properties();
@@ -805,7 +816,7 @@ public class TestActionScheduler {
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-
+
String hostname1 = "ahost.ambari.apache.org";
String hostname2 = "bhost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
@@ -813,48 +824,48 @@ public class TestActionScheduler {
hosts.put(hostname1, sch);
hosts.put(hostname2, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
-
+
List<Stage> stages = new ArrayList<Stage>();
Stage backgroundStage = null;
stages.add(//stage with background command
backgroundStage = getStageWithSingleTask(
hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, "REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1));
-
+
Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND ,backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType());
-
+
stages.add( // Stage with the same hostname, should be scheduled
getStageWithSingleTask(
hostname1, "cluster1", Role.GANGLIA_MONITOR,
RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
-
+
stages.add(
getStageWithSingleTask(
hostname2, "cluster1", Role.DATANODE,
RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
-
-
+
+
ActionDBAccessor db = mock(ActionDBAccessor.class);
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
-
+
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), new ServerActionManagerImpl(fsm),
unitOfWork, conf);
-
+
ActionManager am = new ActionManager(
2, 2, aq, fsm, db, new HostsMap((String) null),
new ServerActionManagerImpl(fsm), unitOfWork,
requestFactory, conf);
-
+
scheduler.doWork();
-
+
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
@@ -901,7 +912,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@Override
@@ -1082,7 +1093,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@Override
@@ -1259,7 +1270,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@Override
@@ -1419,7 +1430,7 @@ public class TestActionScheduler {
assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE)));
assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.GANGLIA_SERVER)));
}
-
+
@Test
public void testSuccessCriteria() {
RoleStats rs1 = new RoleStats(1, (float)0.5);
@@ -1427,37 +1438,37 @@ public class TestActionScheduler {
assertTrue(rs1.isSuccessFactorMet());
rs1.numSucceeded = 0;
assertFalse(rs1.isSuccessFactorMet());
-
+
RoleStats rs2 = new RoleStats(2, (float)0.5);
rs2.numSucceeded = 1;
assertTrue(rs2.isSuccessFactorMet());
-
+
RoleStats rs3 = new RoleStats(3, (float)0.5);
rs3.numSucceeded = 2;
assertTrue(rs2.isSuccessFactorMet());
rs3.numSucceeded = 1;
assertFalse(rs3.isSuccessFactorMet());
-
+
RoleStats rs4 = new RoleStats(3, (float)1.0);
rs4.numSucceeded = 2;
assertFalse(rs3.isSuccessFactorMet());
}
-
+
/**
* This test verifies that ActionScheduler returns up-to-date cluster host info and that caching works correctly.
*/
@Test
public void testClusterHostInfoCache() throws Exception {
-
+
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
-
+
//Data for stages
Map<String, Set<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
Map<String, Set<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
int stageId = 1;
int requestId1 = 1;
int requestId2 = 2;
-
+
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
@@ -1492,6 +1503,7 @@ public class TestActionScheduler {
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
Stage s2 = StageUtils.getATestStage(requestId2, stageId, hostname, CLUSTER_HOST_INFO_UPDATED,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
+ when(db.getCommandsInProgressCount()).thenReturn(1);
when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s1));
//Keep large number of attempts so that the task is not expired finally
@@ -1504,12 +1516,12 @@ public class TestActionScheduler {
assertTrue(ac.get(0) instanceof ExecutionCommand);
assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
-
+
assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
-
+ when(db.getCommandsInProgressCount()).thenReturn(1);
when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
-
+
//Verify that ActionScheduler does not return a cached value of cluster host info for a new requestId
ac = waitForQueueSize(hostname, aq, 1, scheduler);
assertTrue(ac.get(0) instanceof ExecutionCommand);
@@ -1572,7 +1584,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
@@ -1655,7 +1667,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer() {
@Override
@@ -1728,7 +1740,7 @@ public class TestActionScheduler {
Request request = mock(Request.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequest(anyLong())).thenReturn(request);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
@@ -1894,7 +1906,7 @@ public class TestActionScheduler {
when(host3.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessor.class);
-
+ when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
when(db.getStagesInProgress()).thenReturn(stagesInProgress);
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();