You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/18 02:24:47 UTC

[2/9] git commit: updated refs/heads/vmsync to 309f8da

Removed AsyncJobConstant


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/ea6ca5ff
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/ea6ca5ff
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/ea6ca5ff

Branch: refs/heads/vmsync
Commit: ea6ca5ff5c928a61c024c242f07cc1f813e2f616
Parents: 2b96665
Author: Alex Huang <al...@gmail.com>
Authored: Fri Jun 14 19:21:25 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Mon Jun 17 17:04:01 2013 -0700

----------------------------------------------------------------------
 .../apache/cloudstack/api/ResponseObject.java   |   1 +
 api/src/org/apache/cloudstack/jobs/JobInfo.java |   8 +-
 .../com/cloud/vm/VirtualMachineManagerImpl.java |   4 +-
 .../src/com/cloud/vm/VmWorkJobDispatcher.java   |   4 +-
 .../cloudstack/vm/jobs/VmWorkJobDaoImpl.java    |   2 +-
 .../cloudstack/framework/jobs/AsyncJob.java     |   2 +-
 .../framework/jobs/AsyncJobConstants.java       |  34 ------
 .../jobs/AsyncJobExecutionContext.java          |  25 ++---
 .../framework/jobs/AsyncJobManager.java         |  10 +-
 .../framework/jobs/dao/AsyncJobDaoImpl.java     |  12 +--
 .../framework/jobs/dao/AsyncJobJoinMapDao.java  |   5 +-
 .../jobs/dao/AsyncJobJoinMapDaoImpl.java        |  50 +++++----
 .../framework/jobs/impl/AsyncJobJoinMapVO.java  |  11 +-
 .../framework/jobs/impl/AsyncJobMBeanImpl.java  |  66 +++++++-----
 .../framework/jobs/impl/AsyncJobVO.java         |  16 ++-
 .../com/cloud/api/ApiAsyncJobDispatcher.java    |   6 +-
 server/src/com/cloud/api/ApiServer.java         |   2 +-
 .../com/cloud/async/AsyncJobManagerImpl.java    | 108 +++++++++----------
 server/src/com/cloud/async/AsyncJobResult.java  |  14 +--
 .../consoleproxy/ConsoleProxyManagerImpl.java   |   8 +-
 .../com/cloud/server/ManagementServerImpl.java  |   4 +-
 .../secondary/SecondaryStorageManagerImpl.java  |   8 +-
 .../storage/snapshot/SnapshotSchedulerImpl.java |   6 +-
 .../cloud/storage/upload/UploadListener.java    |  10 +-
 .../src/com/cloud/vm/SystemVmLoadScanner.java   |   4 +-
 .../com/cloud/async/TestAsyncJobManager.java    |  13 ++-
 .../vm/VmWorkMockVirtualMachineManagerImpl.java |   2 +-
 .../cloud/vm/VmWorkTestWorkJobDispatcher.java   |   2 +-
 28 files changed, 215 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/api/src/org/apache/cloudstack/api/ResponseObject.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/ResponseObject.java b/api/src/org/apache/cloudstack/api/ResponseObject.java
index c8bd457..b696159 100644
--- a/api/src/org/apache/cloudstack/api/ResponseObject.java
+++ b/api/src/org/apache/cloudstack/api/ResponseObject.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.cloudstack.api;
 
