You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/01/09 14:26:48 UTC

[13/50] [abbrv] Merge branch 'api_refactoring' into javelin

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/api/ApiDispatcher.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/api/ApiDispatcher.java
index 3c1b0b0,60d8836..885cf87
--- a/server/src/com/cloud/api/ApiDispatcher.java
+++ b/server/src/com/cloud/api/ApiDispatcher.java
@@@ -27,15 -30,19 +30,20 @@@ import java.util.Map
  import java.util.StringTokenizer;
  import java.util.regex.Matcher;
  
- import javax.inject.Inject;
- 
+ import com.cloud.dao.EntityManager;
+ import org.apache.cloudstack.acl.ControlledEntity;
+ import org.apache.cloudstack.acl.InfrastructureEntity;
+ import org.apache.cloudstack.acl.Role;
+ import org.apache.cloudstack.api.*;
  import org.apache.log4j.Logger;
 +import org.springframework.stereotype.Component;
  
- import com.cloud.api.BaseCmd.CommandType;
- import com.cloud.api.commands.ListEventsCmd;
+ import org.apache.cloudstack.api.BaseCmd.CommandType;
+ import org.apache.cloudstack.api.command.user.event.ListEventsCmd;
  import com.cloud.async.AsyncCommandQueued;
  import com.cloud.async.AsyncJobManager;
+ import com.cloud.configuration.Config;
+ import com.cloud.configuration.dao.ConfigurationDao;
  import com.cloud.exception.AccountLimitException;
  import com.cloud.exception.InsufficientCapacityException;
  import com.cloud.exception.InvalidParameterValueException;
@@@ -118,10 -148,54 +149,54 @@@ public class ApiDispatcher 
          }
      }
  
