You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/01/09 14:26:48 UTC
[13/50] [abbrv] Merge branch 'api_refactoring' into javelin
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/api/ApiDispatcher.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/api/ApiDispatcher.java
index 3c1b0b0,60d8836..885cf87
--- a/server/src/com/cloud/api/ApiDispatcher.java
+++ b/server/src/com/cloud/api/ApiDispatcher.java
@@@ -27,15 -30,19 +30,20 @@@ import java.util.Map
import java.util.StringTokenizer;
import java.util.regex.Matcher;
- import javax.inject.Inject;
-
+ import com.cloud.dao.EntityManager;
+ import org.apache.cloudstack.acl.ControlledEntity;
+ import org.apache.cloudstack.acl.InfrastructureEntity;
+ import org.apache.cloudstack.acl.Role;
+ import org.apache.cloudstack.api.*;
import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
- import com.cloud.api.BaseCmd.CommandType;
- import com.cloud.api.commands.ListEventsCmd;
+ import org.apache.cloudstack.api.BaseCmd.CommandType;
+ import org.apache.cloudstack.api.command.user.event.ListEventsCmd;
import com.cloud.async.AsyncCommandQueued;
import com.cloud.async.AsyncJobManager;
+ import com.cloud.configuration.Config;
+ import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.AccountLimitException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InvalidParameterValueException;
@@@ -118,10 -148,54 +149,54 @@@ public class ApiDispatcher
}
}
+ private void checkACLOnCommand(BaseCmd cmd) {
+ // TODO Auto-generated method stub
+ //need to write an commandACLChecker adapter framework to check ACL on commands - default one will use the static roles by referring to commands.properties.
+ //one can write another commandACLChecker to check access via custom roles.
+ }
+
+ private List<Role> determineRole(Account caller) {
+ // TODO Auto-generated method stub
+ List<Role> effectiveRoles = new ArrayList<Role>();
+ return effectiveRoles;
+
+ }
+
+ private void doAccessChecks(BaseCmd cmd, List<Object> entitiesToAccess) {
+ //owner
+ Account caller = UserContext.current().getCaller();
+ Account owner = _accountMgr.getActiveAccountById(cmd.getEntityOwnerId());
+
+ // REMOVE ME:
+ // List<Role> callerRoles = determineRole(caller);
+ // List<Role> ownerRoles = determineRole(owner);
+ // check permission to call this command for the caller
+ // this needs checking of static roles of the caller
+ // Role based acl is done in ApiServer before api gets to ApiDispatcher
+ // checkACLOnCommand(cmd);
+
+ if(cmd instanceof BaseAsyncCreateCmd) {
+ //check that caller can access the owner account.
+ _accountMgr.checkAccess(caller, null, true, owner);
+ }
+
+ if(!entitiesToAccess.isEmpty()){
+ //check that caller can access the owner account.
+ _accountMgr.checkAccess(caller, null, true, owner);
+ for(Object entity : entitiesToAccess) {
+ if (entity instanceof ControlledEntity) {
+ _accountMgr.checkAccess(caller, null, true, (ControlledEntity) entity);
+ }
+ else if (entity instanceof InfrastructureEntity) {
+ //do something here:D
+ }
+ }
+ }
+ }
+
- public void dispatch(BaseCmd cmd, Map<String, String> params) {
+ public void dispatch(BaseCmd cmd, Map<String, String> params) {
- setupParameters(cmd, params);
- ApiDispatcher.plugService(cmd);
try {
+ processParameters(cmd, params);
UserContext ctx = UserContext.current();
ctx.setAccountId(cmd.getEntityOwnerId());
if (cmd instanceof BaseAsyncCmd) {
@@@ -345,7 -435,155 +436,155 @@@
// and IllegalAccessException setting one of the parameters.
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getCommandName().substring(0, cmd.getCommandName().length() - 8));
}
+
+ //check access on the resource this field points to
+ try {
+ ACL checkAccess = field.getAnnotation(ACL.class);
+ CommandType fieldType = parameterAnnotation.type();
+
+ if (checkAccess != null) {
+ // Verify that caller can perform actions in behalf of vm owner
+ //acumulate all Controlled Entities together.
+
+ //parse the array of resource types and in case of map check access on key or value or both as specified in @acl
+ //implement external dao for classes that need findByName
+ //for maps, specify access to be checkd on key or value.
+
+ // find the controlled entity DBid by uuid
+ if (parameterAnnotation.entityType() != null) {
+ Class<?>[] entityList = parameterAnnotation.entityType()[0].getAnnotation(EntityReference.class).value();
+
+ for (Class entity : entityList) {
+ // Check if the parameter type is a single
+ // Id or list of id's/name's
+ switch (fieldType) {
+ case LIST:
+ CommandType listType = parameterAnnotation.collectionType();
+ switch (listType) {
+ case LONG:
+ case UUID:
+ List<Long> listParam = new ArrayList<Long>();
+ listParam = (List) field.get(cmd);
+ for (Long entityId : listParam) {
+ Object entityObj = s_instance._entityMgr.findById(entity, (Long) field.get(cmd));
+ entitiesToAccess.add(entityObj);
+ }
+ break;
+ /*
+ * case STRING: List<String> listParam =
+ * new ArrayList<String>(); listParam =
+ * (List)field.get(cmd); for(String
+ * entityName: listParam){
+ * ControlledEntity entityObj =
+ * (ControlledEntity
+ * )daoClassInstance(entityId);
+ * entitiesToAccess.add(entityObj); }
+ * break;
+ */
+ default:
+ break;
- }
+ }
+ break;
+ case LONG:
+ case UUID:
+ Object entityObj = s_instance._entityMgr.findById(entity, (Long) field.get(cmd));
+ entitiesToAccess.add(entityObj);
+ break;
+ default:
+ break;
+ }
+
+ if (ControlledEntity.class.isAssignableFrom(entity)) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("ControlledEntity name is:" + entity.getName());
+ }
+ }
+
+ if (InfrastructureEntity.class.isAssignableFrom(entity)) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("InfrastructureEntity name is:" + entity.getName());
+ }
+ }
+ }
+
+ }
+
+ }
+
+ } catch (IllegalArgumentException e) {
+ s_logger.error("Error initializing command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
+ throw new CloudRuntimeException("Internal error initializing parameters for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
+ } catch (IllegalAccessException e) {
+ s_logger.error("Error initializing command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
+ throw new CloudRuntimeException("Internal error initializing parameters for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
+ }
+
+ }
+
+ //check access on the entities.
+ s_instance.doAccessChecks(cmd, entitiesToAccess);
+
+ }
+
+ private static Long translateUuidToInternalId(String uuid, Parameter annotation)
+ {
+ if (uuid.equals("-1")) {
+ // FIXME: This is to handle a lot of hardcoded special cases where -1 is sent
+ // APITODO: Find and get rid of all hardcoded params in API Cmds and service layer
+ return -1L;
+ }
+ Long internalId = null;
+ // If annotation's empty, the cmd existed before 3.x try conversion to long
+ // FIXME: Fails if someone adds since field for any pre 3.x apis
+ boolean isPre3x = annotation.since().isEmpty();
+ // Match against Java's UUID regex to check if input is uuid string
+ boolean isUuid = uuid.matches("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");
+ // Enforce that it's uuid for newly added apis from version 3.x
+ if (!isPre3x && !isUuid)
+ return null;
+ // Allow both uuid and internal id for pre3x apis
+ if (isPre3x && !isUuid) {
+ try {
+ internalId = Long.parseLong(uuid);
+ } catch(NumberFormatException e) {
+ // In case regex failed, and it's still uuid string
+ internalId = null;
+ }
+ if (internalId != null)
+ return internalId;
+ }
+ // There may be multiple entities defined on the @EntityReference of a Response.class
+ // UUID CommandType would expect only one entityType, so use the first entityType
+ Class<?>[] entities = annotation.entityType()[0].getAnnotation(EntityReference.class).value();
+ // Go through each entity which is an interface to a VO class and get a VO object
+ // Try to getId() for the object using reflection, break on first non-null value
+ for (Class<?> entity: entities) {
+ // findByUuid returns one VO object using uuid, use reflect to get the Id
+ Object objVO = s_instance._entityMgr.findByUuid(entity, uuid);
+ if (objVO == null) {
+ continue;
+ }
+ // Invoke the getId method, get the internal long ID
+ // If that fails hide exceptions as the uuid may not exist
+ try {
+ internalId = (Long) ((InternalIdentity)objVO).getId();
+ } catch (IllegalArgumentException e) {
+ } catch (NullPointerException e) {
+ }
+ // Return on first non-null Id for the uuid entity
+ if (internalId != null)
+ break;
+ }
+ if (internalId == null) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Object entity with uuid=" + uuid + " does not exist in the database.");
+ }
+ if (annotation.required()) {
+ throw new InvalidParameterValueException("Invalid parameter with uuid=" + uuid
+ + ". Entity not found, or an annotation bug.");
+ }
+ }
+ return internalId;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@@ -389,45 -627,58 +628,58 @@@
}
break;
case FLOAT:
+ // Assuming that the parameters have been checked for required before now,
+ // we ignore blank or null values and defer to the command to set a default
+ // value for optional parameters ...
+ if (paramObj != null && isNotBlank(paramObj.toString())) {
- field.set(cmdObj, Float.valueOf(paramObj.toString()));
+ field.set(cmdObj, Float.valueOf(paramObj.toString()));
+ }
break;
case INTEGER:
+ // Assuming that the parameters have been checked for required before now,
+ // we ignore blank or null values and defer to the command to set a default
+ // value for optional parameters ...
+ if (paramObj != null && isNotBlank(paramObj.toString())) {
- field.set(cmdObj, Integer.valueOf(paramObj.toString()));
+ field.set(cmdObj, Integer.valueOf(paramObj.toString()));
+ }
break;
- case LIST:
- List listParam = new ArrayList();
- StringTokenizer st = new StringTokenizer(paramObj.toString(), ",");
- while (st.hasMoreTokens()) {
- String token = st.nextToken();
- CommandType listType = annotation.collectionType();
- switch (listType) {
- case INTEGER:
- listParam.add(Integer.valueOf(token));
- break;
+ case LIST:
+ List listParam = new ArrayList();
+ StringTokenizer st = new StringTokenizer(paramObj.toString(), ",");
+ while (st.hasMoreTokens()) {
+ String token = st.nextToken();
+ CommandType listType = annotation.collectionType();
+ switch (listType) {
+ case INTEGER:
+ listParam.add(Integer.valueOf(token));
+ break;
+ case UUID:
+ if (token.isEmpty())
+ break;
+ Long internalId = translateUuidToInternalId(token, annotation);
+ listParam.add(internalId);
+ break;
- case LONG: {
+ case LONG: {
- Long val = null;
- if (identityMapper != null)
- val = s_instance._identityDao.getIdentityId(identityMapper, token);
- else
- val = Long.valueOf(token);
-
- listParam.add(val);
+ listParam.add(Long.valueOf(token));
- }
- break;
- case SHORT:
- listParam.add(Short.valueOf(token));
- case STRING:
- listParam.add(token);
- break;
- }
}
- field.set(cmdObj, listParam);
- break;
+ break;
+ case SHORT:
+ listParam.add(Short.valueOf(token));
+ case STRING:
+ listParam.add(token);
+ break;
+ }
+ }
+ field.set(cmdObj, listParam);
+ break;
+ case UUID:
+ if (paramObj.toString().isEmpty())
+ break;
+ Long internalId = translateUuidToInternalId(paramObj.toString(), annotation);
+ field.set(cmdObj, internalId);
+ break;
case LONG:
- if (identityMapper != null)
- field.set(cmdObj, s_instance._identityDao.getIdentityId(identityMapper, paramObj.toString()));
- else
- field.set(cmdObj, Long.valueOf(paramObj.toString()));
+ field.set(cmdObj, Long.valueOf(paramObj.toString()));
break;
case SHORT:
field.set(cmdObj, Short.valueOf(paramObj.toString()));
@@@ -467,46 -718,31 +719,31 @@@
return cal.getTime();
}
- public static void plugService(BaseCmd cmd) {
-
- if (!ApiServer.isPluggableServiceCommand(cmd.getClass().getName())) {
- return;
- }
- Class<?> clazz = cmd.getClass();
+ public static void plugService(Field field, BaseCmd cmd) {
ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
- do {
- Field[] fields = clazz.getDeclaredFields();
- for (Field field : fields) {
- PlugService plugService = field.getAnnotation(PlugService.class);
- if (plugService == null) {
- continue;
- }
+
- Class<?> fc = field.getType();
- Object instance = null;
- if (PluggableService.class.isAssignableFrom(fc)) {
- instance = locator.getPluggableService(fc);
- }
+ Class<?> fc = field.getType();
+ Object instance = null;
+ if (PluggableService.class.isAssignableFrom(fc)) {
+ instance = locator.getPluggableService(fc);
+ }
- if (instance == null) {
+ if (instance == null) {
- throw new CloudRuntimeException("Unable to plug service " + fc.getSimpleName() + " in command " + clazz.getSimpleName());
+ throw new CloudRuntimeException("Unable to plug service " + fc.getSimpleName() + " in command " + cmd.getClass().getSimpleName());
- }
-
- try {
- field.setAccessible(true);
- field.set(cmd, instance);
- } catch (IllegalArgumentException e) {
- s_logger.error("IllegalArgumentException at plugService for command " + cmd.getCommandName() + ", field " + field.getName());
- throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [Illegal argumet at field " + field.getName() + "]");
- } catch (IllegalAccessException e) {
- s_logger.error("Error at plugService for command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
- throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
- }
- }
+ }
+ try {
+ field.setAccessible(true);
+ field.set(cmd, instance);
+ } catch (IllegalArgumentException e) {
+ s_logger.error("IllegalArgumentException at plugService for command " + cmd.getCommandName() + ", field " + field.getName());
+ throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [Illegal argumet at field " + field.getName() + "]");
+ } catch (IllegalAccessException e) {
+ s_logger.error("Error at plugService for command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
+ throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
+ }
+ }
- clazz = clazz.getSuperclass();
- } while (clazz != Object.class && clazz != null);
- }
-
+
public static Long getIdentiyId(String tableName, String token) {
return s_instance._identityDao.getIdentityId(tableName, token);
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/api/ApiServer.java
index 457bba4,6209171..8a88cbb
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@@ -81,11 -88,17 +89,18 @@@ import org.apache.http.protocol.Respons
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+ import org.apache.cloudstack.api.command.admin.host.ListHostsCmd;
+ import org.apache.cloudstack.api.command.admin.router.ListRoutersCmd;
+ import org.apache.cloudstack.api.command.admin.user.ListUsersCmd;
+ import org.apache.cloudstack.api.command.user.project.ListProjectInvitationsCmd;
+ import org.apache.cloudstack.api.command.user.project.ListProjectsCmd;
+ import org.apache.cloudstack.api.command.user.securitygroup.ListSecurityGroupsCmd;
+ import org.apache.cloudstack.api.command.user.tag.ListTagsCmd;
import com.cloud.api.response.ApiResponseSerializer;
- import com.cloud.api.response.ExceptionResponse;
- import com.cloud.api.response.ListResponse;
+ import org.apache.cloudstack.api.response.ExceptionResponse;
+ import org.apache.cloudstack.api.response.ListResponse;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobVO;
@@@ -277,12 -186,13 +188,13 @@@ public class ApiServer implements HttpR
_systemAccount = _accountMgr.getSystemAccount();
_systemUser = _accountMgr.getSystemUser();
_dispatcher = ApiDispatcher.getInstance();
-
+
Integer apiPort = null; // api port, null by default
-
- SearchCriteria<ConfigurationVO> sc = _configDao.createSearchCriteria();
+ ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
+ ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
+ SearchCriteria<ConfigurationVO> sc = configDao.createSearchCriteria();
sc.addAnd("name", SearchCriteria.Op.EQ, "integration.api.port");
- List<ConfigurationVO> values = configDao.search(sc, null);
+ List<ConfigurationVO> values = _configDao.search(sc, null);
if ((values != null) && (values.size() > 0)) {
ConfigurationVO apiPortConfig = values.get(0);
if (apiPortConfig.getValue() != null) {
@@@ -434,9 -356,9 +358,9 @@@
} catch (Exception ex) {
if (ex instanceof InvalidParameterValueException) {
InvalidParameterValueException ref = (InvalidParameterValueException)ex;
- ServerApiException e = new ServerApiException(BaseCmd.PARAM_ERROR, ex.getMessage());
+ ServerApiException e = new ServerApiException(BaseCmd.PARAM_ERROR, ex.getMessage());
// copy over the IdentityProxy information as well and throw the serverapiexception.
- ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+ ArrayList<String> idList = ref.getIdProxyList();
if (idList != null) {
// Iterate through entire arraylist and copy over each proxy id.
for (int i = 0 ; i < idList.size(); i++) {
@@@ -576,15 -506,21 +508,21 @@@
if (job.getInstanceId() == null) {
continue;
}
+ String instanceUuid = ApiDBUtils.findJobInstanceUuid(job);
+ if (instanceUuid != null) {
+ objectJobMap.put(instanceUuid, job);
+ }
+ }
+
- for (ResponseObject response : responses) {
+ for (ResponseObject response : responses) {
- if (response.getObjectId() != null && job.getInstanceId().longValue() == response.getObjectId().longValue()) {
- response.setJobId(job.getId());
+ if (response.getObjectId() != null && objectJobMap.containsKey(response.getObjectId())) {
+ AsyncJob job = objectJobMap.get(response.getObjectId());
+ response.setJobId(job.getUuid());
- response.setJobStatus(job.getStatus());
+ response.setJobStatus(job.getStatus());
+ }
}
}
}
-- }
private void buildAuditTrail(StringBuffer auditTrailSb, String command, String result) {
if (result == null) {
@@@ -1060,34 -971,31 +973,31 @@@
// Cast the exception appropriately and retrieve the IdentityProxy
if (ex instanceof ServerApiException) {
ServerApiException ref = (ServerApiException) ex;
- ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+ ArrayList<String> idList = ref.getIdProxyList();
if (idList != null) {
for (int i=0; i < idList.size(); i++) {
- IdentityProxy id = idList.get(i);
- apiResponse.addProxyObject(id.getTableName(), id.getValue(), id.getidFieldName());
+ apiResponse.addProxyObject(idList.get(i));
- }
+ }
}
// Also copy over the cserror code and the function/layer in which it was thrown.
apiResponse.setCSErrorCode(ref.getCSErrorCode());
} else if (ex instanceof PermissionDeniedException) {
PermissionDeniedException ref = (PermissionDeniedException) ex;
- ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+ ArrayList<String> idList = ref.getIdProxyList();
if (idList != null) {
for (int i=0; i < idList.size(); i++) {
- IdentityProxy id = idList.get(i);
- apiResponse.addProxyObject(id.getTableName(), id.getValue(), id.getidFieldName());
+ apiResponse.addProxyObject(idList.get(i));
- }
+ }
}
// Also copy over the cserror code and the function/layer in which it was thrown.
apiResponse.setCSErrorCode(ref.getCSErrorCode());
} else if (ex instanceof InvalidParameterValueException) {
InvalidParameterValueException ref = (InvalidParameterValueException) ex;
- ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+ ArrayList<String> idList = ref.getIdProxyList();
if (idList != null) {
for (int i=0; i < idList.size(); i++) {
- IdentityProxy id = idList.get(i);
- apiResponse.addProxyObject(id.getTableName(), id.getValue(), id.getidFieldName());
+ apiResponse.addProxyObject(idList.get(i));
- }
+ }
}
// Also copy over the cserror code and the function/layer in which it was thrown.
apiResponse.setCSErrorCode(ref.getCSErrorCode());
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/AsyncJobManagerImpl.java
index 00546fb,7bf5c5a..e4016f7
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@@ -34,9 -34,9 +34,10 @@@ import java.util.concurrent.TimeUnit
import javax.ejb.Local;
import javax.naming.ConfigurationException;
+ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
+import org.springframework.stereotype.Component;
import com.cloud.api.ApiDispatcher;
import com.cloud.api.ApiGsonHelper;
@@@ -74,46 -73,36 +74,47 @@@ import com.cloud.utils.mgmt.JmxUtil
import com.cloud.utils.net.MacAddress;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-
-@Local(value={AsyncJobManager.class})
-public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener {
- public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
+
+@Component
+@Local(value={AsyncJobManager.class})
+public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener {
+ public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
-
- private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
+
+ private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
private static final int GC_INTERVAL = 10000; // 10 seconds
-
- private String _name;
-
- private AsyncJobExecutorContext _context;
- private SyncQueueManager _queueMgr;
+
+ private String _name;
+
+ private AsyncJobExecutorContext _context;
+ private SyncQueueManager _queueMgr;
private ClusterManager _clusterMgr;
- private AccountManager _accountMgr;
+ private AccountManager _accountMgr;
private AccountDao _accountDao;
private AsyncJobDao _jobDao;
- private long _jobExpireSeconds = 86400; // 1 day
+ private long _jobExpireSeconds = 86400; // 1 day
- private long _jobCancelThresholdSeconds = 3600; // 1 hour
+ private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
+
- private ApiDispatcher _dispatcher;
-
- private final ScheduledExecutorService _heartbeatScheduler =
- Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
- private ExecutorService _executor;
-
- @Override
- public AsyncJobExecutorContext getExecutorContext() {
- return _context;
+ private ApiDispatcher _dispatcher;
+
+ private final ScheduledExecutorService _heartbeatScheduler =
+ Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
+ private ExecutorService _executor;
+
+ @Override
+ public AsyncJobExecutorContext getExecutorContext() {
+ return _context;
+ }
+
+ @Override
+ public AsyncJobVO getAsyncJob(long jobId) {
+ return _jobDao.findById(jobId);
+ }
+
+ @Override
+ public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
+ return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId);
}
@Override
@@@ -181,81 -180,81 +182,81 @@@
if (resultObject != null) {
job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
- }
-
- job.setLastUpdated(DateUtil.currentGMTTime());
- _jobDao.update(jobId, job);
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception while completing async job-" + jobId, e);
- txt.rollback();
- }
- }
-
- @Override @DB
- public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) {
+ }
+
+ job.setLastUpdated(DateUtil.currentGMTTime());
+ _jobDao.update(jobId, job);
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception while completing async job-" + jobId, e);
+ txt.rollback();
+ }
+ }
+
+ @Override @DB
+ public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) {
if(s_logger.isDebugEnabled()) {
- s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
+ s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
", result: " + resultObject);
- }
-
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
- AsyncJobVO job = _jobDao.findById(jobId);
- if(job == null) {
+ }
+
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+ AsyncJobVO job = _jobDao.findById(jobId);
+ if(job == null) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
- }
-
- txt.rollback();
- return;
- }
-
- job.setProcessStatus(processStatus);
+ }
+
+ txt.rollback();
+ return;
+ }
+
+ job.setProcessStatus(processStatus);
if(resultObject != null) {
job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
- }
- job.setLastUpdated(DateUtil.currentGMTTime());
- _jobDao.update(jobId, job);
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
- txt.rollback();
- }
- }
-
- @Override @DB
- public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
- ", instanceId: " + instanceId);
- }
-
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- AsyncJobVO job = _jobDao.createForUpdate();
- //job.setInstanceType(instanceType);
- job.setInstanceId(instanceId);
- job.setLastUpdated(DateUtil.currentGMTTime());
- _jobDao.update(jobId, job);
-
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
- txt.rollback();
- }
- }
-
- @Override
+ }
+ job.setLastUpdated(DateUtil.currentGMTTime());
+ _jobDao.update(jobId, job);
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
+ txt.rollback();
+ }
+ }
+
+ @Override @DB
+ public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
+ ", instanceId: " + instanceId);
+ }
+
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ AsyncJobVO job = _jobDao.createForUpdate();
+ //job.setInstanceType(instanceType);
+ job.setInstanceId(instanceId);
+ job.setLastUpdated(DateUtil.currentGMTTime());
+ _jobDao.update(jobId, job);
+
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
+ txt.rollback();
+ }
+ }
+
+ @Override
- public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId) {
+ public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
- // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
- // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
- // This method will get called every time their business logic executes. The first time it exectues for a job
- // there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first
- // time the job executes we queue the job, otherwise we just return so that the business logic can execute.
+ // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
+ // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
+ // This method will get called every time their business logic executes. The first time it exectues for a job
+ // there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first
+ // time the job executes we queue the job, otherwise we just return so that the business logic can execute.
if (job.getSyncSource() != null) {
return;
}
@@@ -270,9 -269,9 +271,9 @@@
// we retry five times until we throw an exception
Random random = new Random();
- for(int i = 0; i < 5; i++) {
+ for(int i = 0; i < 5; i++) {
- queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId());
+ queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
- if(queue != null) {
+ if(queue != null) {
break;
}
@@@ -599,65 -598,78 +600,78 @@@
return new Runnable() {
@Override
public void run() {
- GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
- try {
- if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
- try {
- reallyRun();
- } finally {
- scanLock.unlock();
- }
- }
- } finally {
- scanLock.releaseRef();
- }
- }
-
+ GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
+ try {
+ if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
+ try {
+ reallyRun();
+ } finally {
+ scanLock.unlock();
+ }
+ }
+ } finally {
+ scanLock.releaseRef();
+ }
+ }
+
- private void reallyRun() {
+ public void reallyRun() {
- try {
- s_logger.trace("Begin cleanup expired async-jobs");
-
- Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
-
- // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
- // hopefully this will be fast enough to balance potential growth of job table
- List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
- if(l != null && l.size() > 0) {
- for(AsyncJobVO job : l) {
+ try {
+ s_logger.trace("Begin cleanup expired async-jobs");
+
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
+
+ // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
+ // hopefully this will be fast enough to balance potential growth of job table
+ List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
+ if(l != null && l.size() > 0) {
+ for(AsyncJobVO job : l) {
- _jobDao.expunge(job.getId());
+ expungeAsyncJob(job);
- }
- }
-
+ }
+ }
+
- // forcely cancel blocking queue items if they've been staying there for too long
+ // forcefully cancel blocking queue items if they've been staying there for too long
- List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
- if(blockItems != null && blockItems.size() > 0) {
- for(SyncQueueItemVO item : blockItems) {
+ List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
+ if(blockItems != null && blockItems.size() > 0) {
+ for(SyncQueueItemVO item : blockItems) {
- if(item.getContentType().equalsIgnoreCase("AsyncJob")) {
- completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
+ if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+ completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0,
+ getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
}
-
- // purge the item and resume queue processing
- _queueMgr.purgeItem(item.getId());
- }
- }
-
- s_logger.trace("End cleanup expired async-jobs");
- } catch(Throwable e) {
- s_logger.error("Unexpected exception when trying to execute queue item, ", e);
- } finally {
- StackMaid.current().exitCleanup();
- }
- }
+
+ // purge the item and resume queue processing
+ _queueMgr.purgeItem(item.getId());
+ }
+ }
+
+ s_logger.trace("End cleanup expired async-jobs");
+ } catch(Throwable e) {
+ s_logger.error("Unexpected exception when trying to execute queue item, ", e);
+ } finally {
+ StackMaid.current().exitCleanup();
+ }
+ }
+
+
- };
- }
-
+ };
+ }
+
+ @DB
+ protected void expungeAsyncJob(AsyncJobVO job) {
+ Transaction txn = Transaction.currentTxn();
+ txn.start();
+ _jobDao.expunge(job.getId());
+ //purge corresponding sync queue item
+ _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+ txn.commit();
+ }
+
- private long getMsid() {
- if(_clusterMgr != null) {
+ private long getMsid() {
+ if(_clusterMgr != null) {
return _clusterMgr.getManagementNodeId();
- }
-
- return MacAddress.getMacAddress().toLong();
- }
+ }
+
+ return MacAddress.getMacAddress().toLong();
+ }
private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
if(l != null && l.size() > 0) {
@@@ -665,61 -677,61 +679,61 @@@
if(s_logger.isInfoEnabled()) {
s_logger.info("Discard left-over queue item: " + item.toString());
}
-
- String contentType = item.getContentType();
+
+ String contentType = item.getContentType();
- if(contentType != null && contentType.equals("AsyncJob")) {
+ if(contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
- Long jobId = item.getContentId();
- if(jobId != null) {
- s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
- completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
- }
- }
- _queueMgr.purgeItem(item.getId());
- }
- }
- }
-
- @Override
- public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
- _name = name;
-
- ComponentLocator locator = ComponentLocator.getCurrentLocator();
-
- ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
- if (configDao == null) {
- throw new ConfigurationException("Unable to get the configuration dao.");
- }
-
- int expireMinutes = NumbersUtil.parseInt(
- configDao.getValue(Config.JobExpireMinutes.key()), 24*60);
- _jobExpireSeconds = (long)expireMinutes*60;
-
- _jobCancelThresholdSeconds = NumbersUtil.parseInt(
- configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60);
- _jobCancelThresholdSeconds *= 60;
-
- _accountDao = locator.getDao(AccountDao.class);
- if (_accountDao == null) {
- throw new ConfigurationException("Unable to get " + AccountDao.class.getName());
- }
- _jobDao = locator.getDao(AsyncJobDao.class);
- if (_jobDao == null) {
- throw new ConfigurationException("Unable to get "
- + AsyncJobDao.class.getName());
- }
-
- _context = locator.getManager(AsyncJobExecutorContext.class);
- if (_context == null) {
- throw new ConfigurationException("Unable to get "
- + AsyncJobExecutorContext.class.getName());
+ Long jobId = item.getContentId();
+ if(jobId != null) {
+ s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
+ completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
+ }
+ }
+ _queueMgr.purgeItem(item.getId());
+ }
}
+ }
+
+ @Override
+ public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+ _name = name;
+
+ ComponentLocator locator = ComponentLocator.getCurrentLocator();
- _queueMgr = locator.getManager(SyncQueueManager.class);
- if(_queueMgr == null) {
- throw new ConfigurationException("Unable to get "
- + SyncQueueManager.class.getName());
+ ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
+ if (configDao == null) {
+ throw new ConfigurationException("Unable to get the configuration dao.");
}
+
+ int expireMinutes = NumbersUtil.parseInt(
+ configDao.getValue(Config.JobExpireMinutes.key()), 24*60);
+ _jobExpireSeconds = (long)expireMinutes*60;
+ _jobCancelThresholdSeconds = NumbersUtil.parseInt(
+ configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60);
+ _jobCancelThresholdSeconds *= 60;
+
+ _accountDao = locator.getDao(AccountDao.class);
+ if (_accountDao == null) {
+ throw new ConfigurationException("Unable to get " + AccountDao.class.getName());
+ }
+ _jobDao = locator.getDao(AsyncJobDao.class);
+ if (_jobDao == null) {
+ throw new ConfigurationException("Unable to get "
+ + AsyncJobDao.class.getName());
+ }
+
+ _context = locator.getManager(AsyncJobExecutorContext.class);
+ if (_context == null) {
+ throw new ConfigurationException("Unable to get "
+ + AsyncJobExecutorContext.class.getName());
+ }
+
+ _queueMgr = locator.getManager(SyncQueueManager.class);
+ if(_queueMgr == null) {
+ throw new ConfigurationException("Unable to get "
+ + SyncQueueManager.class.getName());
+ }
+
_clusterMgr = locator.getManager(ClusterManager.class);
_accountMgr = locator.getManager(AccountManager.class);
@@@ -750,46 -762,44 +764,44 @@@
}
@Override
- public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
- for(ManagementServerHostVO msHost : nodeList) {
- Transaction txn = Transaction.open(Transaction.CLOUD_DB);
- try {
- txn.start();
- List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
- cleanupPendingJobs(items);
- _jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
- txn.commit();
- } catch(Throwable e) {
- s_logger.warn("Unexpected exception ", e);
- txn.rollback();
- } finally {
- txn.close();
- }
- }
+ public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+ for(ManagementServerHostVO msHost : nodeList) {
+ Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+ try {
+ txn.start();
+ List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
+ cleanupPendingJobs(items);
- _queueMgr.resetQueueProcess(msHost.getId());
+ _jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
+ txn.commit();
+ } catch(Throwable e) {
+ s_logger.warn("Unexpected exception ", e);
+ txn.rollback();
+ } finally {
+ txn.close();
+ }
+ }
}
@Override
public void onManagementNodeIsolated() {
}
-
- @Override
+
+ @Override
public boolean start() {
- try {
- List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
- cleanupPendingJobs(l);
- _jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
- } catch(Throwable e) {
- s_logger.error("Unexpected exception " + e.getMessage(), e);
- }
-
- _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
- HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
- _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
- GC_INTERVAL, TimeUnit.MILLISECONDS);
-
- return true;
+ try {
+ List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
+ cleanupPendingJobs(l);
- _queueMgr.resetQueueProcess(getMsid());
+ _jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
+ } catch(Throwable e) {
+ s_logger.error("Unexpected exception " + e.getMessage(), e);
+ }
+
+ _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
+ HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
+ _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
+ GC_INTERVAL, TimeUnit.MILLISECONDS);
+
+ return true;
}
private static ExceptionResponse getResetResultResponse(String errorMessage) {
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/SyncQueueManagerImpl.java
index f316ff6,4d15065..94fcf0b
--- a/server/src/com/cloud/async/SyncQueueManagerImpl.java
+++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java
@@@ -35,166 -34,173 +35,173 @@@ import com.cloud.utils.db.DB
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
- @Component
+
-@Local(value={SyncQueueManager.class})
-public class SyncQueueManagerImpl implements SyncQueueManager {
- public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
-
- private String _name;
-
- private SyncQueueDao _syncQueueDao;
- private SyncQueueItemDao _syncQueueItemDao;
-
- @Override
- @DB
+@Local(value={SyncQueueManager.class})
+public class SyncQueueManagerImpl implements SyncQueueManager {
+ public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
+
+ private String _name;
+
+ private SyncQueueDao _syncQueueDao;
+ private SyncQueueItemDao _syncQueueItemDao;
+
+ @Override
+ @DB
- public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId) {
+ public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
- Transaction txn = Transaction.currentTxn();
- try {
- txn.start();
-
- _syncQueueDao.ensureQueue(syncObjType, syncObjId);
- SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
- if(queueVO == null)
- throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
+ Transaction txn = Transaction.currentTxn();
+ try {
+ txn.start();
+
+ _syncQueueDao.ensureQueue(syncObjType, syncObjId);
+ SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
+ if(queueVO == null)
+ throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
+ queueVO.setQueueSizeLimit(queueSizeLimit);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
-
- Date dt = DateUtil.currentGMTTime();
- SyncQueueItemVO item = new SyncQueueItemVO();
- item.setQueueId(queueVO.getId());
- item.setContentType(itemType);
- item.setContentId(itemId);
- item.setCreated(dt);
-
- _syncQueueItemDao.persist(item);
- txn.commit();
-
- return queueVO;
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txn.rollback();
- }
- return null;
- }
-
- @Override
- @DB
- public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
- if(queueVO == null) {
- s_logger.error("Sync queue(id: " + queueId + ") does not exist");
- txt.commit();
- return null;
- }
-
+
+ Date dt = DateUtil.currentGMTTime();
+ SyncQueueItemVO item = new SyncQueueItemVO();
+ item.setQueueId(queueVO.getId());
+ item.setContentType(itemType);
+ item.setContentId(itemId);
+ item.setCreated(dt);
+
+ _syncQueueItemDao.persist(item);
+ txn.commit();
+
+ return queueVO;
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txn.rollback();
+ }
+ return null;
+ }
+
+ @Override
+ @DB
+ public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
+ if(queueVO == null) {
+ s_logger.error("Sync queue(id: " + queueId + ") does not exist");
+ txt.commit();
+ return null;
+ }
+
- if(queueVO.getLastProcessTime() == null) {
+ if(queueReadyToProcess(queueVO)) {
- SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
- if(itemVO != null) {
- Long processNumber = queueVO.getLastProcessNumber();
- if(processNumber == null)
- processNumber = new Long(1);
- else
- processNumber = processNumber + 1;
- Date dt = DateUtil.currentGMTTime();
- queueVO.setLastProcessNumber(processNumber);
+ SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
+ if(itemVO != null) {
+ Long processNumber = queueVO.getLastProcessNumber();
+ if(processNumber == null)
+ processNumber = new Long(1);
+ else
+ processNumber = processNumber + 1;
+ Date dt = DateUtil.currentGMTTime();
- queueVO.setLastProcessMsid(msid);
+ queueVO.setLastProcessNumber(processNumber);
- queueVO.setLastProcessTime(dt);
queueVO.setLastUpdated(dt);
+ queueVO.setQueueSize(queueVO.getQueueSize() + 1);
- _syncQueueDao.update(queueVO.getId(), queueVO);
-
- itemVO.setLastProcessMsid(msid);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+
+ itemVO.setLastProcessMsid(msid);
itemVO.setLastProcessNumber(processNumber);
+ itemVO.setLastProcessTime(dt);
- _syncQueueItemDao.update(itemVO.getId(), itemVO);
-
- txt.commit();
- return itemVO;
- } else {
- if(s_logger.isDebugEnabled())
- s_logger.debug("Sync queue (" + queueId + ") is currently empty");
- }
- } else {
- if(s_logger.isDebugEnabled())
- s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
- }
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txt.rollback();
- }
-
- return null;
- }
-
- @Override
- @DB
- public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
-
- List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
- if(l != null && l.size() > 0) {
- for(SyncQueueItemVO item : l) {
- SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
- SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
+ _syncQueueItemDao.update(itemVO.getId(), itemVO);
+
+ txt.commit();
+ return itemVO;
+ } else {
+ if(s_logger.isDebugEnabled())
+ s_logger.debug("Sync queue (" + queueId + ") is currently empty");
+ }
+ } else {
+ if(s_logger.isDebugEnabled())
+ s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
+ }
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txt.rollback();
+ }
+
+ return null;
+ }
+
+ @Override
+ @DB
+ public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
+
+ List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
+ if(l != null && l.size() > 0) {
+ for(SyncQueueItemVO item : l) {
+ SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
+ SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
- if(queueVO.getLastProcessTime() == null && itemVO.getLastProcessNumber() == null) {
+ if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
- Long processNumber = queueVO.getLastProcessNumber();
- if(processNumber == null)
- processNumber = new Long(1);
- else
- processNumber = processNumber + 1;
-
- Date dt = DateUtil.currentGMTTime();
+ Long processNumber = queueVO.getLastProcessNumber();
+ if(processNumber == null)
+ processNumber = new Long(1);
+ else
+ processNumber = processNumber + 1;
+
+ Date dt = DateUtil.currentGMTTime();
- queueVO.setLastProcessMsid(msid);
queueVO.setLastProcessNumber(processNumber);
- queueVO.setLastProcessTime(dt);
queueVO.setLastUpdated(dt);
+ queueVO.setQueueSize(queueVO.getQueueSize() + 1);
- _syncQueueDao.update(queueVO.getId(), queueVO);
-
- itemVO.setLastProcessMsid(msid);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+
+ itemVO.setLastProcessMsid(msid);
itemVO.setLastProcessNumber(processNumber);
+ itemVO.setLastProcessTime(dt);
- _syncQueueItemDao.update(item.getId(), itemVO);
-
- resultList.add(item);
- }
- }
- }
- txt.commit();
- return resultList;
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txt.rollback();
- }
- return null;
- }
-
- @Override
- @DB
- public void purgeItem(long queueItemId) {
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
- if(itemVO != null) {
- SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
-
+ _syncQueueItemDao.update(item.getId(), itemVO);
+
+ resultList.add(item);
+ }
+ }
+ }
+ txt.commit();
+ return resultList;
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txt.rollback();
+ }
+ return null;
+ }
+
+ @Override
+ @DB
+ public void purgeItem(long queueItemId) {
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+ if(itemVO != null) {
+ SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
_syncQueueItemDao.expunge(itemVO.getId());
- queueVO.setLastProcessTime(null);
+ //if item is active, reset queue information
+ if (itemVO.getLastProcessMsid() != null) {
- queueVO.setLastUpdated(DateUtil.currentGMTTime());
+ queueVO.setLastUpdated(DateUtil.currentGMTTime());
+ //decrement the count
+ assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+ queueVO.setQueueSize(queueVO.getQueueSize() - 1);
- _syncQueueDao.update(queueVO.getId(), queueVO);
- }
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+ }
+ }
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txt.rollback();
- }
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txt.rollback();
+ }
}
@Override
@@@ -233,44 -239,50 +240,50 @@@
return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
}
- @Override
- public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
- _name = name;
+ @Override
- public void resetQueueProcess(long msid) {
- _syncQueueDao.resetQueueProcessing(msid);
- }
-
- @Override
+ public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+ _name = name;
ComponentLocator locator = ComponentLocator.getCurrentLocator();
-
- _syncQueueDao = locator.getDao(SyncQueueDao.class);
- if (_syncQueueDao == null) {
- throw new ConfigurationException("Unable to get "
- + SyncQueueDao.class.getName());
- }
-
- _syncQueueItemDao = locator.getDao(SyncQueueItemDao.class);
- if (_syncQueueItemDao == null) {
- throw new ConfigurationException("Unable to get "
- + SyncQueueDao.class.getName());
+
+ _syncQueueDao = locator.getDao(SyncQueueDao.class);
+ if (_syncQueueDao == null) {
+ throw new ConfigurationException("Unable to get "
+ + SyncQueueDao.class.getName());
}
-
- return true;
- }
-
- @Override
- public boolean start() {
- return true;
- }
-
- @Override
- public boolean stop() {
- return true;
- }
-
- @Override
- public String getName() {
- return _name;
+
+ _syncQueueItemDao = locator.getDao(SyncQueueItemDao.class);
+ if (_syncQueueItemDao == null) {
+ throw new ConfigurationException("Unable to get "
+ + SyncQueueDao.class.getName());
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean start() {
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
}
+ @Override
+ public String getName() {
+ return _name;
+ }
- }
+
+ private boolean queueReadyToProcess(SyncQueueVO queueVO) {
+ return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+ }
+
+ @Override
+ public void purgeAsyncJobQueueItemId(long asyncJobId) {
+ Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+ if (itemId != null) {
+ purgeItem(itemId);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
index 599e345,bfe8c0f..7b4c182
--- a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
@@@ -40,12 -38,13 +40,13 @@@ public class SyncQueueDaoImpl extends G
private static final Logger s_logger = Logger.getLogger(SyncQueueDaoImpl.class.getName());
SearchBuilder<SyncQueueVO> TypeIdSearch = createSearchBuilder();
-
- @Override
- public void ensureQueue(String syncObjType, long syncObjId) {
- Date dt = DateUtil.currentGMTTime();
+
+ @Override
+ public void ensureQueue(String syncObjType, long syncObjId) {
+ Date dt = DateUtil.currentGMTTime();
- String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated) values(?, ?, ?, ?)";
+ String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" +
+ " values(?, ?, ?, ?)";
-
+
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
@@@ -56,40 -55,23 +57,23 @@@
pstmt.setString(4, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), dt));
pstmt.execute();
} catch (SQLException e) {
- s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
+ s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
} catch (Throwable e) {
- s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
+ s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
}
- }
-
- @Override
- public SyncQueueVO find(String syncObjType, long syncObjId) {
- SearchCriteria<SyncQueueVO> sc = TypeIdSearch.create();
- sc.setParameters("syncObjType", syncObjType);
- sc.setParameters("syncObjId", syncObjId);
+ }
+
+ @Override
+ public SyncQueueVO find(String syncObjType, long syncObjId) {
+ SearchCriteria<SyncQueueVO> sc = TypeIdSearch.create();
+ sc.setParameters("syncObjType", syncObjType);
+ sc.setParameters("syncObjId", syncObjId);
return findOneBy(sc);
- }
+ }
- @Override @DB
- public void resetQueueProcessing(long msid) {
- String sql = "UPDATE sync_queue set queue_proc_msid=NULL, queue_proc_time=NULL where queue_proc_msid=?";
-
- Transaction txn = Transaction.currentTxn();
- PreparedStatement pstmt = null;
- try {
- pstmt = txn.prepareAutoCloseStatement(sql);
- pstmt.setLong(1, msid);
- pstmt.execute();
- } catch (SQLException e) {
- s_logger.warn("Unable to reset sync queue for management server " + msid, e);
- } catch (Throwable e) {
- s_logger.warn("Unable to reset sync queue for management server " + msid, e);
- }
- }
-
- protected SyncQueueDaoImpl() {
- super();
- TypeIdSearch = createSearchBuilder();
+ protected SyncQueueDaoImpl() {
+ super();
+ TypeIdSearch = createSearchBuilder();
TypeIdSearch.and("syncObjType", TypeIdSearch.entity().getSyncObjType(), SearchCriteria.Op.EQ);
TypeIdSearch.and("syncObjId", TypeIdSearch.entity().getSyncObjId(), SearchCriteria.Op.EQ);
TypeIdSearch.done();
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
index 6d7342c,5e75756..8ee21f3
--- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
@@@ -25,58 -26,63 +26,66 @@@ import java.util.List
import java.util.TimeZone;
import javax.ejb.Local;
+import javax.inject.Inject;
import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
import com.cloud.async.SyncQueueItemVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
- import com.cloud.utils.db.JoinBuilder;
+ import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+ import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
+@Component
@Local(value = { SyncQueueItemDao.class })
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
+ final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
+
+ protected SyncQueueItemDaoImpl() {
+ super();
+ queueIdSearch = createSearchBuilder(Long.class);
+ queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ);
+ queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
+ queueIdSearch.selectField(queueIdSearch.entity().getId());
+ queueIdSearch.done();
+ }
+
- // private final SyncQueueDao _syncQueueDao = new SyncQueueDaoImpl();
- @Inject private SyncQueueDao _syncQueueDao;
-
- @Override
- public SyncQueueItemVO getNextQueueItem(long queueId) {
-
- SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+ @Override
+ public SyncQueueItemVO getNextQueueItem(long queueId) {
+
+ SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ);
- sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL);
+ sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL);
sb.done();
- SearchCriteria<SyncQueueItemVO> sc = sb.create();
- sc.setParameters("queueId", queueId);
-
- Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
+ SearchCriteria<SyncQueueItemVO> sc = sb.create();
+ sc.setParameters("queueId", queueId);
+
+ Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
List<SyncQueueItemVO> l = listBy(sc, filter);
if(l != null && l.size() > 0)
- return l.get(0);
-
- return null;
- }
-
- @Override
- public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
- List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
-
- String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
- " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
+ return l.get(0);
+
+ return null;
+ }
+
+ @Override
+ public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
+ List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
+
+ String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
+ " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
- " WHERE q.queue_proc_time IS NULL AND i.queue_proc_number IS NULL " +
+ " WHERE q.queue_size < q.queue_size_limit AND i.queue_proc_number IS NULL " +
- " GROUP BY q.id " +
- " ORDER BY i.id " +
- " LIMIT 0, ?";
+ " GROUP BY q.id " +
+ " ORDER BY i.id " +
+ " LIMIT 0, ?";
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
@@@ -85,59 -91,55 +94,55 @@@
pstmt.setInt(1, maxItems);
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
- SyncQueueItemVO item = new SyncQueueItemVO();
- item.setId(rs.getLong(1));
- item.setQueueId(rs.getLong(2));
- item.setContentType(rs.getString(3));
- item.setContentId(rs.getLong(4));
- item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
- l.add(item);
+ SyncQueueItemVO item = new SyncQueueItemVO();
+ item.setId(rs.getLong(1));
+ item.setQueueId(rs.getLong(2));
+ item.setContentType(rs.getString(3));
+ item.setContentId(rs.getLong(4));
+ item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
+ l.add(item);
}
} catch (SQLException e) {
- s_logger.error("Unexpected sql excetpion, ", e);
+ s_logger.error("Unexpected sql excetpion, ", e);
} catch (Throwable e) {
- s_logger.error("Unexpected excetpion, ", e);
+ s_logger.error("Unexpected excetpion, ", e);
}
- return l;
- }
-
- @Override
- public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
- SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+ return l;
+ }
+
+ @Override
+ public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+ SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
- SearchCriteria.Op.EQ);
+ SearchCriteria.Op.EQ);
sb.done();
- SearchCriteria<SyncQueueItemVO> sc = sb.create();
- sc.setParameters("lastProcessMsid", msid);
-
- Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
-
- if(exclusive)
- return lockRows(sc, filter, true);
+ SearchCriteria<SyncQueueItemVO> sc = sb.create();
+ sc.setParameters("lastProcessMsid", msid);
+
+ Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
+
+ if(exclusive)
+ return lockRows(sc, filter, true);
return listBy(sc, filter);
- }
-
+ }
+
+
@Override
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
Date cutTime = DateUtil.currentGMTTime();
- cutTime = new Date(cutTime.getTime() - thresholdMs);
-
- SearchBuilder<SyncQueueVO> sbQueue = _syncQueueDao.createSearchBuilder();
- sbQueue.and("lastProcessTime", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
- sbQueue.and("lastProcessTime2", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.LT);
SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
- sbItem.join("queueItemJoinQueue", sbQueue, sbQueue.entity().getId(), sbItem.entity().getQueueId(), JoinBuilder.JoinType.INNER);
sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
+ sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
+ sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
-
+
- sbQueue.done();
sbItem.done();
-
+
SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
- sc.setJoinParameters("queueItemJoinQueue", "lastProcessTime2", cutTime);
+ sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
-
+
if(exclusive)
return lockRows(sc, null, true);
return listBy(sc, null);
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalDiscoverer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalResourceBase.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
index e61b942,25298a9..cd3f2cb
--- a/server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
+++ b/server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
@@@ -20,14 -20,12 +20,14 @@@ import java.util.Date
import java.util.List;
import javax.ejb.Local;
+import javax.inject.Inject;
+ import org.apache.cloudstack.api.command.user.iso.DeleteIsoCmd;
+ import org.apache.cloudstack.api.command.user.iso.RegisterIsoCmd;
+ import org.apache.cloudstack.api.command.user.template.RegisterTemplateCmd;
import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
- import com.cloud.api.commands.DeleteIsoCmd;
- import com.cloud.api.commands.RegisterIsoCmd;
- import com.cloud.api.commands.RegisterTemplateCmd;
import com.cloud.configuration.Resource.ResourceType;
import com.cloud.dc.DataCenterVO;
import com.cloud.event.EventTypes;
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
index 02775bd,2e325ec..c419f5a
--- a/server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
+++ b/server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
@@@ -22,14 -22,15 +22,19 @@@ import java.util.List
import java.util.Map;
import java.util.concurrent.Executors;
+import javax.annotation.PostConstruct;
import javax.ejb.Local;
+import javax.inject.Inject;
import javax.naming.ConfigurationException;
+ import org.apache.cloudstack.api.command.user.template.CreateTemplateCmd;
+ import org.apache.cloudstack.api.command.user.vm.DeployVMCmd;
+ import org.apache.cloudstack.api.command.user.vm.UpgradeVMCmd;
+ import org.apache.cloudstack.api.command.user.volume.AttachVolumeCmd;
+ import org.apache.cloudstack.api.command.user.volume.DetachVolumeCmd;
import org.apache.log4j.Logger;
+import org.springframework.context.annotation.Primary;
+import org.springframework.stereotype.Component;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.StopAnswer;
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/ExternalDhcpManagerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/PxeServerManagerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/cluster/ClusterManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/cluster/ClusterManagerImpl.java
index 19705ec,e341b88..9976bd7
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java
@@@ -801,7 -811,8 +818,8 @@@ public class ClusterManagerImpl impleme
invalidHeartbeatConnection();
} finally {
+ txn.transitToAutoManagedConnection(Transaction.CLOUD_DB);
- txn.close("ClusterHeartBeat");
+ txn.close("ClusterHeartBeat");
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/configuration/ConfigurationManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/configuration/ConfigurationManagerImpl.java
index 0a6aaa8,2e20c13..bb943c9
--- a/server/src/com/cloud/configuration/ConfigurationManagerImpl.java
+++ b/server/src/com/cloud/configuration/ConfigurationManagerImpl.java
@@@ -39,33 -38,30 +39,31 @@@ import javax.naming.NamingException
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
+ import org.apache.cloudstack.api.command.admin.config.UpdateCfgCmd;
+ import org.apache.cloudstack.api.command.admin.ldap.LDAPConfigCmd;
+ import org.apache.cloudstack.api.command.admin.ldap.LDAPRemoveCmd;
+ import org.apache.cloudstack.api.command.admin.network.DeleteNetworkOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.network.CreateNetworkOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.network.UpdateNetworkOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.offering.CreateDiskOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.offering.*;
+ import org.apache.cloudstack.api.command.admin.pod.DeletePodCmd;
+ import org.apache.cloudstack.api.command.admin.pod.UpdatePodCmd;
+ import org.apache.cloudstack.api.command.admin.vlan.CreateVlanIpRangeCmd;
+ import org.apache.cloudstack.api.command.admin.zone.CreateZoneCmd;
+ import org.apache.cloudstack.api.command.admin.zone.DeleteZoneCmd;
+ import org.apache.cloudstack.api.command.admin.zone.UpdateZoneCmd;
+ import org.apache.cloudstack.api.command.admin.offering.CreateServiceOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.offering.DeleteServiceOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.vlan.DeleteVlanIpRangeCmd;
+ import org.apache.cloudstack.api.command.user.network.ListNetworkOfferingsCmd;
import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
- import com.cloud.acl.SecurityChecker;
+ import org.apache.cloudstack.acl.SecurityChecker;
import com.cloud.alert.AlertManager;
- import com.cloud.api.ApiConstants.LDAPParams;
+ import org.apache.cloudstack.api.ApiConstants.LDAPParams;
import com.cloud.api.ApiDBUtils;
- import com.cloud.api.commands.CreateDiskOfferingCmd;
- import com.cloud.api.commands.CreateNetworkOfferingCmd;
- import com.cloud.api.commands.CreateServiceOfferingCmd;
- import com.cloud.api.commands.CreateVlanIpRangeCmd;
- import com.cloud.api.commands.CreateZoneCmd;
- import com.cloud.api.commands.DeleteDiskOfferingCmd;
- import com.cloud.api.commands.DeleteNetworkOfferingCmd;
- import com.cloud.api.commands.DeletePodCmd;
- import com.cloud.api.commands.DeleteServiceOfferingCmd;
- import com.cloud.api.commands.DeleteVlanIpRangeCmd;
- import com.cloud.api.commands.DeleteZoneCmd;
- import com.cloud.api.commands.LDAPConfigCmd;
- import com.cloud.api.commands.LDAPRemoveCmd;
- import com.cloud.api.commands.ListNetworkOfferingsCmd;
- import com.cloud.api.commands.UpdateCfgCmd;
- import com.cloud.api.commands.UpdateDiskOfferingCmd;
- import com.cloud.api.commands.UpdateNetworkOfferingCmd;
- import com.cloud.api.commands.UpdatePodCmd;
- import com.cloud.api.commands.UpdateServiceOfferingCmd;
- import com.cloud.api.commands.UpdateZoneCmd;
import com.cloud.capacity.dao.CapacityDao;
import com.cloud.configuration.Resource.ResourceType;
import com.cloud.configuration.dao.ConfigurationDao;
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/configuration/DefaultComponentLibrary.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/dao/EntityManagerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/dc/dao/HostPodDaoImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/dc/dao/HostPodDaoImpl.java
index 9110667,fce308a..a06bd3f
--- a/server/src/com/cloud/dc/dao/HostPodDaoImpl.java
+++ b/server/src/com/cloud/dc/dao/HostPodDaoImpl.java
@@@ -36,8 -36,11 +37,12 @@@ import com.cloud.utils.db.SearchBuilder
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
+ import com.cloud.utils.component.ComponentLocator;
+ import com.cloud.vm.VMInstanceVO;
+ import com.cloud.vm.VirtualMachine;
+ import com.cloud.vm.dao.VMInstanceDaoImpl;
+@Component
@Local(value={HostPodDao.class})
public class HostPodDaoImpl extends GenericDaoBase<HostPodVO, Long> implements HostPodDao {
private static final Logger s_logger = Logger.getLogger(HostPodDaoImpl.class);
@@@ -63,8 -66,29 +68,29 @@@
return listBy(sc);
}
-
- @Override
+
+ @Override
+ public List<HostPodVO> listByDataCenterIdVMTypeAndStates(long id, VirtualMachine.Type type, VirtualMachine.State... states) {
+ final VMInstanceDaoImpl _vmDao = ComponentLocator.inject(VMInstanceDaoImpl.class);
+ SearchBuilder<VMInstanceVO> vmInstanceSearch = _vmDao.createSearchBuilder();
+ vmInstanceSearch.and("type", vmInstanceSearch.entity().getType(), SearchCriteria.Op.EQ);
+ vmInstanceSearch.and("states", vmInstanceSearch.entity().getState(), SearchCriteria.Op.IN);
+
+ SearchBuilder<HostPodVO> podIdSearch = createSearchBuilder();
+ podIdSearch.and("dc", podIdSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+ podIdSearch.select(null, SearchCriteria.Func.DISTINCT, podIdSearch.entity().getId());
+ podIdSearch.join("vmInstanceSearch", vmInstanceSearch, podIdSearch.entity().getId(),
+ vmInstanceSearch.entity().getPodIdToDeployIn(), JoinBuilder.JoinType.INNER);
+ podIdSearch.done();
+
+ SearchCriteria<HostPodVO> sc = podIdSearch.create();
+ sc.setParameters("dc", id);
+ sc.setJoinParameters("vmInstanceSearch", "type", type);
+ sc.setJoinParameters("vmInstanceSearch", "states", (Object[]) states);
+ return listBy(sc);
+ }
+
+ @Override
public HostPodVO findByName(String name, long dcId) {
SearchCriteria<HostPodVO> sc = DataCenterAndNameSearch.create();
sc.setParameters("dc", dcId);
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/deploy/FirstFitPlanner.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/deploy/FirstFitPlanner.java
index 9b5646d,06e46fb..bcc1d26
--- a/server/src/com/cloud/deploy/FirstFitPlanner.java
+++ b/server/src/com/cloud/deploy/FirstFitPlanner.java
@@@ -75,8 -72,8 +74,7 @@@ import com.cloud.storage.dao.VolumeDao
import com.cloud.user.AccountManager;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
- import com.cloud.utils.StringUtils;
import com.cloud.utils.component.Adapters;
-import com.cloud.utils.component.Inject;
import com.cloud.vm.DiskProfile;
import com.cloud.vm.ReservationContext;
import com.cloud.vm.VirtualMachine;