+
 public interface ResponseObject {
     /**
      * Get the name of the API response

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/api/src/org/apache/cloudstack/jobs/JobInfo.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/jobs/JobInfo.java b/api/src/org/apache/cloudstack/jobs/JobInfo.java
index bce9627..6a53123 100644
--- a/api/src/org/apache/cloudstack/jobs/JobInfo.java
+++ b/api/src/org/apache/cloudstack/jobs/JobInfo.java
@@ -22,6 +22,12 @@ import org.apache.cloudstack.api.Identity;
 import org.apache.cloudstack.api.InternalIdentity;
 
 public interface JobInfo extends Identity, InternalIdentity {
+    public enum Status {
+        IN_PROGRESS,
+        SUCCEEDED,
+        FAILED,
+        CANCELLED;
+    }
 
     String getType();
 
@@ -39,7 +45,7 @@ public interface JobInfo extends Identity, InternalIdentity {
 
     String getCmdInfo();
 
-    int getStatus();
+    Status getStatus();
 
     int getProcessStatus();
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 4fee49e..0f710d1 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -725,7 +725,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 						return true;
 			
 					VmWorkJobVO workJob = _workJobDao.findById(jobId);
-					if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
+					if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS)
 						return true;
 					
 					return false;
@@ -1152,7 +1152,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 						return true;
 			
 					VmWorkJobVO workJob = _workJobDao.findById(jobId);
-					if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
+					if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS)
 						return true;
 					
 					return false;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
index 8c7fd9c..7819c1a 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -99,10 +99,10 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
                 VmWorkStop stop = (VmWorkStop)work;
                 _vmMgr.orchestrateStop(vm.getUuid(), stop.isForceStop());
             }
-            _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, null);
+            _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
         } catch(Throwable e) {
             s_logger.error("Unable to complete " + job, e);
-            _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, e.getMessage());
+            _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, e.getMessage());
         } finally {
             CallContext.unregister();
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
index 4ece4e8..0135d81 100644
--- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
+++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
@@ -111,7 +111,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
 	public void expungeCompletedWorkJobs(Date cutDate) {
 		SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
 		sc.setParameters("lastUpdated",cutDate);
-		sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+		sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
 		
 		expunge(sc);
 	}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index dfb67f8..be92846 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -57,7 +57,7 @@ public interface AsyncJob extends JobInfo {
     String getCmdInfo();
     
     @Override
-    int getStatus();
+    Status getStatus();
 
     @Override
     int getProcessStatus();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java
deleted file mode 100644
index 9568eb4..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-public interface AsyncJobConstants {
-	public static final int STATUS_IN_PROGRESS = 0;
-	public static final int STATUS_SUCCEEDED = 1;
-	public static final int STATUS_FAILED = 2;
-	
-	public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
-	public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
-	
-	public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
-	
-	// Although we may have detailed masks for each individual wakeup event, i.e.
-	// periodical timer, matched topic from message bus, it seems that we don't
-	// need to distinguish them to such level. Therefore, only one wakeup signal
-	// is defined
-	public static final int SIGNAL_MASK_WAKEUP = 1;
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
index 3d5c326..ef0a4a6 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
@@ -22,11 +22,11 @@ import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
 import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
 import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.exception.ConcurrentOperationException;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.utils.component.ComponentContext;
 
 public class AsyncJobExecutionContext  {
     private AsyncJob _job;
@@ -57,10 +57,6 @@ public class AsyncJobExecutionContext  {
 	}
 	
     public AsyncJob getJob() {
-		if(_job == null) {
-			_job = _jobMgr.getPseudoJob();
-		}
-		
 		return _job;
 	}
 	
@@ -68,7 +64,7 @@ public class AsyncJobExecutionContext  {
 		_job = job;
 	}
 	
-    public void completeAsyncJob(int jobStatus, int resultCode, Object resultObject) {
+    public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, Object resultObject) {
     	assert(_job != null);
     	_jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject);
     }
@@ -114,7 +110,7 @@ public class AsyncJobExecutionContext  {
     	assert(_job != null);
     	
     	AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
-    	if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) {
+    	if(record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) {
     		Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
     		if(exception != null && exception instanceof Exception) {
     			if(exception instanceof InsufficientCapacityException)
@@ -131,12 +127,12 @@ public class AsyncJobExecutionContext  {
     	_jobMgr.disjoinJob(_job.getId(), joinedJobId);
     }
     
-    public void completeJoin(int joinStatus, String joinResult) {
+    public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
     	assert(_job != null);
     	_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
     }
     
-    public void completeJobAndJoin(int joinStatus, String joinResult) {
+    public void completeJobAndJoin(JobInfo.Status joinStatus, String joinResult) {
     	assert(_job != null);
     	_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
     	_jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null);
@@ -144,21 +140,14 @@ public class AsyncJobExecutionContext  {
 
 	public static AsyncJobExecutionContext getCurrentExecutionContext() {
 		AsyncJobExecutionContext context = s_currentExectionContext.get();
-		if(context == null) {
-			context = new AsyncJobExecutionContext();
-			context = ComponentContext.inject(context);
-			context.getJob();
-			setCurrentExecutionContext(context);
-		}
-		
 		return context;
 	}
 	
-    public static AsyncJobExecutionContext registerPseudoExecutionContext() {
+    public static AsyncJobExecutionContext registerPseudoExecutionContext(long accountId, long userId) {
         AsyncJobExecutionContext context = s_currentExectionContext.get();
         if (context == null) {
             context = new AsyncJobExecutionContext();
-            context.getJob();
+            context.setJob(_jobMgr.getPseudoJob(accountId, userId));
             setCurrentExecutionContext(context);
         }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index e577fb0..3178268 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.Manager;
@@ -31,12 +32,9 @@ public interface AsyncJobManager extends Manager {
 	List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
 	
 	long submitAsyncJob(AsyncJob job);
-	long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
 	long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
 
-//	AsyncJobResult queryAsyncJobResult(long jobId);
-
-    void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);
+    void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, Object resultObject);
     void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
     void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
     void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
@@ -50,7 +48,7 @@ public interface AsyncJobManager extends Manager {
 	 *
 	 * @return pseudo job for the thread
 	 */
-	AsyncJob getPseudoJob();
+    AsyncJob getPseudoJob(long accountId, long userId);
 
     /**
      * Used by upper level job to wait for completion of a down-level job (usually VmWork jobs)
@@ -101,7 +99,7 @@ public interface AsyncJobManager extends Manager {
      * 					for legacy code to work. To help pass exception object easier, we use
      * 					object-stream based serialization instead of GSON
      */
-    void completeJoin(long joinJobId, int joinStatus, String joinResult);
+    void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult);
    
     void releaseSyncSource();
     void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index c30dbde..96775f7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.utils.db.DB;
 import com.cloud.utils.db.Filter;
@@ -100,7 +100,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
         SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
         sc.setParameters("instanceType", instanceType);
         sc.setParameters("instanceId", instanceId);
-        sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
         
         List<AsyncJobVO> l = listIncludingRemovedBy(sc);
         if(l != null && l.size() > 0) {
@@ -121,7 +121,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
         if (accountId != null) {
             sc.setParameters("accountId", accountId);
         }
-        sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
         
         return listBy(sc);
 	}