+     private void checkACLOnCommand(BaseCmd cmd) {
+         // TODO Auto-generated method stub
+         //need to write an commandACLChecker adapter framework to check ACL on commands - default one will use the static roles by referring to commands.properties.
+         //one can write another commandACLChecker to check access via custom roles.
+     }
+ 
+     private List<Role> determineRole(Account caller) {
+         // TODO Auto-generated method stub
+         List<Role> effectiveRoles = new ArrayList<Role>();
+         return effectiveRoles;
+ 
+     }
+ 
+     private void doAccessChecks(BaseCmd cmd, List<Object> entitiesToAccess) {
+ 		//owner
+ 		Account caller = UserContext.current().getCaller();
+ 		Account owner = _accountMgr.getActiveAccountById(cmd.getEntityOwnerId());
+ 
+         // REMOVE ME:
+ 		// List<Role> callerRoles = determineRole(caller);
+ 		// List<Role> ownerRoles = determineRole(owner);
+ 		// check permission to call this command for the caller
+ 		// this needs checking of static roles of the caller
+         // Role based acl is done in ApiServer before api gets to ApiDispatcher
+         // checkACLOnCommand(cmd);
+ 
+         if(cmd instanceof BaseAsyncCreateCmd) {
+             //check that caller can access the owner account.
+             _accountMgr.checkAccess(caller, null, true, owner);
+         }
+ 
+         if(!entitiesToAccess.isEmpty()){
+             //check that caller can access the owner account.
+             _accountMgr.checkAccess(caller, null, true, owner);
+             for(Object entity : entitiesToAccess) {
+                 if (entity instanceof ControlledEntity) {
+                     _accountMgr.checkAccess(caller, null, true, (ControlledEntity) entity);
+                 }
+                 else if (entity instanceof InfrastructureEntity) {
+                     //do something here:D
+                 }
+             }
+         }
+ 	}
+ 
 -	public void dispatch(BaseCmd cmd, Map<String, String> params) {
 +    public void dispatch(BaseCmd cmd, Map<String, String> params) {
-         setupParameters(cmd, params);
-         ApiDispatcher.plugService(cmd);
          try {
+             processParameters(cmd, params);
              UserContext ctx = UserContext.current();
              ctx.setAccountId(cmd.getEntityOwnerId());
              if (cmd instanceof BaseAsyncCmd) {
@@@ -345,7 -435,155 +436,155 @@@
                  // and IllegalAccessException setting one of the parameters.
                  throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getCommandName().substring(0, cmd.getCommandName().length() - 8));
              }
+ 
+             //check access on the resource this field points to
+ 	        try {
+                 ACL checkAccess = field.getAnnotation(ACL.class);
+                 CommandType fieldType = parameterAnnotation.type();
+ 
+                 if (checkAccess != null) {
+                     // Verify that caller can perform actions in behalf of vm owner
+                     //acumulate all Controlled Entities together.
+ 
+                     //parse the array of resource types and in case of map check access on key or value or both as specified in @acl
+                     //implement external dao for classes that need findByName
+                     //for maps, specify access to be checkd on key or value.
+ 
+                     // find the controlled entity DBid by uuid
+                     if (parameterAnnotation.entityType() != null) {
+                         Class<?>[] entityList = parameterAnnotation.entityType()[0].getAnnotation(EntityReference.class).value();
+ 
+                         for (Class entity : entityList) {
+                             // Check if the parameter type is a single
+                             // Id or list of id's/name's
+                             switch (fieldType) {
+                                 case LIST:
+                                     CommandType listType = parameterAnnotation.collectionType();
+                                     switch (listType) {
+                                         case LONG:
+                                         case UUID:
+                                             List<Long> listParam = new ArrayList<Long>();
+                                             listParam = (List) field.get(cmd);
+                                             for (Long entityId : listParam) {
+                                                 Object entityObj = s_instance._entityMgr.findById(entity, (Long) field.get(cmd));
+                                                 entitiesToAccess.add(entityObj);
+                                             }
+                                             break;
+                                     /*
+                                      * case STRING: List<String> listParam =
+                                      * new ArrayList<String>(); listParam =
+                                      * (List)field.get(cmd); for(String
+                                      * entityName: listParam){
+                                      * ControlledEntity entityObj =
+                                      * (ControlledEntity
+                                      * )daoClassInstance(entityId);
+                                      * entitiesToAccess.add(entityObj); }
+                                      * break;
+                                      */
+                                         default:
+                                             break;
 -                                    }
 +        }
+                                     break;
+                                 case LONG:
+                                 case UUID:
+                                     Object entityObj = s_instance._entityMgr.findById(entity, (Long) field.get(cmd));
+                                     entitiesToAccess.add(entityObj);
+                                     break;
+                                 default:
+                                     break;
+                             }
+ 
+                             if (ControlledEntity.class.isAssignableFrom(entity)) {
+                                 if (s_logger.isDebugEnabled()) {
+                                     s_logger.debug("ControlledEntity name is:" + entity.getName());
+                                 }
+                             }
+ 
+                             if (InfrastructureEntity.class.isAssignableFrom(entity)) {
+                                 if (s_logger.isDebugEnabled()) {
+                                     s_logger.debug("InfrastructureEntity name is:" + entity.getName());
+                                 }
+                             }
+                         }
+ 
+ 	            	}
+ 
+ 	            }
+ 
+ 			} catch (IllegalArgumentException e) {
+ 	            s_logger.error("Error initializing command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
+ 	            throw new CloudRuntimeException("Internal error initializing parameters for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
+ 			} catch (IllegalAccessException e) {
+ 	            s_logger.error("Error initializing command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
+ 	            throw new CloudRuntimeException("Internal error initializing parameters for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
+ 			}
+ 
+         }
+ 
+         //check access on the entities.
+         s_instance.doAccessChecks(cmd, entitiesToAccess);
+ 
+     }
+ 
+     private static Long translateUuidToInternalId(String uuid, Parameter annotation)
+     {
+         if (uuid.equals("-1")) {
+             // FIXME: This is to handle a lot of hardcoded special cases where -1 is sent
+             // APITODO: Find and get rid of all hardcoded params in API Cmds and service layer
+             return -1L;
+         }
+         Long internalId = null;
+         // If annotation's empty, the cmd existed before 3.x try conversion to long
+         // FIXME: Fails if someone adds since field for any pre 3.x apis
+         boolean isPre3x = annotation.since().isEmpty();
+         // Match against Java's UUID regex to check if input is uuid string
+         boolean isUuid = uuid.matches("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");
+         // Enforce that it's uuid for newly added apis from version 3.x
+         if (!isPre3x && !isUuid)
+             return null;
+         // Allow both uuid and internal id for pre3x apis
+         if (isPre3x && !isUuid) {
+             try {
+                 internalId = Long.parseLong(uuid);
+             } catch(NumberFormatException e) {
+                 // In case regex failed, and it's still uuid string
+                 internalId = null;
+             }
+             if (internalId != null)
+                 return internalId;
+         }
+         // There may be multiple entities defined on the @EntityReference of a Response.class
+         // UUID CommandType would expect only one entityType, so use the first entityType
+         Class<?>[] entities = annotation.entityType()[0].getAnnotation(EntityReference.class).value();
+         // Go through each entity which is an interface to a VO class and get a VO object
+         // Try to getId() for the object using reflection, break on first non-null value
+         for (Class<?> entity: entities) {
+             // findByUuid returns one VO object using uuid, use reflect to get the Id
+             Object objVO = s_instance._entityMgr.findByUuid(entity, uuid);
+             if (objVO == null) {
+                 continue;
+             }
+             // Invoke the getId method, get the internal long ID
+             // If that fails hide exceptions as the uuid may not exist
+             try {
+                 internalId = (Long) ((InternalIdentity)objVO).getId();
+             } catch (IllegalArgumentException e) {
+             } catch (NullPointerException e) {
+             }
+             // Return on first non-null Id for the uuid entity
+             if (internalId != null)
+                 break;
+         }
+         if (internalId == null) {
+             if (s_logger.isDebugEnabled()) {
+                 s_logger.debug("Object entity with uuid=" + uuid + " does not exist in the database.");
+             }
+             if (annotation.required()) {
+                 throw new InvalidParameterValueException("Invalid parameter with uuid=" + uuid
+                         + ". Entity not found, or an annotation bug.");
+             }
+         }
+         return internalId;
      }
  
      @SuppressWarnings({ "unchecked", "rawtypes" })
@@@ -389,45 -627,58 +628,58 @@@
                  }
                  break;
              case FLOAT:
+                 // Assuming that the parameters have been checked for required before now,
+                 // we ignore blank or null values and defer to the command to set a default
+                 // value for optional parameters ...
+                 if (paramObj != null && isNotBlank(paramObj.toString())) {
 -                    field.set(cmdObj, Float.valueOf(paramObj.toString()));
 +                field.set(cmdObj, Float.valueOf(paramObj.toString()));
+                 }
                  break;
              case INTEGER:
+                 // Assuming that the parameters have been checked for required before now,
+                 // we ignore blank or null values and defer to the command to set a default
+                 // value for optional parameters ...
+                 if (paramObj != null && isNotBlank(paramObj.toString())) {
 -                    field.set(cmdObj, Integer.valueOf(paramObj.toString()));
 +                field.set(cmdObj, Integer.valueOf(paramObj.toString()));
+                 }
                  break;
 -                case LIST:
 -                    List listParam = new ArrayList();
 -                    StringTokenizer st = new StringTokenizer(paramObj.toString(), ",");
 -                    while (st.hasMoreTokens()) {
 -                        String token = st.nextToken();
 -                        CommandType listType = annotation.collectionType();
 -                        switch (listType) {
 -                            case INTEGER:
 -                                listParam.add(Integer.valueOf(token));
 -                                break;
 +            case LIST:
 +                List listParam = new ArrayList();
 +                StringTokenizer st = new StringTokenizer(paramObj.toString(), ",");
 +                while (st.hasMoreTokens()) {
 +                    String token = st.nextToken();
 +                    CommandType listType = annotation.collectionType();
 +                    switch (listType) {
 +                    case INTEGER:
 +                        listParam.add(Integer.valueOf(token));
 +                        break;
+                             case UUID:
+                                 if (token.isEmpty())
+                                     break;
+                                 Long internalId = translateUuidToInternalId(token, annotation);
+                                 listParam.add(internalId);
+                                 break;
 -                            case LONG: {
 +                    case LONG: {
-                         Long val = null;
-                         if (identityMapper != null)
-                             val = s_instance._identityDao.getIdentityId(identityMapper, token);
-                         else
-                             val = Long.valueOf(token);
- 
-                         listParam.add(val);
+                                 listParam.add(Long.valueOf(token));
 -                            }
 -                            break;
 -                            case SHORT:
 -                                listParam.add(Short.valueOf(token));
 -                            case STRING:
 -                                listParam.add(token);
 -                                break;
 -                        }
                      }
 -                    field.set(cmdObj, listParam);
 -                    break;
 +                        break;
 +                    case SHORT:
 +                        listParam.add(Short.valueOf(token));
 +                    case STRING:
 +                        listParam.add(token);
 +                        break;
 +                    }
 +                }
 +                field.set(cmdObj, listParam);
 +                break;
+             case UUID:
+                 if (paramObj.toString().isEmpty())
+                     break;
+                 Long internalId = translateUuidToInternalId(paramObj.toString(), annotation);
+                 field.set(cmdObj, internalId);
+                 break;
              case LONG:
-                 if (identityMapper != null)
-                     field.set(cmdObj, s_instance._identityDao.getIdentityId(identityMapper, paramObj.toString()));
-                 else
 -                field.set(cmdObj, Long.valueOf(paramObj.toString()));
 +                    field.set(cmdObj, Long.valueOf(paramObj.toString()));
                  break;
              case SHORT:
                  field.set(cmdObj, Short.valueOf(paramObj.toString()));
@@@ -467,46 -718,31 +719,31 @@@
          return cal.getTime();
      }
  
-     public static void plugService(BaseCmd cmd) {
- 
-         if (!ApiServer.isPluggableServiceCommand(cmd.getClass().getName())) {
-             return;
-         }
-         Class<?> clazz = cmd.getClass();
+     public static void plugService(Field field, BaseCmd cmd) {
          ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
-         do {
-             Field[] fields = clazz.getDeclaredFields();
-             for (Field field : fields) {
-                 PlugService plugService = field.getAnnotation(PlugService.class);
-                 if (plugService == null) {
-                     continue;
-                 }
+ 
 -        Class<?> fc = field.getType();
 -        Object instance = null;
 -        if (PluggableService.class.isAssignableFrom(fc)) {
 -            instance = locator.getPluggableService(fc);
 -        }
 +                Class<?> fc = field.getType();
 +                Object instance = null;
 +                if (PluggableService.class.isAssignableFrom(fc)) {
 +                    instance = locator.getPluggableService(fc);
 +                }
  
 -        if (instance == null) {
 +                if (instance == null) {
-                     throw new CloudRuntimeException("Unable to plug service " + fc.getSimpleName() + " in command " + clazz.getSimpleName());
+             throw new CloudRuntimeException("Unable to plug service " + fc.getSimpleName() + " in command " + cmd.getClass().getSimpleName());
 -        }
 -
 -        try {
 -            field.setAccessible(true);
 -            field.set(cmd, instance);
 -        } catch (IllegalArgumentException e) {
 -            s_logger.error("IllegalArgumentException at plugService for command " + cmd.getCommandName() + ", field " + field.getName());
 -            throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [Illegal argumet at field " + field.getName() + "]");
 -        } catch (IllegalAccessException e) {
 -            s_logger.error("Error at plugService for command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
 -            throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
 -        }
 -    }
 +                }
  
 +                try {
 +                    field.setAccessible(true);
 +                    field.set(cmd, instance);
 +                } catch (IllegalArgumentException e) {
 +                    s_logger.error("IllegalArgumentException at plugService for command " + cmd.getCommandName() + ", field " + field.getName());
 +                    throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [Illegal argumet at field " + field.getName() + "]");
 +                } catch (IllegalAccessException e) {
 +                    s_logger.error("Error at plugService for command " + cmd.getCommandName() + ", field " + field.getName() + " is not accessible.");
 +                    throw new CloudRuntimeException("Internal error at plugService for command " + cmd.getCommandName() + " [field " + field.getName() + " is not accessible]");
 +                }
 +            }
-             clazz = clazz.getSuperclass();
-         } while (clazz != Object.class && clazz != null);
-     }
-     
 +    
      public static Long getIdentiyId(String tableName, String token) {
          return s_instance._identityDao.getIdentityId(tableName, token);
      }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/api/ApiServer.java
index 457bba4,6209171..8a88cbb
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@@ -81,11 -88,17 +89,18 @@@ import org.apache.http.protocol.Respons
  import org.apache.http.protocol.ResponseDate;
  import org.apache.http.protocol.ResponseServer;
  import org.apache.log4j.Logger;
 +import org.springframework.stereotype.Component;
  
+ import org.apache.cloudstack.api.command.admin.host.ListHostsCmd;
+ import org.apache.cloudstack.api.command.admin.router.ListRoutersCmd;
+ import org.apache.cloudstack.api.command.admin.user.ListUsersCmd;
+ import org.apache.cloudstack.api.command.user.project.ListProjectInvitationsCmd;
+ import org.apache.cloudstack.api.command.user.project.ListProjectsCmd;
+ import org.apache.cloudstack.api.command.user.securitygroup.ListSecurityGroupsCmd;
+ import org.apache.cloudstack.api.command.user.tag.ListTagsCmd;
  import com.cloud.api.response.ApiResponseSerializer;
- import com.cloud.api.response.ExceptionResponse;
- import com.cloud.api.response.ListResponse;
+ import org.apache.cloudstack.api.response.ExceptionResponse;
+ import org.apache.cloudstack.api.response.ListResponse;
  import com.cloud.async.AsyncJob;
  import com.cloud.async.AsyncJobManager;
  import com.cloud.async.AsyncJobVO;
@@@ -277,12 -186,13 +188,13 @@@ public class ApiServer implements HttpR
          _systemAccount = _accountMgr.getSystemAccount();
          _systemUser = _accountMgr.getSystemUser();
          _dispatcher = ApiDispatcher.getInstance();
 -
 +  
          Integer apiPort = null; // api port, null by default
-         
-         SearchCriteria<ConfigurationVO> sc = _configDao.createSearchCriteria();
+         ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
+         ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
+         SearchCriteria<ConfigurationVO> sc = configDao.createSearchCriteria();
          sc.addAnd("name", SearchCriteria.Op.EQ, "integration.api.port");
 -        List<ConfigurationVO> values = configDao.search(sc, null);
 +        List<ConfigurationVO> values = _configDao.search(sc, null);
          if ((values != null) && (values.size() > 0)) {
              ConfigurationVO apiPortConfig = values.get(0);
              if (apiPortConfig.getValue() != null) {
@@@ -434,9 -356,9 +358,9 @@@
          } catch (Exception ex) {
              if (ex instanceof InvalidParameterValueException) {
              	InvalidParameterValueException ref = (InvalidParameterValueException)ex;
 -		ServerApiException e = new ServerApiException(BaseCmd.PARAM_ERROR, ex.getMessage());
 +            	ServerApiException e = new ServerApiException(BaseCmd.PARAM_ERROR, ex.getMessage());            	
                  // copy over the IdentityProxy information as well and throw the serverapiexception.
-                 ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+                 ArrayList<String> idList = ref.getIdProxyList();
                  if (idList != null) {
                  	// Iterate through entire arraylist and copy over each proxy id.
                  	for (int i = 0 ; i < idList.size(); i++) {
@@@ -576,15 -506,21 +508,21 @@@
                  if (job.getInstanceId() == null) {
                      continue;
                  }
+                 String instanceUuid = ApiDBUtils.findJobInstanceUuid(job);
+                 if (instanceUuid != null) {
+                     objectJobMap.put(instanceUuid, job);
+                 }
+             }
+ 
 -            for (ResponseObject response : responses) {
 +                for (ResponseObject response : responses) {
-                     if (response.getObjectId() != null && job.getInstanceId().longValue() == response.getObjectId().longValue()) {
-                         response.setJobId(job.getId());
+                 if (response.getObjectId() != null && objectJobMap.containsKey(response.getObjectId())) {
+                     AsyncJob job = objectJobMap.get(response.getObjectId());
+                     response.setJobId(job.getUuid());
 -                    response.setJobStatus(job.getStatus());
 +                        response.setJobStatus(job.getStatus());
 +                    }
                  }
              }
          }
--    }
  
      private void buildAuditTrail(StringBuffer auditTrailSb, String command, String result) {
          if (result == null) {
@@@ -1060,34 -971,31 +973,31 @@@
              		// Cast the exception appropriately and retrieve the IdentityProxy
              		if (ex instanceof ServerApiException) {
              			ServerApiException ref = (ServerApiException) ex;
-             			ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+             			ArrayList<String> idList = ref.getIdProxyList();
              			if (idList != null) {
              				for (int i=0; i < idList.size(); i++) {
-             					IdentityProxy id = idList.get(i);
-             					apiResponse.addProxyObject(id.getTableName(), id.getValue(), id.getidFieldName());
+             					apiResponse.addProxyObject(idList.get(i));
 -					}
 +            				}            				
              			}
              			// Also copy over the cserror code and the function/layer in which it was thrown.
              			apiResponse.setCSErrorCode(ref.getCSErrorCode());
              		} else if (ex instanceof PermissionDeniedException) {
              			PermissionDeniedException ref = (PermissionDeniedException) ex;
-             			ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+             			ArrayList<String> idList = ref.getIdProxyList();
              			if (idList != null) {
              				for (int i=0; i < idList.size(); i++) {
-             					IdentityProxy id = idList.get(i);
-             					apiResponse.addProxyObject(id.getTableName(), id.getValue(), id.getidFieldName());
+             					apiResponse.addProxyObject(idList.get(i));
 -					}
 +            				}            				
              			}
              			// Also copy over the cserror code and the function/layer in which it was thrown.
              			apiResponse.setCSErrorCode(ref.getCSErrorCode());
              		} else if (ex instanceof InvalidParameterValueException) {
              			InvalidParameterValueException ref = (InvalidParameterValueException) ex;
-             			ArrayList<IdentityProxy> idList = ref.getIdProxyList();
+             			ArrayList<String> idList = ref.getIdProxyList();
              			if (idList != null) {
              				for (int i=0; i < idList.size(); i++) {
-             					IdentityProxy id = idList.get(i);
-             					apiResponse.addProxyObject(id.getTableName(), id.getValue(), id.getidFieldName());
+             					apiResponse.addProxyObject(idList.get(i));
 -					}
 +            				}            				
              			}
              			// Also copy over the cserror code and the function/layer in which it was thrown.
              			apiResponse.setCSErrorCode(ref.getCSErrorCode());

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/AsyncJobManagerImpl.java
index 00546fb,7bf5c5a..e4016f7
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@@ -34,9 -34,9 +34,10 @@@ import java.util.concurrent.TimeUnit
  import javax.ejb.Local;
  import javax.naming.ConfigurationException;
  
+ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
  import org.apache.log4j.Logger;
  import org.apache.log4j.NDC;
 +import org.springframework.stereotype.Component;
  
  import com.cloud.api.ApiDispatcher;
  import com.cloud.api.ApiGsonHelper;
@@@ -74,46 -73,36 +74,47 @@@ import com.cloud.utils.mgmt.JmxUtil
  import com.cloud.utils.net.MacAddress;
  import com.google.gson.Gson;
  import com.google.gson.reflect.TypeToken;
 -
 -@Local(value={AsyncJobManager.class})
 -public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener {
 -    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
 +
 +@Component
 +@Local(value={AsyncJobManager.class})
 +public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener {
 +    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
  	private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; 	// 3 seconds
 -    
 -    private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
 +    
 +    private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
      private static final int HEARTBEAT_INTERVAL = 2000;
      private static final int GC_INTERVAL = 10000;				// 10 seconds
 -    
 -    private String _name;
 -    
 -    private AsyncJobExecutorContext _context;
 -    private SyncQueueManager _queueMgr;
 +    
 +    private String _name;
 +    
 +    private AsyncJobExecutorContext _context;
 +    private SyncQueueManager _queueMgr;
      private ClusterManager _clusterMgr;
 -    private AccountManager _accountMgr;
 +    private AccountManager _accountMgr;
      private AccountDao _accountDao;
      private AsyncJobDao _jobDao;
 -    private long _jobExpireSeconds = 86400;                 // 1 day
 +    private long _jobExpireSeconds = 86400;						// 1 day
-     private long _jobCancelThresholdSeconds = 3600;             // 1 hour
+     private long _jobCancelThresholdSeconds = 3600;         // 1 hour (for cancelling the jobs blocking other jobs)
+     
 -    private ApiDispatcher _dispatcher;
 -
 -    private final ScheduledExecutorService _heartbeatScheduler =
 -            Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
 -    private ExecutorService _executor;
 -
 -    @Override
 -    public AsyncJobExecutorContext getExecutorContext() {
 -        return _context;
 +    private ApiDispatcher _dispatcher;
 +
 +    private final ScheduledExecutorService _heartbeatScheduler =
 +        Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
 +    private ExecutorService _executor;
 +
 +    @Override
 +	public AsyncJobExecutorContext getExecutorContext() {
 +		return _context;
 +	}
 +    	
 +    @Override
 +	public AsyncJobVO getAsyncJob(long jobId) {
 +    	return _jobDao.findById(jobId);
 +    }
 +    
 +    @Override
 +	public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
 +    	return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId);
      }
      
      @Override
@@@ -181,81 -180,81 +182,81 @@@
  
      		if (resultObject != null) {
                  job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
 -    		}
 -
 -    		job.setLastUpdated(DateUtil.currentGMTTime());
 -    		_jobDao.update(jobId, job);
 -    		txt.commit();
 -    	} catch(Exception e) {
 -    		s_logger.error("Unexpected exception while completing async job-" + jobId, e);
 -    		txt.rollback();
 -    	}
 -    }
 -
 -    @Override @DB
 -    public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) {
 +    		}
 +
 +    		job.setLastUpdated(DateUtil.currentGMTTime());
 +    		_jobDao.update(jobId, job);
 +    		txt.commit();
 +    	} catch(Exception e) {
 +    		s_logger.error("Unexpected exception while completing async job-" + jobId, e);
 +    		txt.rollback();
 +    	}
 +    }
 +
 +    @Override @DB
 +    public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) {
      	if(s_logger.isDebugEnabled()) {
 -            s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
 +            s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
      			", result: " + resultObject);
 -        }
 -    	
 -    	Transaction txt = Transaction.currentTxn();
 -    	try {
 -    		txt.start();
 -    		AsyncJobVO job = _jobDao.findById(jobId);
 -    		if(job == null) {
 +        }
 +    	
 +    	Transaction txt = Transaction.currentTxn();
 +    	try {
 +    		txt.start();
 +    		AsyncJobVO job = _jobDao.findById(jobId);
 +    		if(job == null) {
      	    	if(s_logger.isDebugEnabled()) {
                      s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
 -                }
 -    			
 -    			txt.rollback();
 -    			return;
 -    		}
 -    		
 -    		job.setProcessStatus(processStatus);
 +                }
 +    			
 +    			txt.rollback();
 +    			return;
 +    		}
 +    		
 +    		job.setProcessStatus(processStatus);
      		if(resultObject != null) {
                  job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
 -            }
 -            job.setLastUpdated(DateUtil.currentGMTTime());
 -            _jobDao.update(jobId, job);
 -            txt.commit();
 -        } catch(Exception e) {
 -            s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
 -            txt.rollback();
 -        }
 -    }
 -
 -    @Override @DB
 -    public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
 -        if(s_logger.isDebugEnabled()) {
 -            s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
 -                    ", instanceId: " + instanceId);
 -        }
 -
 -        Transaction txt = Transaction.currentTxn();
 -        try {
 -            txt.start();
 -
 -            AsyncJobVO job = _jobDao.createForUpdate();
 -            //job.setInstanceType(instanceType);
 -            job.setInstanceId(instanceId);
 -            job.setLastUpdated(DateUtil.currentGMTTime());
 -            _jobDao.update(jobId, job);
 -
 -            txt.commit();
 -        } catch(Exception e) {
 -            s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
 -            txt.rollback();
 -        }
 -    }
 -
 -    @Override
 +            }
 +    		job.setLastUpdated(DateUtil.currentGMTTime());
 +    		_jobDao.update(jobId, job);
 +    		txt.commit();
 +    	} catch(Exception e) {
 +    		s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
 +    		txt.rollback();
 +    	}
 +    }
 +
 +    @Override @DB
 +    public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
 +    	if(s_logger.isDebugEnabled()) {
 +            s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
 +    			", instanceId: " + instanceId);
 +        }
 +    	
 +    	Transaction txt = Transaction.currentTxn();
 +    	try {
 +    		txt.start();
 +
 +	    	AsyncJobVO job = _jobDao.createForUpdate();
 +	    	//job.setInstanceType(instanceType);
 +	    	job.setInstanceId(instanceId);
 +			job.setLastUpdated(DateUtil.currentGMTTime());
 +			_jobDao.update(jobId, job);
 +
 +    		txt.commit();
 +    	} catch(Exception e) {
 +    		s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
 +    		txt.rollback();
 +    	}
 +    }
 +
 +    @Override
-     public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId) {
+     public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
 -        // This method is re-entrant.  If an API developer wants to synchronized on an object, e.g. the router,
 -        // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
 -        // This method will get called every time their business logic executes.  The first time it exectues for a job
 -        // there will be no sync source, but on subsequent execution there will be a sync souce.  If this is the first
 -        // time the job executes we queue the job, otherwise we just return so that the business logic can execute.
 +    	// This method is re-entrant.  If an API developer wants to synchronized on an object, e.g. the router,
 +    	// when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
 +    	// This method will get called every time their business logic executes.  The first time it exectues for a job
 +    	// there will be no sync source, but on subsequent execution there will be a sync souce.  If this is the first
 +    	// time the job executes we queue the job, otherwise we just return so that the business logic can execute.
          if (job.getSyncSource() != null) {
              return;
          }
@@@ -270,9 -269,9 +271,9 @@@
      	// we retry five times until we throw an exception
  		Random random = new Random();
  
 -        for(int i = 0; i < 5; i++) {
 +    	for(int i = 0; i < 5; i++) {
-     		queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId());
+             queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
 -            if(queue != null) {
 +    		if(queue != null) {
                  break;
              }
  
@@@ -599,65 -598,78 +600,78 @@@
  		return new Runnable() {
  			@Override
              public void run() {
 -                GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
 -                try {
 -                    if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
 -                        try {
 -                            reallyRun();
 -                        } finally {
 -                            scanLock.unlock();
 -                        }
 -                    }
 -                } finally {
 -                    scanLock.releaseRef();
 -                }
 -            }
 -            
 +				GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
 +				try {
 +					if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
 +						try {
 +							reallyRun();
 +						} finally {
 +							scanLock.unlock();
 +						}
 +					}
 +				} finally {
 +					scanLock.releaseRef();
 +				}
 +			}
 +			
- 			private void reallyRun() {
+             public void reallyRun() {
 -                try {
 -                    s_logger.trace("Begin cleanup expired async-jobs");
 -
 -                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
 -
 -                    // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
 -                    // hopefully this will be fast enough to balance potential growth of job table
 -                    List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
 -                    if(l != null && l.size() > 0) {
 -                        for(AsyncJobVO job : l) {
 +				try {
 +					s_logger.trace("Begin cleanup expired async-jobs");
 +					
 +					Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
 +					
 +					// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
 +					// hopefully this will be fast enough to balance potential growth of job table
 +					List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
 +					if(l != null && l.size() > 0) {
 +						for(AsyncJobVO job : l) {
- 							_jobDao.expunge(job.getId());
+                             expungeAsyncJob(job);
 -                        }
 -                    }
 -
 +						}
 +					}
 +					
- 					// forcely cancel blocking queue items if they've been staying there for too long
+                     // forcefully cancel blocking queue items if they've been staying there for too long
 -                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
 -                    if(blockItems != null && blockItems.size() > 0) {
 -                        for(SyncQueueItemVO item : blockItems) {
 +				    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
 +				    if(blockItems != null && blockItems.size() > 0) {
 +				        for(SyncQueueItemVO item : blockItems) {
- 				            if(item.getContentType().equalsIgnoreCase("AsyncJob")) {
-                                 completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
+                             if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+                                 completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0,
+                                         getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
                              }
 -
 -                            // purge the item and resume queue processing
 -                            _queueMgr.purgeItem(item.getId());
 -                        }
 -                    }
 -
 -                    s_logger.trace("End cleanup expired async-jobs");
 -                } catch(Throwable e) {
 -                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
 -                } finally {
 -                    StackMaid.current().exitCleanup();
 -                }
 -            }
 +				        
 +				            // purge the item and resume queue processing
 +				            _queueMgr.purgeItem(item.getId());
 +				        }
 +				    }
 +					
 +					s_logger.trace("End cleanup expired async-jobs");
 +				} catch(Throwable e) {
 +					s_logger.error("Unexpected exception when trying to execute queue item, ", e);
 +				} finally {
 +					StackMaid.current().exitCleanup();
 +				}
 +			}
+ 
+            
 -        };
 -    }
 -    
 +		};
 +	}
 +	