@@ -129,8 +129,8 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
 	@Override
     public AsyncJobVO findPseudoJob(long threadId, long msid) {
 		SearchCriteria<AsyncJobVO> sc = pseudoJobSearch.create();
-		sc.setParameters("jobDispatcher", AsyncJobConstants.JOB_DISPATCHER_PSEUDO);
-		sc.setParameters("instanceType", AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE);
+        sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
+        sc.setParameters("instanceType", AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
 		sc.setParameters("instanceId", threadId);
 		
 		List<AsyncJobVO> result = listBy(sc);
@@ -178,7 +178,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
 	@Override
     @DB
 	public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
-		String sql = "UPDATE async_job SET job_status=" + AsyncJobConstants.STATUS_FAILED + ", job_result_code=" + jobResultCode
+		String sql = "UPDATE async_job SET job_status=" + JobInfo.Status.FAILED + ", job_result_code=" + jobResultCode
 			+ ", job_result='" + jobResultMessage + "' where job_status=0 AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))";
 		
         Transaction txn = Transaction.currentTxn();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
index a9e82a7..9c993f1 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
@@ -19,12 +19,13 @@ package org.apache.cloudstack.framework.jobs.dao;
 import java.util.List;
 
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.utils.db.GenericDao;
 
 public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> {
 	
-	Long joinJob(long jobId, long joinJobId, long joinMsid, 
+	Long joinJob(long jobId, long joinJobId, long joinMsid,
 		long wakeupIntervalMs, long expirationMs,
 		Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
 	void disjoinJob(long jobId, long joinedJobId);
@@ -33,7 +34,7 @@ public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long>
 	AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId);
 	List<AsyncJobJoinMapVO> listJoinRecords(long jobId);
 	
-	void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid);
+    void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid);
 	
 	List<Long> wakeupScan();
 	List<Long> wakeupByJoinedJobCompletion(long joinedJobId);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index 4cc2218..60dea03 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -26,23 +26,23 @@ import java.util.TimeZone;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.Transaction;
 import com.cloud.utils.db.UpdateBuilder;
-import com.cloud.utils.db.SearchCriteria.Op;
 
 public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Long> implements AsyncJobJoinMapDao {
     public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class);
 	
-	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;	
-	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;	
-	private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;	
+	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;
+	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;
+	private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;
 	private final SearchBuilder<AsyncJobJoinMapVO> WakeupSearch;
 	
 	public AsyncJobJoinMapDaoImpl() {
@@ -66,7 +66,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 		WakeupSearch.done();
 	}
 	
-	public Long joinJob(long jobId, long joinJobId, long joinMsid, 
+	@Override
+    public Long joinJob(long jobId, long joinJobId, long joinMsid,
 		long wakeupIntervalMs, long expirationMs,
 		Long syncSourceId, String wakeupHandler, String wakeupDispatcher) {
 		
@@ -74,7 +75,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 		record.setJobId(jobId);
 		record.setJoinJobId(joinJobId);
 		record.setJoinMsid(joinMsid);
-		record.setJoinStatus(AsyncJobConstants.STATUS_IN_PROGRESS);
+		record.setJoinStatus(JobInfo.Status.IN_PROGRESS);
 		record.setSyncSourceId(syncSourceId);
 		record.setWakeupInterval(wakeupIntervalMs / 1000);		// convert millisecond to second
 		record.setWakeupHandler(wakeupHandler);
@@ -84,11 +85,12 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 			record.setExpiration(new Date(DateUtil.currentGMTTime().getTime() + expirationMs));
 		}
 		
-		this.persist(record);
+		persist(record);
 		return record.getId();
 	}
 	
-	public void disjoinJob(long jobId, long joinedJobId) {
+	@Override
+    public void disjoinJob(long jobId, long joinedJobId) {
 		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
 		sc.setParameters("jobId", jobId);
 		sc.setParameters("joinJobId", joinedJobId);
@@ -96,14 +98,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 		this.expunge(sc);
 	}
 	
-	public void disjoinAllJobs(long jobId) {
+	@Override
+    public void disjoinAllJobs(long jobId) {
 		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
 		sc.setParameters("jobId", jobId);
 		
 		this.expunge(sc);
 	}
 	
-	public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
+	@Override
+    public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
 		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
 		sc.setParameters("jobId", jobId);
 		sc.setParameters("joinJobId", joinJobId);
@@ -117,14 +121,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 		return null;
 	}
 	
-	public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
+	@Override
+    public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
 		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
 		sc.setParameters("jobId", jobId);
 		
 		return this.listBy(sc);
 	}
 	
-	public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) {
+    @Override
+    public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid) {
         AsyncJobJoinMapVO record = createForUpdate();
         record.setJoinStatus(joinStatus);
         record.setJoinResult(joinResult);
@@ -138,7 +144,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
         update(ub, sc, null);
 	}
 
-	public List<Long> wakeupScan() {
+	@Override
+    public List<Long> wakeupScan() {
 		List<Long> standaloneList = new ArrayList<Long>();
 		
 		Date cutDate = DateUtil.currentGMTTime();
@@ -149,9 +156,9 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 			txn.start();
 			
 			//
-			// performance sensitive processing, do it in plain SQL 
+			// performance sensitive processing, do it in plain SQL
 			//
-			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +  
+			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
 					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
 			pstmt = txn.prepareStatement(sql);
 	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
@@ -159,7 +166,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 	        pstmt.executeUpdate();
 	        pstmt.close();
 			
-			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + 
+			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
 					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
 			pstmt = txn.prepareStatement(sql);
 	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
@@ -194,7 +201,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
         return standaloneList;
 	}
 	
-	public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+	@Override
+    public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
 		List<Long> standaloneList = new ArrayList<Long>();
 		
 		Transaction txn = Transaction.currentTxn();
@@ -203,16 +211,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 			txn.start();
 			
 			//
-			// performance sensitive processing, do it in plain SQL 
+			// performance sensitive processing, do it in plain SQL
 			//
-			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +  
+			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
 					"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
 			pstmt = txn.prepareStatement(sql);
 	        pstmt.setLong(1, joinedJobId);
 	        pstmt.executeUpdate();
 	        pstmt.close();
 			
-			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + 
+			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
 					"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
 			pstmt = txn.prepareStatement(sql);
 	        pstmt.setLong(1, joinedJobId);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
index 0bcdc3b..287121f 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
@@ -20,6 +20,8 @@ import java.util.Date;
 
 import javax.persistence.Column;
 import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
@@ -27,6 +29,8 @@ import javax.persistence.Table;
 import javax.persistence.Temporal;
 import javax.persistence.TemporalType;
 
+import org.apache.cloudstack.jobs.JobInfo;
+
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.GenericDao;
 
@@ -45,7 +49,8 @@ public class AsyncJobJoinMapVO {
 	private long joinJobId;
 
     @Column(name="join_status")
-    private int joinStatus;
+    @Enumerated(EnumType.ORDINAL)
+    private JobInfo.Status joinStatus;
     
     @Column(name="join_result", length=1024)
     private String joinResult;
@@ -112,11 +117,11 @@ public class AsyncJobJoinMapVO {
 		this.joinJobId = joinJobId;
 	}
 
-	public int getJoinStatus() {
+    public JobInfo.Status getJoinStatus() {
 		return joinStatus;
 	}
 
-	public void setJoinStatus(int joinStatus) {
+    public void setJoinStatus(JobInfo.Status joinStatus) {
 		this.joinStatus = joinStatus;
 	}
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
index 95f01e5..0a48da3 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
@@ -22,13 +22,12 @@ import java.util.TimeZone;
 import javax.management.StandardMBean;
 
 import org.apache.cloudstack.framework.jobs.AsyncJob;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobMBean;
 
 import com.cloud.utils.DateUtil;
 
 public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
-	private AsyncJob _job;
+	private final AsyncJob _job;
 	
 	public AsyncJobMBeanImpl(AsyncJob job) {
 		super(AsyncJobMBean.class, false);
@@ -36,84 +35,100 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 		_job = job;
 	}
 	
-	public long getAccountId() {
+	@Override
+    public long getAccountId() {
 		return _job.getAccountId();
 	}
 	
-	public long getUserId() {
+	@Override
+    public long getUserId() {
 		return _job.getUserId();
 	}
 	
-	public String getCmd() {
+	@Override
+    public String getCmd() {
 		return _job.getCmd();
 	}
 	
-	public String getCmdInfo() {
+	@Override
+    public String getCmdInfo() {
 		return _job.getCmdInfo();
 	}
 	
-	public String getStatus() {
-		int jobStatus = _job.getStatus();
-		switch(jobStatus) {
-		case AsyncJobConstants.STATUS_SUCCEEDED :
+	@Override
+    public String getStatus() {
+        switch (_job.getStatus()) {
+        case SUCCEEDED:
 			return "Completed";
 		
-		case AsyncJobConstants.STATUS_IN_PROGRESS:
+        case IN_PROGRESS:
 			return "In preogress";
 			
-		case AsyncJobConstants.STATUS_FAILED:
+        case FAILED:
 			return "failed";
+			
+        case CANCELLED:
+            return "cancelled";
 		}
 		
 		return "Unknow";
 	}
 	
-	public int getProcessStatus() {
+	@Override
+    public int getProcessStatus() {
 		return _job.getProcessStatus();
 	}
 	
-	public int getResultCode() {
+	@Override
+    public int getResultCode() {
 		return _job.getResultCode();
 	}
 	
-	public String getResult() {
+	@Override
+    public String getResult() {
 		return _job.getResult();
 	}
 	
-	public String getInstanceType() {
+	@Override
+    public String getInstanceType() {
 		if(_job.getInstanceType() != null)
 			return _job.getInstanceType().toString();
 		return "N/A";
 	}
 	
-	public String getInstanceId() {
+	@Override
+    public String getInstanceId() {
 		if(_job.getInstanceId() != null)
 			return String.valueOf(_job.getInstanceId());
 		return "N/A";
 	}
 	
-	public String getInitMsid() {
+	@Override
+    public String getInitMsid() {
 		if(_job.getInitMsid() != null) {
 			return String.valueOf(_job.getInitMsid());
 		}
 		return "N/A";
 	}
 	
-	public String getCreateTime() {
+	@Override
+    public String getCreateTime() {
 		Date time = _job.getCreated();
 		if(time != null)
 			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
 		return "N/A";
 	}
 	
-	public String getLastUpdateTime() {
+	@Override
+    public String getLastUpdateTime() {
 		Date time = _job.getLastUpdated();
 		if(time != null)
 			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
 		return "N/A";
 	}
 	
-	public String getLastPollTime() {
+	@Override
+    public String getLastPollTime() {
 		Date time = _job.getLastPolled();
 	
 		if(time != null)
@@ -121,7 +136,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 		return "N/A";
 	}
 	
-	public String getSyncQueueId() {
+	@Override
+    public String getSyncQueueId() {
 		SyncQueueItem item = _job.getSyncSource();
 		if(item != null && item.getQueueId() != null) {
 			return String.valueOf(item.getQueueId());
@@ -129,7 +145,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 		return "N/A";
 	}
 	
-	public String getSyncQueueContentType() {
+	@Override
+    public String getSyncQueueContentType() {
 		SyncQueueItem item = _job.getSyncSource();
 		if(item != null) {
 			return item.getContentType();
@@ -137,7 +154,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 		return "N/A";
 	}
 	
-	public String getSyncQueueContentId() {
+	@Override
+    public String getSyncQueueContentId() {
 		SyncQueueItem item = _job.getSyncSource();
 		if(item != null && item.getContentId() != null) {
 			return String.valueOf(item.getContentId());

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
index 0e103b9..cab047b 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -23,6 +23,8 @@ import javax.persistence.Column;
 import javax.persistence.DiscriminatorColumn;
 import javax.persistence.DiscriminatorType;
 import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
@@ -44,11 +46,14 @@ import com.cloud.utils.db.GenericDao;
 @Inheritance(strategy=InheritanceType.JOINED)
 @DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
 public class AsyncJobVO implements AsyncJob, JobInfo {
+
+    public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
+    public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
 	
 	@Id
     @GeneratedValue(strategy=GenerationType.IDENTITY)
     @Column(name="id")
-    private Long id = null;
+    private long id;
 	
     @Column(name="job_type", length=32)
     protected String type;
@@ -78,7 +83,8 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
     private String cmdInfo;
   
     @Column(name="job_status")
-    private int status;
+    @Enumerated(value = EnumType.ORDINAL)
+    private Status status;
     
     @Column(name="job_process_status")
     private int processStatus;
@@ -145,7 +151,7 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
 		return id;
 	}
 
-	public void setId(Long id) {
+    public void setId(long id) {
 		this.id = id;
 	}
 	
@@ -236,11 +242,11 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
 	}
 	
 	@Override
-    public int getStatus() {
+    public Status getStatus() {
 		return status;
 	}
 
-	public void setStatus(int status) {
+    public void setStatus(Status status) {
 		this.status = status;
 	}
 	

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
index c442559..e0823dc 100644
--- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -33,9 +33,9 @@ import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.ExceptionResponse;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.user.Account;
 import com.cloud.user.dao.AccountDao;
@@ -93,7 +93,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
                 _dispatcher.dispatch(cmdObj, params, true);
 
                 // serialize this to the async job table
-                _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject());
+                _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, cmdObj.getResponseObject());
             } finally {
                 CallContext.unregister();
             }
@@ -116,7 +116,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
 
             // FIXME:  setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
             //         and we need to preserve that as much as possible here
-            _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
+            _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
         }
 	}
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index ae5be33..be161c6 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -605,7 +605,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
                 if (response.getObjectId() != null && objectJobMap.containsKey(response.getObjectId())) {
                     AsyncJob job = objectJobMap.get(response.getObjectId());
                     response.setJobId(job.getUuid());
-                    response.setJobStatus(job.getStatus());
+                    response.setJobStatus(job.getStatus().ordinal());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 332587a..2e3a9a0 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -38,9 +38,9 @@ import org.apache.log4j.Logger;
 import org.apache.cloudstack.api.ApiErrorCode;
 import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
 import org.apache.cloudstack.api.response.ExceptionResponse;
+import org.apache.cloudstack.config.ConfigRepo;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
 import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
@@ -60,11 +60,14 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
 import org.apache.cloudstack.framework.messagebus.MessageDetector;
 import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.jobs.JobInfo.Status;
 
 import com.cloud.api.ApiSerializerHelper;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.cluster.ClusterManagerListener;
 import com.cloud.cluster.ManagementServerHost;
+import com.cloud.cluster.ManagementServerNode;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.exception.InvalidParameterValueException;
@@ -76,7 +79,6 @@ import com.cloud.utils.DateUtil;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Predicate;
 import com.cloud.utils.PropertiesUtil;
-import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
 import com.cloud.utils.db.DB;
@@ -87,11 +89,16 @@ import com.cloud.utils.db.Transaction;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.exception.ExceptionUtil;
 import com.cloud.utils.mgmt.JmxUtil;
-import com.cloud.utils.net.MacAddress;
 
 public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener {
-    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
+    private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
     private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; 	// 3 seconds
+    private static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
+    // Although we may have detailed masks for each individual wakeup event, i.e.
+    // periodical timer, matched topic from message bus, it seems that we don't
+    // need to distinguish them to such level. Therefore, only one wakeup signal
+    // is defined
+    public static final int SIGNAL_MASK_WAKEUP = 1;
 
     private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
     private static final int HEARTBEAT_INTERVAL = 2000;
@@ -107,6 +114,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     @Inject private List<AsyncJobDispatcher> _jobDispatchers;
     @Inject private MessageBus _messageBus;
     @Inject private AsyncJobMonitor _jobMonitor;
+    @Inject
+    private ConfigRepo _configRepo;
 
 	private long _jobExpireSeconds = 86400;						// 1 day
     private long _jobCancelThresholdSeconds = 3600;         	// 1 hour (for cancelling the jobs blocking other jobs)
@@ -128,15 +137,15 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
     
     @Override @DB
-	public AsyncJob getPseudoJob() {
+    public AsyncJob getPseudoJob(long accountId, long userId) {
     	AsyncJobVO job = _jobDao.findPseudoJob(Thread.currentThread().getId(), getMsid());
     	if(job == null) {
 	    	job = new AsyncJobVO();
-	    	job.setAccountId(_accountMgr.getSystemAccount().getId());
-	    	job.setUserId(_accountMgr.getSystemUser().getId());
+            job.setAccountId(accountId);
+            job.setUserId(userId);
 	    	job.setInitMsid(getMsid());
-	    	job.setDispatcher(AsyncJobConstants.JOB_DISPATCHER_PSEUDO);
-	    	job.setInstanceType(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE);
+            job.setDispatcher(AsyncJobVO.JOB_DISPATCHER_PSEUDO);
+            job.setInstanceType(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
 	    	job.setInstanceId(Thread.currentThread().getId());
 	    	_jobDao.persist(job);
     	}
@@ -149,31 +158,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     @SuppressWarnings("unchecked")
-	@Override @DB
+    @DB
     public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) {
-        Transaction txt = Transaction.currentTxn();
-        try {
-        	@SuppressWarnings("rawtypes")
-			GenericDao dao = GenericDaoBase.getDao(job.getClass());
-        	
-            txt.start();
-            job.setInitMsid(getMsid());
-            dao.persist(job);
-            txt.commit();
-
-            // no sync source originally
-            job.setSyncSource(null);
-            scheduleExecution(job, scheduleJobExecutionInContext);
-            if(s_logger.isDebugEnabled()) {
-                s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
-            }
-            return job.getId();
-        } catch(Exception e) {
-            txt.rollback();
-            String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
-            s_logger.warn(errMsg, e);
-            throw new CloudRuntimeException(errMsg);
+        @SuppressWarnings("rawtypes")
+        GenericDao dao = GenericDaoBase.getDao(job.getClass());
+        job.setInitMsid(getMsid());
+        job.setSyncSource(null);        // no sync source originally
+        dao.persist(job);
+
+        scheduleExecution(job, scheduleJobExecutionInContext);
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
         }
+        return job.getId();
     }
 
     @SuppressWarnings("unchecked")
@@ -199,7 +196,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 	}
     
     @Override @DB
-    public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) {
+    public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, Object resultObject) {
         if(s_logger.isDebugEnabled()) {
             s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
                     ", resultCode: " + resultCode + ", result: " + resultObject);
@@ -219,7 +216,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                 return;
             }
             
-            if(job.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS) {
+            if(job.getStatus() != JobInfo.Status.IN_PROGRESS) {
                 if(s_logger.isDebugEnabled()) {
                     s_logger.debug("job-" + jobId + " is already completed.");
                 }
@@ -251,7 +248,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             for(Long id : wakeupList) {
             	// TODO, we assume that all jobs in this category is API job only
             	AsyncJobVO jobToWakeup = _jobDao.findById(id);
-            	if(jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0)
+                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
             	    scheduleExecution(jobToWakeup, false);
             }
              
@@ -358,7 +355,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
     
     @Override @DB
-    public void completeJoin(long joinJobId, int joinStatus, String joinResult) {
+    public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult) {
     	_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
     }
     
@@ -436,8 +433,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                 jobResult.setResultCode(job.getResultCode());
                 jobResult.setUuid(job.getUuid());
 
-                if(job.getStatus() == AsyncJobConstants.STATUS_SUCCEEDED ||
-                        job.getStatus() == AsyncJobConstants.STATUS_FAILED) {
+                if(job.getStatus() == JobInfo.Status.SUCCEEDED ||
+                        job.getStatus() == JobInfo.Status.FAILED) {
 
                     if(s_logger.isDebugEnabled()) {
                         s_logger.debug("Async job-" + jobId + " completed");
@@ -451,14 +448,14 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?");
                 }
 
-                jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED);
+                jobResult.setJobStatus(JobInfo.Status.FAILED);
                 jobResult.setResult("job-" + jobId + " does not exist");
             }
             txt.commit();
         } catch(Exception e) {
             s_logger.error("Unexpected exception while querying async job-" + jobId + " status: ", e);
 
-            jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED);
+            jobResult.setJobStatus(JobInfo.Status.FAILED);
             jobResult.setResult("Exception: " + e.toString());
             txt.rollback();
         }
@@ -475,7 +472,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     private void scheduleExecution(final AsyncJob job, boolean executeInContext) {
-        Runnable runnable = getExecutorRunnable(this, job);
+        Runnable runnable = getExecutorRunnable(job);
         if (executeInContext) {
             runnable.run();
         } else {
@@ -516,7 +513,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     	}
     }
     
-    private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) {
+    private Runnable getExecutorRunnable(final AsyncJob job) {
         return new Runnable() {
             @Override
             public void run() {
@@ -539,20 +536,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     }
                     
                     _jobMonitor.registerActiveTask(runNumber, job.getId());
-                    AsyncJobExecutionContext.setCurrentExecutionContext((AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job))
-                    );
+                    AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job));
                     
                     // execute the job
                     if(s_logger.isDebugEnabled()) {
                         s_logger.debug("Executing " + job);
                     }
 
-                    if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) {
+                    if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) {
                     	AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
                     	if(jobDispatcher != null) {
                     		jobDispatcher.runJob(job);
                     	} else {
-                    		s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId());
+                    		s_logger.error("Unable to find a wakeup dispatcher from the joined job: " + job);
                     	}
                     } else {
 	                    AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
@@ -560,7 +556,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 	                    	jobDispatcher.runJob(job);
 	                    } else {
 	                    	s_logger.error("Unable to find job dispatcher, job will be cancelled");
-	                        completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+	                        completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
 	                    }
                     }
                     
@@ -570,7 +566,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                    
             	} catch (Throwable e) {
             		s_logger.error("Unexpected exception", e);
-                    completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+                    completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
             	} finally {
             		// guard final clause as well
                     try {
@@ -598,7 +594,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     	//
                     	// clean execution environment
                     	//
-                        AsyncJobExecutionContext.setCurrentExecutionContext(null);
+                        AsyncJobExecutionContext.unregister();
                         _jobMonitor.unregisterActiveTask(runNumber);
 	                    
                     } catch(Throwable e) {
@@ -727,7 +723,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     for(Long jobId : standaloneWakeupJobs) {
                     	// TODO, we assume that all jobs in this category is API job only
                     	AsyncJobVO job = _jobDao.findById(jobId);
-                    	if(job != null && (job.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0)
+                        if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
                     	    scheduleExecution(job, false);
                     }
                 } catch(Throwable e) {
@@ -789,7 +785,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     if(blockItems != null && blockItems.size() > 0) {
                         for(SyncQueueItemVO item : blockItems) {
                             if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
-                                completeAsyncJob(item.getContentId(), AsyncJobConstants.STATUS_FAILED, 0,
+                                completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0,
                                         getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
                             }
 
@@ -819,11 +815,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     private long getMsid() {
-        if(_clusterMgr != null) {
-            return _clusterMgr.getManagementNodeId();
-        }
-
-        return MacAddress.getMacAddress().toLong();
+        return ManagementServerNode.getManagementServerId();
     }
 
     private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
@@ -838,7 +830,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     Long jobId = item.getContentId();
                     if(jobId != null) {
                         s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
-                        completeAsyncJob(jobId, AsyncJobConstants.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
+                        completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
                     }
                 }
                 _queueMgr.purgeItem(item.getId());
@@ -864,7 +856,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             int poolSize = (cloudMaxActive * 2) / 3;
 
             s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
-            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX));
+            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(JOB_POOL_THREAD_PREFIX));
         } catch (final Exception e) {
             throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/async/AsyncJobResult.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobResult.java b/server/src/com/cloud/async/AsyncJobResult.java
index 783655e..d71e64b 100644
--- a/server/src/com/cloud/async/AsyncJobResult.java
+++ b/server/src/com/cloud/async/AsyncJobResult.java
@@ -16,14 +16,14 @@
 // under the License.
 package com.cloud.async;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.api.ApiSerializerHelper;
 
 public class AsyncJobResult {
 	
 	private long jobId;
-	private int jobStatus;
+    private JobInfo.Status jobStatus;
 	private int processStatus;
 	private int resultCode;
 	private String result;
@@ -31,7 +31,7 @@ public class AsyncJobResult {
 
 	public AsyncJobResult(long jobId) {
 		this.jobId = jobId;
-		jobStatus = AsyncJobConstants.STATUS_IN_PROGRESS;
+		jobStatus = JobInfo.Status.IN_PROGRESS;
 		processStatus = 0;
 		resultCode = 0;
 		result = "";
@@ -46,18 +46,18 @@ public class AsyncJobResult {
 	}
 	
 	public String getUuid() {
-		return this.uuid;
+		return uuid;
 	}
 	
 	public void setUuid(String uuid) {
 		this.uuid = uuid;
 	}
 	
-	public int getJobStatus() {
+    public JobInfo.Status getJobStatus() {
 		return jobStatus;
 	}
 	
-	public void setJobStatus(int jobStatus) {
+    public void setJobStatus(JobInfo.Status jobStatus) {
 		this.jobStatus = jobStatus;
 	}
 	
@@ -97,7 +97,7 @@ public class AsyncJobResult {
     public String toString() {
 		StringBuffer sb = new StringBuffer();
 		sb.append("AsyncJobResult {jobId:").append(getJobId());
-		sb.append(", jobStatus: ").append(getJobStatus());
+        sb.append(", jobStatus: ").append(getJobStatus().ordinal());
 		sb.append(", processStatus: ").append(getProcessStatus());
 		sb.append(", resultCode: ").append(getResultCode());
 		sb.append(", result: ").append(result);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
index f6bddf0..513a713 100755
--- a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
+++ b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
@@ -1728,11 +1728,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
 //	    	_itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
 //	    		user, account,  ((VmWorkStart)work).getPlan());
 //
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
 //    	} catch(Exception e) {
 //    		s_logger.error("Exception in process VM-start work", e);
 //    		String result = SerializerHelper.toObjectSerializedString(e);
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
 //    	}
 //    }
 //
@@ -1748,11 +1748,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
 //    	try {
 //	    	_itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
 //
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
 //    	} catch(Exception e) {
 //    		s_logger.error("Exception in process VM-stop work", e);
 //    		String result = SerializerHelper.toObjectSerializedString(e);
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
 //    	}
 //    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/server/ManagementServerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java
index 6de8840..9fc4645 100755
--- a/server/src/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/com/cloud/server/ManagementServerImpl.java
@@ -3469,7 +3469,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
             if (asyncExecutionContext != null) {
                 job = asyncExecutionContext.getJob();
                 _asyncMgr.updateAsyncJobAttachment(job.getId(), Upload.Type.VOLUME.toString(), volumeId);
-                _asyncMgr.updateAsyncJobStatus(job.getId(), AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+                _asyncMgr.updateAsyncJobStatus(job.getId(), JobInfo.Status.IN_PROGRESS, resultObj);
             }
             String value = _configs.get(Config.CopyVolumeWait.toString());
             int copyvolumewait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
@@ -3490,7 +3490,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
                 resultObj.setResultString(errorString);
                 resultObj.setUploadStatus(UploadVO.Status.COPY_ERROR.toString());
                 if (asyncExecutionContext != null) {
-                    _asyncMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, resultObj);
+                    _asyncMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, resultObj);
                 }
 
                 // Update the DB that volume couldn't be copied

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
index 562db28..1324ac7 100755
--- a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
+++ b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
@@ -1462,11 +1462,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
 //	    	_itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
 //	    		user, account,  ((VmWorkStart)work).getPlan());
 //
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
 //    	} catch(Exception e) {
 //    		s_logger.error("Exception in process VM-start work", e);
 //    		String result = SerializerHelper.toObjectSerializedString(e);
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
 //    	}
 //    }
 //
@@ -1482,11 +1482,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
 //    	try {
 //	    	_itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
 //
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
 //    	} catch(Exception e) {
 //    		s_logger.error("Exception in process VM-stop work", e);
 //    		String result = SerializerHelper.toObjectSerializedString(e);
-//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
 //    	}
 //    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
index c6dd772..607e39b 100644
--- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
+++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
@@ -143,14 +143,14 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
             Long asyncJobId = snapshotSchedule.getAsyncJobId();
             AsyncJobVO asyncJob = _asyncJobDao.findById(asyncJobId);
             switch (asyncJob.getStatus()) {
-            case AsyncJobConstants.STATUS_SUCCEEDED:
+            case JobInfo.Status.SUCCEEDED:
                 // The snapshot has been successfully backed up.
                 // The snapshot state has also been cleaned up.
                 // We can schedule the next job for this snapshot.
                 // Remove the existing entry in the snapshot_schedule table.
                 scheduleNextSnapshotJob(snapshotSchedule);
                 break;
-            case AsyncJobConstants.STATUS_FAILED:
+            case JobInfo.Status.FAILED:
                 // Check the snapshot status.
                 Long snapshotId = snapshotSchedule.getSnapshotId();
                 if (snapshotId == null) {
@@ -188,7 +188,7 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
                 }
 
                 break;
-            case AsyncJobConstants.STATUS_IN_PROGRESS:
+            case JobInfo.Status.IN_PROGRESS:
                 // There is no way of knowing from here whether
                 // 1) Another management server is processing this snapshot job
                 // 2) The management server has crashed and this snapshot is lying

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/storage/upload/UploadListener.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/upload/UploadListener.java b/server/src/com/cloud/storage/upload/UploadListener.java
index b66624d..b6a9288 100755
--- a/server/src/com/cloud/storage/upload/UploadListener.java
+++ b/server/src/com/cloud/storage/upload/UploadListener.java
@@ -365,7 +365,7 @@ public class UploadListener implements Listener {
 		resultObj.setResultString(uploadErrorString);
 		resultObj.setState(state.toString());
 		asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
-		asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+		asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
 
 		UploadVO vo = uploadDao.createForUpdate();
 		vo.setUploadState(state);
@@ -378,7 +378,7 @@ public class UploadListener implements Listener {
 		resultObj.setResultString(uploadErrorString);
 		resultObj.setState(state.toString());
 		asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
-		asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+		asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
 
 
 		UploadVO vo = uploadDao.createForUpdate();
@@ -407,12 +407,12 @@ public class UploadListener implements Listener {
 
 		if (answer.getUploadStatus() == Status.UPLOAD_IN_PROGRESS){
 			asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
-			asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+			asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
 		}else if(answer.getUploadStatus() == Status.UPLOADED){
 		    resultObj.setResultString("Success");
-			asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_SUCCEEDED, 1, resultObj);
+			asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.SUCCEEDED, 1, resultObj);
 		}else{
-			asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_FAILED, 2, resultObj);
+			asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.FAILED, 2, resultObj);
 		}
         UploadVO updateBuilder = uploadDao.createForUpdate();
 		updateBuilder.setUploadPercent(answer.getUploadPct());

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/vm/SystemVmLoadScanner.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/SystemVmLoadScanner.java b/server/src/com/cloud/vm/SystemVmLoadScanner.java
index 704129d..4d378ce 100644
--- a/server/src/com/cloud/vm/SystemVmLoadScanner.java
+++ b/server/src/com/cloud/vm/SystemVmLoadScanner.java
@@ -71,8 +71,8 @@ public class SystemVmLoadScanner<T> {
             @Override
             public void run() {
                 try {
-                    CallContext.registerSystemCallContextOnceOnly();
-                    AsyncJobExecutionContext.registerPseudoExecutionContext();
+                    CallContext cc = CallContext.registerSystemCallContextOnceOnly();
+                    AsyncJobExecutionContext.registerPseudoExecutionContext(cc.getCallingAccountId(), cc.getCallingUserId());
                 } catch (Exception e) {
                     s_logger.fatal("Unable to start the capacity scan task", e);
                     System.exit(1);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/test/com/cloud/async/TestAsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java
index 320c68f..419f5fa 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -48,10 +48,13 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
 import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
 import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.cluster.ClusterManager;
+import com.cloud.user.Account;
 import com.cloud.user.AccountManager;
 import com.cloud.user.AccountVO;
+import com.cloud.user.User;
 import com.cloud.user.UserVO;
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.ComponentContext;
@@ -142,21 +145,21 @@ public class TestAsyncJobManager extends TestCase {
     	AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1);
     	Assert.assertTrue(record != null);
     	Assert.assertTrue(record.getJoinMsid() == 100);
-    	Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_IN_PROGRESS);
+    	Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.IN_PROGRESS);
     	
-    	joinMapDao.completeJoin(1, AsyncJobConstants.STATUS_SUCCEEDED, "Done", 101);
+    	joinMapDao.completeJoin(1, JobInfo.Status.SUCCEEDED, "Done", 101);
     	
     	record = joinMapDao.getJoinRecord(2, 1);
     	Assert.assertTrue(record != null);
     	Assert.assertTrue(record.getJoinMsid() == 100);
-    	Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED);
+    	Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
     	Assert.assertTrue(record.getJoinResult().equals("Done"));
     	Assert.assertTrue(record.getCompleteMsid() == 101);
     	
     	record = joinMapDao.getJoinRecord(3, 1);
     	Assert.assertTrue(record != null);
     	Assert.assertTrue(record.getJoinMsid() == 100);
-    	Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED);
+    	Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
     	Assert.assertTrue(record.getJoinResult().equals("Done"));
     	Assert.assertTrue(record.getCompleteMsid() == 101);
     	
@@ -198,7 +201,7 @@ public class TestAsyncJobManager extends TestCase {
     
     @Test
     public void testPseudoJob() {
-    	AsyncJob job = asyncMgr.getPseudoJob();
+        AsyncJob job = asyncMgr.getPseudoJob(Account.ACCOUNT_ID_SYSTEM, User.UID_SYSTEM);
     	Assert.assertTrue(job.getInstanceType().equals(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE));
     	Assert.assertTrue(job.getInstanceId().longValue() == Thread.currentThread().getId());
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
index 2c1249e..49b0e31 100644
--- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
+++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
@@ -316,7 +316,7 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage
 		if(wakeupCount++ < 3) {
 			AsyncJobExecutionContext.getCurrentExecutionContext().resetSyncSource();
 		} else {
-			AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobConstants.STATUS_SUCCEEDED, 0, null);
+			AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(JobInfo.Status.SUCCEEDED, 0, null);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
index 72210b4..eb5e81f 100644
--- a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
+++ b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
@@ -20,7 +20,7 @@ public class VmWorkTestWorkJobDispatcher extends AdapterBase implements AsyncJob
 		} catch (InterruptedException e) {
 		}
 		
-		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
 		s_logger.info("End work job execution. job-" + job.getId());
 	}
 }