+     @DB
+     protected void expungeAsyncJob(AsyncJobVO job) {
+         Transaction txn = Transaction.currentTxn();
+         txn.start();
+         _jobDao.expunge(job.getId());
+         //purge corresponding sync queue item
+         _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+         txn.commit();
+     }
+ 
 -    private long getMsid() {
 -        if(_clusterMgr != null) {
 +	private long getMsid() {
 +		if(_clusterMgr != null) {
              return _clusterMgr.getManagementNodeId();
 -        }
 -		
 -		return MacAddress.getMacAddress().toLong();
 -	}
 +        }
 +		
 +		return MacAddress.getMacAddress().toLong();
 +	}
  	
  	private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
  		if(l != null && l.size() > 0) {
@@@ -665,61 -677,61 +679,61 @@@
  				if(s_logger.isInfoEnabled()) {
                      s_logger.info("Discard left-over queue item: " + item.toString());
                  }
 -
 -                String contentType = item.getContentType();
 +				
 +				String contentType = item.getContentType();
- 				if(contentType != null && contentType.equals("AsyncJob")) {
+                 if(contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
 -                    Long jobId = item.getContentId();
 -                    if(jobId != null) {
 -                        s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
 -                        completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
 -                    }
 -                }
 -                _queueMgr.purgeItem(item.getId());
 -            }
 -        }
 -    }
 -
 -    @Override
 -    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
 -        _name = name;
 -
 -        ComponentLocator locator = ComponentLocator.getCurrentLocator();
 -
 -        ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
 -        if (configDao == null) {
 -            throw new ConfigurationException("Unable to get the configuration dao.");
 -        }
 -        
 -        int expireMinutes = NumbersUtil.parseInt(
 -                configDao.getValue(Config.JobExpireMinutes.key()), 24*60);
 -        _jobExpireSeconds = (long)expireMinutes*60;
 -
 -        _jobCancelThresholdSeconds = NumbersUtil.parseInt(
 -                configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60);
 -        _jobCancelThresholdSeconds *= 60;
 -
 -        _accountDao = locator.getDao(AccountDao.class);
 -        if (_accountDao == null) {
 -            throw new ConfigurationException("Unable to get " + AccountDao.class.getName());
 -		}
 -		_jobDao = locator.getDao(AsyncJobDao.class);
 -		if (_jobDao == null) {
 -			throw new ConfigurationException("Unable to get "
 -					+ AsyncJobDao.class.getName());
 -		}
 -		
 -		_context = 	locator.getManager(AsyncJobExecutorContext.class);
 -		if (_context == null) {
 -			throw new ConfigurationException("Unable to get "
 -					+ AsyncJobExecutorContext.class.getName());
 +					Long jobId = item.getContentId();
 +					if(jobId != null) {
 +						s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
 +						completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
 +					}
 +				}
 +				_queueMgr.purgeItem(item.getId());
 +			}
  		}
 +	}
 +    
 +    @Override
 +    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
 +    	_name = name;
 +    
 +		ComponentLocator locator = ComponentLocator.getCurrentLocator();
  		
 -		_queueMgr = locator.getManager(SyncQueueManager.class);
 -		if(_queueMgr == null) {
 -			throw new ConfigurationException("Unable to get "
 -					+ SyncQueueManager.class.getName());
 +		ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
 +		if (configDao == null) {
 +			throw new ConfigurationException("Unable to get the configuration dao.");
  		}
 +
 +		int expireMinutes = NumbersUtil.parseInt(
 +		       configDao.getValue(Config.JobExpireMinutes.key()), 24*60);
 +		_jobExpireSeconds = (long)expireMinutes*60;
  		
 +		_jobCancelThresholdSeconds = NumbersUtil.parseInt(
 +		       configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60);
 +		_jobCancelThresholdSeconds *= 60;
 +
 +		_accountDao = locator.getDao(AccountDao.class);
 +		if (_accountDao == null) {
 +            throw new ConfigurationException("Unable to get " + AccountDao.class.getName());
 +		}
 +		_jobDao = locator.getDao(AsyncJobDao.class);
 +		if (_jobDao == null) {
 +			throw new ConfigurationException("Unable to get "
 +					+ AsyncJobDao.class.getName());
 +		}
 +		
 +		_context = 	locator.getManager(AsyncJobExecutorContext.class);
 +		if (_context == null) {
 +			throw new ConfigurationException("Unable to get "
 +					+ AsyncJobExecutorContext.class.getName());
 +		}
 +		
 +		_queueMgr = locator.getManager(SyncQueueManager.class);
 +		if(_queueMgr == null) {
 +			throw new ConfigurationException("Unable to get "
 +					+ SyncQueueManager.class.getName());
 +		}
 +		
  		_clusterMgr = locator.getManager(ClusterManager.class);
  		
  		_accountMgr = locator.getManager(AccountManager.class);
@@@ -750,46 -762,44 +764,44 @@@
      }
      
      @Override
 -    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
 -        for(ManagementServerHostVO msHost : nodeList) {
 -            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
 -            try {
 -                txn.start();
 -                List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
 -                cleanupPendingJobs(items);
 -                _jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
 -                txn.commit();
 -            } catch(Throwable e) {
 -                s_logger.warn("Unexpected exception ", e);
 -                txn.rollback();
 -            } finally {
 -                txn.close();
 -            }
 -        }
 +	public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
 +    	for(ManagementServerHostVO msHost : nodeList) {
 +    		Transaction txn = Transaction.open(Transaction.CLOUD_DB);
 +    		try {
 +    			txn.start();
 +    			List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
 +    			cleanupPendingJobs(items);
-         		_queueMgr.resetQueueProcess(msHost.getId());
 +        		_jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
 +    			txn.commit();
 +    		} catch(Throwable e) {
 +    			s_logger.warn("Unexpected exception ", e);
 +    			txn.rollback();
 +    		} finally {
 +    			txn.close();
 +    		}
 +    	}
      }
      
      @Override
  	public void onManagementNodeIsolated() {
  	}
 -
 -    @Override
 +
 +    @Override
      public boolean start() {
 -        try {
 -            List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
 -            cleanupPendingJobs(l);
 -            _jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
 -        } catch(Throwable e) {
 -            s_logger.error("Unexpected exception " + e.getMessage(), e);
 -        }
 -
 -        _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
 -                HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
 -        _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
 -                GC_INTERVAL, TimeUnit.MILLISECONDS);
 -
 -        return true;
 +    	try {
 +    		List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
 +    		cleanupPendingJobs(l);
-     		_queueMgr.resetQueueProcess(getMsid());
 +    		_jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
 +    	} catch(Throwable e) {
 +    		s_logger.error("Unexpected exception " + e.getMessage(), e);
 +    	}
 +    	
 +    	_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
 +			HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
 +    	_heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
 +			GC_INTERVAL, TimeUnit.MILLISECONDS);
 +    	
 +        return true;
      }
      
      private static ExceptionResponse getResetResultResponse(String errorMessage) {

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/SyncQueueManagerImpl.java
index f316ff6,4d15065..94fcf0b
--- a/server/src/com/cloud/async/SyncQueueManagerImpl.java
+++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java
@@@ -35,166 -34,173 +35,173 @@@ import com.cloud.utils.db.DB
  import com.cloud.utils.db.Transaction;
  import com.cloud.utils.exception.CloudRuntimeException;
  
- @Component
+ 
 -@Local(value={SyncQueueManager.class})
 -public class SyncQueueManagerImpl implements SyncQueueManager {
 -    public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
 -    
 -    private String _name;
 -    
 -    private SyncQueueDao _syncQueueDao;
 -    private SyncQueueItemDao _syncQueueItemDao;
 -
 -    @Override
 -    @DB
 +@Local(value={SyncQueueManager.class})
 +public class SyncQueueManagerImpl implements SyncQueueManager {
 +    public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
 +    
 +    private String _name;
 +    
 +    private SyncQueueDao _syncQueueDao;
 +    private SyncQueueItemDao _syncQueueItemDao;
 +
 +    @Override
 +    @DB
-     public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId) {
+     public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
 -        Transaction txn = Transaction.currentTxn();
 -    	try {
 -    		txn.start();
 -    		
 -    		_syncQueueDao.ensureQueue(syncObjType, syncObjId);
 -    		SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
 -    		if(queueVO == null)
 -    			throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
 +        Transaction txn = Transaction.currentTxn();
 +    	try {
 +    		txn.start();
 +    		
 +    		_syncQueueDao.ensureQueue(syncObjType, syncObjId);
 +    		SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
 +    		if(queueVO == null)
 +    			throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
  
+     		queueVO.setQueueSizeLimit(queueSizeLimit);
+     		_syncQueueDao.update(queueVO.getId(), queueVO);
 -    		
 -			Date dt = DateUtil.currentGMTTime();
 -    		SyncQueueItemVO item = new SyncQueueItemVO();
 -    		item.setQueueId(queueVO.getId());
 -    		item.setContentType(itemType);
 -    		item.setContentId(itemId);
 -    		item.setCreated(dt);
 -    		
 -    		_syncQueueItemDao.persist(item);
 -    		txn.commit();
 -    		
 -    		return queueVO;
 -    	} catch(Exception e) {
 -    		s_logger.error("Unexpected exception: ", e);
 -    		txn.rollback();
 -    	}
 -    	return null;
 -    }
 -    
 -    @Override
 -    @DB
 -    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
 -    	Transaction txt = Transaction.currentTxn();
 -    	try {
 -    		txt.start();
 -    		
 -    		SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
 -    		if(queueVO == null) {
 -    			s_logger.error("Sync queue(id: " + queueId + ") does not exist");
 -    			txt.commit();
 -    			return null;
 -    		}
 -    		
 +    		
 +			Date dt = DateUtil.currentGMTTime();
 +    		SyncQueueItemVO item = new SyncQueueItemVO();
 +    		item.setQueueId(queueVO.getId());
 +    		item.setContentType(itemType);
 +    		item.setContentId(itemId);
 +    		item.setCreated(dt);
 +    		
 +    		_syncQueueItemDao.persist(item);
 +    		txn.commit();
 +    		
 +    		return queueVO;
 +    	} catch(Exception e) {
 +    		s_logger.error("Unexpected exception: ", e);
 +    		txn.rollback();
 +    	}
 +    	return null;
 +    }
 +    
 +    @Override
 +    @DB
 +    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
 +    	Transaction txt = Transaction.currentTxn();
 +    	try {
 +    		txt.start();
 +    		
 +    		SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
 +    		if(queueVO == null) {
 +    			s_logger.error("Sync queue(id: " + queueId + ") does not exist");
 +    			txt.commit();
 +    			return null;
 +    		}
 +    		
-     		if(queueVO.getLastProcessTime() == null) {
+     		if(queueReadyToProcess(queueVO)) {
 -    			SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
 -    			if(itemVO != null) {
 -	    			Long processNumber = queueVO.getLastProcessNumber();
 -	    			if(processNumber == null)
 -	    				processNumber = new Long(1);
 -	    			else
 -	    				processNumber = processNumber + 1;
 -	    			Date dt = DateUtil.currentGMTTime();
 -	    			queueVO.setLastProcessNumber(processNumber);
 +    			SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
 +    			if(itemVO != null) {
 +	    			Long processNumber = queueVO.getLastProcessNumber();
 +	    			if(processNumber == null)
 +	    				processNumber = new Long(1);
 +	    			else
 +	    				processNumber = processNumber + 1;
 +	    			Date dt = DateUtil.currentGMTTime();
- 	    			queueVO.setLastProcessMsid(msid);
 +	    			queueVO.setLastProcessNumber(processNumber);
- 	    			queueVO.setLastProcessTime(dt);
  	    			queueVO.setLastUpdated(dt);
+ 	    			queueVO.setQueueSize(queueVO.getQueueSize() + 1);
 -	    			_syncQueueDao.update(queueVO.getId(), queueVO);
 -	    			
 -	    			itemVO.setLastProcessMsid(msid);
 +	    			_syncQueueDao.update(queueVO.getId(), queueVO);
 +	    			
 +	    			itemVO.setLastProcessMsid(msid);
  	    			itemVO.setLastProcessNumber(processNumber);
+ 	    			itemVO.setLastProcessTime(dt);
 -	    			_syncQueueItemDao.update(itemVO.getId(), itemVO);
 -	    			
 -	        		txt.commit();
 -	    			return itemVO;
 -    			} else {
 -        			if(s_logger.isDebugEnabled())
 -        				s_logger.debug("Sync queue (" + queueId + ") is currently empty");
 -    			}
 -    		} else {
 -    			if(s_logger.isDebugEnabled())
 -    				s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
 -    		}
 -    		txt.commit();
 -    	} catch(Exception e) {
 -    		s_logger.error("Unexpected exception: ", e);
 -    		txt.rollback();
 -    	}
 -    	
 -    	return null;
 -    }
 -    
 -    @Override
 -    @DB
 -    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
 -    	
 -    	List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
 -    	Transaction txt = Transaction.currentTxn();
 -    	try {
 -    		txt.start();
 -    		
 -    		List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
 -    		if(l != null && l.size() > 0) {
 -    			for(SyncQueueItemVO item : l) {
 -    				SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
 -	    			SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
 +	    			_syncQueueItemDao.update(itemVO.getId(), itemVO);
 +	    			
 +	        		txt.commit();
 +	    			return itemVO;
 +    			} else {
 +        			if(s_logger.isDebugEnabled())
 +        				s_logger.debug("Sync queue (" + queueId + ") is currently empty");
 +    			}
 +    		} else {
 +    			if(s_logger.isDebugEnabled())
 +    				s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
 +    		}
 +    		txt.commit();
 +    	} catch(Exception e) {
 +    		s_logger.error("Unexpected exception: ", e);
 +    		txt.rollback();
 +    	}
 +    	
 +    	return null;
 +    }
 +    
 +    @Override
 +    @DB
 +    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
 +    	
 +    	List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
 +    	Transaction txt = Transaction.currentTxn();
 +    	try {
 +    		txt.start();
 +    		
 +    		List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
 +    		if(l != null && l.size() > 0) {
 +    			for(SyncQueueItemVO item : l) {
 +    				SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
 +	    			SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
-     				if(queueVO.getLastProcessTime() == null && itemVO.getLastProcessNumber() == null) {
+     				if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
 -		    			Long processNumber = queueVO.getLastProcessNumber();
 -		    			if(processNumber == null)
 -		    				processNumber = new Long(1);
 -		    			else
 -		    				processNumber = processNumber + 1;
 -		    			
 -		    			Date dt = DateUtil.currentGMTTime();
 +		    			Long processNumber = queueVO.getLastProcessNumber();
 +		    			if(processNumber == null)
 +		    				processNumber = new Long(1);
 +		    			else
 +		    				processNumber = processNumber + 1;
 +		    			
 +		    			Date dt = DateUtil.currentGMTTime();
- 		    			queueVO.setLastProcessMsid(msid);
  		    			queueVO.setLastProcessNumber(processNumber);
- 		    			queueVO.setLastProcessTime(dt);
  		    			queueVO.setLastUpdated(dt);
+ 	                    queueVO.setQueueSize(queueVO.getQueueSize() + 1);
 -		    			_syncQueueDao.update(queueVO.getId(), queueVO);
 -		    			
 -		    			itemVO.setLastProcessMsid(msid);
 +		    			_syncQueueDao.update(queueVO.getId(), queueVO);
 +		    			
 +		    			itemVO.setLastProcessMsid(msid);
  		    			itemVO.setLastProcessNumber(processNumber);
+ 		    			itemVO.setLastProcessTime(dt);
 -		    			_syncQueueItemDao.update(item.getId(), itemVO);
 -		    			
 -		    			resultList.add(item);
 -    				}
 -    			}
 -    		}
 -    		txt.commit();
 -    		return resultList;
 -    	} catch(Exception e) {
 -    		s_logger.error("Unexpected exception: ", e);
 -    		txt.rollback();
 -    	}
 -    	return null;
 -    }
 -    
 -    @Override
 -    @DB
 -    public void purgeItem(long queueItemId) {
 -    	Transaction txt = Transaction.currentTxn();
 -    	try {
 -    		txt.start();
 -    		
 -			SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
 -			if(itemVO != null) {
 -				SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
 -				
 +		    			_syncQueueItemDao.update(item.getId(), itemVO);
 +		    			
 +		    			resultList.add(item);
 +    				}
 +    			}
 +    		}
 +    		txt.commit();
 +    		return resultList;
 +    	} catch(Exception e) {
 +    		s_logger.error("Unexpected exception: ", e);
 +    		txt.rollback();
 +    	}
 +    	return null;
 +    }
 +    
 +    @Override
 +    @DB
 +    public void purgeItem(long queueItemId) {
 +    	Transaction txt = Transaction.currentTxn();
 +    	try {
 +    		txt.start();
 +    		
 +			SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
 +			if(itemVO != null) {
 +				SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
 +				
  				_syncQueueItemDao.expunge(itemVO.getId());
  				
- 				queueVO.setLastProcessTime(null);
+ 				//if item is active, reset queue information
+ 				if (itemVO.getLastProcessMsid() != null) {
 -				    queueVO.setLastUpdated(DateUtil.currentGMTTime());
 +				queueVO.setLastUpdated(DateUtil.currentGMTTime());
+ 	                //decrement the count
+ 	                assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+ 	                queueVO.setQueueSize(queueVO.getQueueSize() - 1);
 -	                _syncQueueDao.update(queueVO.getId(), queueVO);
 -				}
 +				_syncQueueDao.update(queueVO.getId(), queueVO);
 +			}
+ 			}
 -    		txt.commit();
 -    	} catch(Exception e) {
 -    		s_logger.error("Unexpected exception: ", e);
 -    		txt.rollback();
 -    	}
 +    		txt.commit();
 +    	} catch(Exception e) {
 +    		s_logger.error("Unexpected exception: ", e);
 +    		txt.rollback();
 +    	}
      }
      
      @Override
@@@ -233,44 -239,50 +240,50 @@@
          return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
      }
      
 -    @Override
 -    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
 -    	_name = name;
 +    @Override
- 	public void resetQueueProcess(long msid) {
-     	_syncQueueDao.resetQueueProcessing(msid);
-     }
-     
-     @Override
 +    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
 +    	_name = name;
  		ComponentLocator locator = ComponentLocator.getCurrentLocator();
 -
 -		_syncQueueDao = locator.getDao(SyncQueueDao.class);
 -		if (_syncQueueDao == null) {
 -			throw new ConfigurationException("Unable to get "
 -					+ SyncQueueDao.class.getName());
 -		}
 -		
 -		_syncQueueItemDao = locator.getDao(SyncQueueItemDao.class);
 -		if (_syncQueueItemDao == null) {
 -			throw new ConfigurationException("Unable to get "
 -					+ SyncQueueDao.class.getName());
 +		
 +		_syncQueueDao = locator.getDao(SyncQueueDao.class);
 +		if (_syncQueueDao == null) {
 +			throw new ConfigurationException("Unable to get "
 +					+ SyncQueueDao.class.getName());
  		}
 -    	
 -    	return true;
 -    }
 -    
 -    @Override
 -    public boolean start() {
 -        return true;
 -    }
 -
 -    @Override
 -    public boolean stop() {
 -        return true;
 -    }
 -    
 -    @Override
 -    public String getName() {
 -    	return _name;
 +		
 +		_syncQueueItemDao = locator.getDao(SyncQueueItemDao.class);
 +		if (_syncQueueItemDao == null) {
 +			throw new ConfigurationException("Unable to get "
 +					+ SyncQueueDao.class.getName());
 +		}
 +    	
 +    	return true;
 +    }
 +    
 +    @Override
 +    public boolean start() {
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean stop() {
 +        return true;
      }
      
 +    @Override
 +    public String getName() {
 +    	return _name;
 +    }
- }
 +
+     private boolean queueReadyToProcess(SyncQueueVO queueVO) {
+         return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+     }
+     
+     @Override
+     public void purgeAsyncJobQueueItemId(long asyncJobId) {
+         Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+         if (itemId != null) {
+             purgeItem(itemId);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
index 599e345,bfe8c0f..7b4c182
--- a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
@@@ -40,12 -38,13 +40,13 @@@ public class SyncQueueDaoImpl extends G
      private static final Logger s_logger = Logger.getLogger(SyncQueueDaoImpl.class.getName());
      
      SearchBuilder<SyncQueueVO> TypeIdSearch = createSearchBuilder();
 -
 -    @Override
 -    public void ensureQueue(String syncObjType, long syncObjId) {
 -        Date dt = DateUtil.currentGMTTime();
 +	
 +	@Override
 +	public void ensureQueue(String syncObjType, long syncObjId) {
 +		Date dt = DateUtil.currentGMTTime();
- 		String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated) values(?, ?, ?, ?)";
+         String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" +
+                 " values(?, ?, ?, ?)";
 -
 +		
          Transaction txn = Transaction.currentTxn();
          PreparedStatement pstmt = null;
          try {
@@@ -56,40 -55,23 +57,23 @@@
              pstmt.setString(4, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), dt));
              pstmt.execute();
          } catch (SQLException e) {
 -            s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
 +        	s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
          } catch (Throwable e) {
 -            s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
 +        	s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
          }
 -    }
 -
 -    @Override
 -    public SyncQueueVO find(String syncObjType, long syncObjId) {
 -        SearchCriteria<SyncQueueVO> sc = TypeIdSearch.create();
 -        sc.setParameters("syncObjType", syncObjType);
 -        sc.setParameters("syncObjId", syncObjId);
 +	}
 +	
 +	@Override
 +	public SyncQueueVO find(String syncObjType, long syncObjId) {
 +    	SearchCriteria<SyncQueueVO> sc = TypeIdSearch.create();
 +    	sc.setParameters("syncObjType", syncObjType);
 +    	sc.setParameters("syncObjId", syncObjId);
          return findOneBy(sc);
 -    }
 +	}
  
- 	@Override @DB
- 	public void resetQueueProcessing(long msid) {
- 		String sql = "UPDATE sync_queue set queue_proc_msid=NULL, queue_proc_time=NULL where queue_proc_msid=?";
- 		
-         Transaction txn = Transaction.currentTxn();
-         PreparedStatement pstmt = null;
-         try {
-             pstmt = txn.prepareAutoCloseStatement(sql);
-             pstmt.setLong(1, msid);
-             pstmt.execute();
-         } catch (SQLException e) {
-         	s_logger.warn("Unable to reset sync queue for management server " + msid, e);
-         } catch (Throwable e) {
-         	s_logger.warn("Unable to reset sync queue for management server " + msid, e);
-         }
- 	}
- 	
 -    protected SyncQueueDaoImpl() {
 -        super();
 -        TypeIdSearch = createSearchBuilder();
 +	protected SyncQueueDaoImpl() {
 +	    super();
 +	    TypeIdSearch = createSearchBuilder();
          TypeIdSearch.and("syncObjType", TypeIdSearch.entity().getSyncObjType(), SearchCriteria.Op.EQ);
          TypeIdSearch.and("syncObjId", TypeIdSearch.entity().getSyncObjId(), SearchCriteria.Op.EQ);
          TypeIdSearch.done();

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
index 6d7342c,5e75756..8ee21f3
--- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
@@@ -25,58 -26,63 +26,66 @@@ import java.util.List
  import java.util.TimeZone;
  
  import javax.ejb.Local;
 +import javax.inject.Inject;
  
  import org.apache.log4j.Logger;
 +import org.springframework.stereotype.Component;
  
  import com.cloud.async.SyncQueueItemVO;
  import com.cloud.utils.DateUtil;
  import com.cloud.utils.db.Filter;
  import com.cloud.utils.db.GenericDaoBase;
- import com.cloud.utils.db.JoinBuilder;
+ import com.cloud.utils.db.GenericSearchBuilder;
  import com.cloud.utils.db.SearchBuilder;
  import com.cloud.utils.db.SearchCriteria;
+ import com.cloud.utils.db.SearchCriteria.Op;
  import com.cloud.utils.db.Transaction;
  
 +@Component
  @Local(value = { SyncQueueItemDao.class })
  public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
      private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
+     final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
+     
+     protected SyncQueueItemDaoImpl() {
+         super();
+         queueIdSearch = createSearchBuilder(Long.class);
+         queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ);
+         queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
+         queueIdSearch.selectField(queueIdSearch.entity().getId());
+         queueIdSearch.done();
+     }
 +    
-     // private final SyncQueueDao _syncQueueDao = new SyncQueueDaoImpl();
-     @Inject private SyncQueueDao _syncQueueDao;
  
 -
 -    @Override
 -    public SyncQueueItemVO getNextQueueItem(long queueId) {
 -        
 -        SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
 +	@Override
 +	public SyncQueueItemVO getNextQueueItem(long queueId) {
 +		
 +		SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
          sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ);
 -        sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(),    SearchCriteria.Op.NULL);
 +        sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(),	SearchCriteria.Op.NULL);
          sb.done();
          
 -        SearchCriteria<SyncQueueItemVO> sc = sb.create();
 -        sc.setParameters("queueId", queueId);
 -        
 -        Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
 +    	SearchCriteria<SyncQueueItemVO> sc = sb.create();
 +    	sc.setParameters("queueId", queueId);
 +    	
 +    	Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
          List<SyncQueueItemVO> l = listBy(sc, filter);
          if(l != null && l.size() > 0)
 -            return l.get(0);
 -        
 -        return null;
 -    }
 -
 -    @Override
 -    public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
 -        List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
 -        
 -        String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
 -                     " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
 +        	return l.get(0);
 +    	
 +		return null;
 +	}
 +
 +	@Override
 +	public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
 +		List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
 +		
 +		String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
 +					 " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
- 					 " WHERE q.queue_proc_time IS NULL AND i.queue_proc_number IS NULL " +
+                      " WHERE q.queue_size < q.queue_size_limit AND i.queue_proc_number IS NULL " +
 -                     " GROUP BY q.id " +
 -                     " ORDER BY i.id " +
 -                     " LIMIT 0, ?";
 +					 " GROUP BY q.id " +
 +					 " ORDER BY i.id " +
 +					 " LIMIT 0, ?";
  
          Transaction txn = Transaction.currentTxn();
          PreparedStatement pstmt = null;
@@@ -85,59 -91,55 +94,55 @@@
              pstmt.setInt(1, maxItems);
              ResultSet rs = pstmt.executeQuery();
              while(rs.next()) {
 -                SyncQueueItemVO item = new SyncQueueItemVO();
 -                item.setId(rs.getLong(1));
 -                item.setQueueId(rs.getLong(2));
 -                item.setContentType(rs.getString(3));
 -                item.setContentId(rs.getLong(4));
 -                item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
 -                l.add(item);
 +            	SyncQueueItemVO item = new SyncQueueItemVO();
 +            	item.setId(rs.getLong(1));
 +            	item.setQueueId(rs.getLong(2));
 +            	item.setContentType(rs.getString(3));
 +            	item.setContentId(rs.getLong(4));
 +            	item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
 +            	l.add(item);
              }
          } catch (SQLException e) {
 -            s_logger.error("Unexpected sql excetpion, ", e);
 +        	s_logger.error("Unexpected sql excetpion, ", e);
          } catch (Throwable e) {
 -            s_logger.error("Unexpected excetpion, ", e);
 +        	s_logger.error("Unexpected excetpion, ", e);
          }
 -        return l;
 -    }
 -
 -    @Override
 -    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
 -        SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
 +		return l;
 +	}
 +	
 +	@Override
 +	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
 +		SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
          sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
 -            SearchCriteria.Op.EQ);
 +    		SearchCriteria.Op.EQ);
          sb.done();
          
 -        SearchCriteria<SyncQueueItemVO> sc = sb.create();
 -        sc.setParameters("lastProcessMsid", msid);
 -        
 -        Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
 -        
 -        if(exclusive)
 -            return lockRows(sc, filter, true);
 +    	SearchCriteria<SyncQueueItemVO> sc = sb.create();
 +    	sc.setParameters("lastProcessMsid", msid);
 +    	
 +    	Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
 +    	
 +    	if(exclusive)
 +    		return lockRows(sc, filter, true);
          return listBy(sc, filter);
 -    }
 -
 +	}
 +	
+ 
      @Override
      public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
          Date cutTime = DateUtil.currentGMTTime();
-         cutTime = new Date(cutTime.getTime() - thresholdMs);
-         
-         SearchBuilder<SyncQueueVO> sbQueue = _syncQueueDao.createSearchBuilder();
-         sbQueue.and("lastProcessTime", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
-         sbQueue.and("lastProcessTime2", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.LT);
          
          SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
-         sbItem.join("queueItemJoinQueue", sbQueue, sbQueue.entity().getId(), sbItem.entity().getQueueId(), JoinBuilder.JoinType.INNER);
          sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
          sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
+         sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
+         sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
 -
 +        
-         sbQueue.done();
          sbItem.done();
 -
 +        
          SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
-         sc.setJoinParameters("queueItemJoinQueue", "lastProcessTime2", cutTime);
+         sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
 -
 +        
          if(exclusive)
              return lockRows(sc, null, true);
          return listBy(sc, null);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalDiscoverer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalResourceBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
index e61b942,25298a9..cd3f2cb
--- a/server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
+++ b/server/src/com/cloud/baremetal/BareMetalTemplateAdapter.java
@@@ -20,14 -20,12 +20,14 @@@ import java.util.Date
  import java.util.List;
  
  import javax.ejb.Local;
 +import javax.inject.Inject;
  
+ import org.apache.cloudstack.api.command.user.iso.DeleteIsoCmd;
+ import org.apache.cloudstack.api.command.user.iso.RegisterIsoCmd;
+ import org.apache.cloudstack.api.command.user.template.RegisterTemplateCmd;
  import org.apache.log4j.Logger;
 +import org.springframework.stereotype.Component;
  
- import com.cloud.api.commands.DeleteIsoCmd;
- import com.cloud.api.commands.RegisterIsoCmd;
- import com.cloud.api.commands.RegisterTemplateCmd;
  import com.cloud.configuration.Resource.ResourceType;
  import com.cloud.dc.DataCenterVO;
  import com.cloud.event.EventTypes;

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
index 02775bd,2e325ec..c419f5a
--- a/server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
+++ b/server/src/com/cloud/baremetal/BareMetalVmManagerImpl.java
@@@ -22,14 -22,15 +22,19 @@@ import java.util.List
  import java.util.Map;
  import java.util.concurrent.Executors;
  
 +import javax.annotation.PostConstruct;
  import javax.ejb.Local;
 +import javax.inject.Inject;
  import javax.naming.ConfigurationException;
  
+ import org.apache.cloudstack.api.command.user.template.CreateTemplateCmd;
+ import org.apache.cloudstack.api.command.user.vm.DeployVMCmd;
+ import org.apache.cloudstack.api.command.user.vm.UpgradeVMCmd;
+ import org.apache.cloudstack.api.command.user.volume.AttachVolumeCmd;
+ import org.apache.cloudstack.api.command.user.volume.DetachVolumeCmd;
  import org.apache.log4j.Logger;
 +import org.springframework.context.annotation.Primary;
 +import org.springframework.stereotype.Component;
  
  import com.cloud.agent.api.Answer;
  import com.cloud.agent.api.StopAnswer;

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/ExternalDhcpManagerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/baremetal/PxeServerManagerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/cluster/ClusterManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/cluster/ClusterManagerImpl.java
index 19705ec,e341b88..9976bd7
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java
@@@ -801,7 -811,8 +818,8 @@@ public class ClusterManagerImpl impleme
  
                      invalidHeartbeatConnection();
                  } finally {
+                     txn.transitToAutoManagedConnection(Transaction.CLOUD_DB);                         
 -                    txn.close("ClusterHeartBeat");           	
 +                    txn.close("ClusterHeartBeat");
                  }
              }
          };

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/configuration/ConfigurationManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/configuration/ConfigurationManagerImpl.java
index 0a6aaa8,2e20c13..bb943c9
--- a/server/src/com/cloud/configuration/ConfigurationManagerImpl.java
+++ b/server/src/com/cloud/configuration/ConfigurationManagerImpl.java
@@@ -39,33 -38,30 +39,31 @@@ import javax.naming.NamingException
  import javax.naming.directory.DirContext;
  import javax.naming.directory.InitialDirContext;
  
+ import org.apache.cloudstack.api.command.admin.config.UpdateCfgCmd;
+ import org.apache.cloudstack.api.command.admin.ldap.LDAPConfigCmd;
+ import org.apache.cloudstack.api.command.admin.ldap.LDAPRemoveCmd;
+ import org.apache.cloudstack.api.command.admin.network.DeleteNetworkOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.network.CreateNetworkOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.network.UpdateNetworkOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.offering.CreateDiskOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.offering.*;
+ import org.apache.cloudstack.api.command.admin.pod.DeletePodCmd;
+ import org.apache.cloudstack.api.command.admin.pod.UpdatePodCmd;
+ import org.apache.cloudstack.api.command.admin.vlan.CreateVlanIpRangeCmd;
+ import org.apache.cloudstack.api.command.admin.zone.CreateZoneCmd;
+ import org.apache.cloudstack.api.command.admin.zone.DeleteZoneCmd;
+ import org.apache.cloudstack.api.command.admin.zone.UpdateZoneCmd;
+ import org.apache.cloudstack.api.command.admin.offering.CreateServiceOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.offering.DeleteServiceOfferingCmd;
+ import org.apache.cloudstack.api.command.admin.vlan.DeleteVlanIpRangeCmd;
+ import org.apache.cloudstack.api.command.user.network.ListNetworkOfferingsCmd;
  import org.apache.log4j.Logger;
 +import org.springframework.stereotype.Component;
  
- import com.cloud.acl.SecurityChecker;
+ import org.apache.cloudstack.acl.SecurityChecker;
  import com.cloud.alert.AlertManager;
- import com.cloud.api.ApiConstants.LDAPParams;
+ import org.apache.cloudstack.api.ApiConstants.LDAPParams;
  import com.cloud.api.ApiDBUtils;
- import com.cloud.api.commands.CreateDiskOfferingCmd;
- import com.cloud.api.commands.CreateNetworkOfferingCmd;
- import com.cloud.api.commands.CreateServiceOfferingCmd;
- import com.cloud.api.commands.CreateVlanIpRangeCmd;
- import com.cloud.api.commands.CreateZoneCmd;
- import com.cloud.api.commands.DeleteDiskOfferingCmd;
- import com.cloud.api.commands.DeleteNetworkOfferingCmd;
- import com.cloud.api.commands.DeletePodCmd;
- import com.cloud.api.commands.DeleteServiceOfferingCmd;
- import com.cloud.api.commands.DeleteVlanIpRangeCmd;
- import com.cloud.api.commands.DeleteZoneCmd;
- import com.cloud.api.commands.LDAPConfigCmd;
- import com.cloud.api.commands.LDAPRemoveCmd;
- import com.cloud.api.commands.ListNetworkOfferingsCmd;
- import com.cloud.api.commands.UpdateCfgCmd;
- import com.cloud.api.commands.UpdateDiskOfferingCmd;
- import com.cloud.api.commands.UpdateNetworkOfferingCmd;
- import com.cloud.api.commands.UpdatePodCmd;
- import com.cloud.api.commands.UpdateServiceOfferingCmd;
- import com.cloud.api.commands.UpdateZoneCmd;
  import com.cloud.capacity.dao.CapacityDao;
  import com.cloud.configuration.Resource.ResourceType;
  import com.cloud.configuration.dao.ConfigurationDao;

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/configuration/DefaultComponentLibrary.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/dao/EntityManagerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/dc/dao/HostPodDaoImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/dc/dao/HostPodDaoImpl.java
index 9110667,fce308a..a06bd3f
--- a/server/src/com/cloud/dc/dao/HostPodDaoImpl.java
+++ b/server/src/com/cloud/dc/dao/HostPodDaoImpl.java
@@@ -36,8 -36,11 +37,12 @@@ import com.cloud.utils.db.SearchBuilder
  import com.cloud.utils.db.SearchCriteria;
  import com.cloud.utils.db.SearchCriteria.Op;
  import com.cloud.utils.db.Transaction;
+ import com.cloud.utils.component.ComponentLocator;
+ import com.cloud.vm.VMInstanceVO;
+ import com.cloud.vm.VirtualMachine;
+ import com.cloud.vm.dao.VMInstanceDaoImpl;
  
 +@Component
  @Local(value={HostPodDao.class})
  public class HostPodDaoImpl extends GenericDaoBase<HostPodVO, Long> implements HostPodDao {
      private static final Logger s_logger = Logger.getLogger(HostPodDaoImpl.class);
@@@ -63,8 -66,29 +68,29 @@@
  		
  	    return listBy(sc);
  	}
 -
 -    @Override
 +	
 +	@Override
+     public List<HostPodVO> listByDataCenterIdVMTypeAndStates(long id, VirtualMachine.Type type, VirtualMachine.State... states) {
+         final VMInstanceDaoImpl _vmDao = ComponentLocator.inject(VMInstanceDaoImpl.class);
+         SearchBuilder<VMInstanceVO> vmInstanceSearch = _vmDao.createSearchBuilder();
+         vmInstanceSearch.and("type", vmInstanceSearch.entity().getType(), SearchCriteria.Op.EQ);
+         vmInstanceSearch.and("states", vmInstanceSearch.entity().getState(), SearchCriteria.Op.IN);
+ 
+         SearchBuilder<HostPodVO> podIdSearch = createSearchBuilder();
+         podIdSearch.and("dc", podIdSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+         podIdSearch.select(null, SearchCriteria.Func.DISTINCT, podIdSearch.entity().getId());
+         podIdSearch.join("vmInstanceSearch", vmInstanceSearch, podIdSearch.entity().getId(),
+                 vmInstanceSearch.entity().getPodIdToDeployIn(), JoinBuilder.JoinType.INNER);
+         podIdSearch.done();
+ 
+         SearchCriteria<HostPodVO> sc = podIdSearch.create();
+         sc.setParameters("dc", id);
+         sc.setJoinParameters("vmInstanceSearch", "type", type);
+         sc.setJoinParameters("vmInstanceSearch", "states", (Object[]) states);
+         return listBy(sc);
+     }
+ 
+ 	@Override
      public HostPodVO findByName(String name, long dcId) {
  	    SearchCriteria<HostPodVO> sc = DataCenterAndNameSearch.create();
  	    sc.setParameters("dc", dcId);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/30f2565d/server/src/com/cloud/deploy/FirstFitPlanner.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/deploy/FirstFitPlanner.java
index 9b5646d,06e46fb..bcc1d26
--- a/server/src/com/cloud/deploy/FirstFitPlanner.java
+++ b/server/src/com/cloud/deploy/FirstFitPlanner.java
@@@ -75,8 -72,8 +74,7 @@@ import com.cloud.storage.dao.VolumeDao
  import com.cloud.user.AccountManager;
  import com.cloud.utils.NumbersUtil;
  import com.cloud.utils.Pair;
- import com.cloud.utils.StringUtils;
  import com.cloud.utils.component.Adapters;
 -import com.cloud.utils.component.Inject;
  import com.cloud.vm.DiskProfile;
  import com.cloud.vm.ReservationContext;
  import com.cloud.vm.VirtualMachine;