You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2013/04/19 03:15:01 UTC

git commit: updated refs/heads/vmsync to c51fae0

Updated Branches:
  refs/heads/vmsync 56bc25535 -> c51fae0b6


Add async job discriminator to allow merging of HA/ItWork jobs


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/c51fae0b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/c51fae0b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/c51fae0b

Branch: refs/heads/vmsync
Commit: c51fae0b6c7302bfd3d96cb138b16d6ae87a00f0
Parents: 56bc255
Author: Kelven Yang <ke...@gmail.com>
Authored: Thu Apr 18 18:12:27 2013 -0700
Committer: Kelven Yang <ke...@gmail.com>
Committed: Thu Apr 18 18:12:27 2013 -0700

----------------------------------------------------------------------
 api/src/com/cloud/async/AsyncJob.java              |   19 +-
 api/src/com/cloud/async/SyncQueueItem.java         |   19 ++-
 core/src/com/cloud/async/AsyncJobVO.java           |  117 +++++-----
 core/src/com/cloud/async/SyncQueueItemVO.java      |    1 +
 .../src/com/cloud/api/ApiAsyncJobDispatcher.java   |    4 +-
 server/src/com/cloud/async/AsyncJobDispatcher.java |    2 +-
 .../com/cloud/async/AsyncJobExecutionContext.java  |   10 +-
 server/src/com/cloud/async/AsyncJobMBeanImpl.java  |   47 ++--
 server/src/com/cloud/async/AsyncJobManager.java    |    4 +-
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   18 +-
 .../src/com/cloud/async/SyncQueueManagerImpl.java  |    9 +-
 .../src/com/cloud/async/dao/AsyncJobDaoImpl.java   |    5 -
 .../src/com/cloud/async/dao/SyncQueueDaoImpl.java  |   20 +-
 .../com/cloud/async/dao/SyncQueueItemDaoImpl.java  |    9 -
 .../async/executor/ExtractJobResultObject.java     |  183 ---------------
 .../src/com/cloud/server/ManagementServerImpl.java |    2 +-
 .../src/com/cloud/storage/VolumeManagerImpl.java   |    6 +-
 setup/db/db/schema-410to420.sql                    |    6 +
 utils/src/com/cloud/utils/EnumUtils.java           |   13 +
 19 files changed, 171 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/api/src/com/cloud/async/AsyncJob.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/async/AsyncJob.java b/api/src/com/cloud/async/AsyncJob.java
index faba92c..3edc0df 100644
--- a/api/src/com/cloud/async/AsyncJob.java
+++ b/api/src/com/cloud/async/AsyncJob.java
@@ -51,7 +51,13 @@ public interface AsyncJob extends Identity, InternalIdentity {
         AutoScaleVmGroup,
         GlobalLoadBalancerRule
     }
-
+    
+    Long getParentId();
+    
+    String getType();
+    
+    String getDispatcher();
+    
     long getUserId();
 
     long getAccountId();
@@ -62,12 +68,6 @@ public interface AsyncJob extends Identity, InternalIdentity {
 
     String getCmdInfo();
     
-    String getDispatcher();
-
-    int getCallbackType();
-
-    String getCallbackAddress();
-
     int getStatus();
 
     int getProcessStatus();
@@ -77,8 +77,12 @@ public interface AsyncJob extends Identity, InternalIdentity {
     String getResult();
 
     Long getInitMsid();
+    void setInitMsid(Long msid);
 
+    Long getExecutingMsid();
+    
     Long getCompleteMsid();
+    void setCompleteMsid(Long msid);
 
     Date getCreated();
 
@@ -95,4 +99,5 @@ public interface AsyncJob extends Identity, InternalIdentity {
     boolean isFromPreviousSession();
 
     SyncQueueItem getSyncSource();
+    void setSyncSource(SyncQueueItem item);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/api/src/com/cloud/async/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/async/SyncQueueItem.java b/api/src/com/cloud/async/SyncQueueItem.java
index 9f9c379..b8232f4 100644
--- a/api/src/com/cloud/async/SyncQueueItem.java
+++ b/api/src/com/cloud/async/SyncQueueItem.java
@@ -16,11 +16,26 @@
 // under the License.
 package com.cloud.async;
 
-
 public interface SyncQueueItem {
     public final String AsyncJobContentType = "AsyncJob";
 
-    String getContentType();
+    /**
+     * @return queue item id
+     */
+    long getId();
 
+    /**
+     * @return queue id
+     */
+    Long getQueueId();
+
+    /**
+     * @return subject object type pointed by the queue item
+     */
+    String getContentType();
+    
+    /**
+     * @return subject object id pointed by the queue item
+     */
     Long getContentId();
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/core/src/com/cloud/async/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/async/AsyncJobVO.java b/core/src/com/cloud/async/AsyncJobVO.java
index 83ddb0c..73d48cd 100644
--- a/core/src/com/cloud/async/AsyncJobVO.java
+++ b/core/src/com/cloud/async/AsyncJobVO.java
@@ -20,12 +20,16 @@ import java.util.Date;
 import java.util.UUID;
 
 import javax.persistence.Column;
+import javax.persistence.DiscriminatorColumn;
+import javax.persistence.DiscriminatorType;
 import javax.persistence.Entity;
 import javax.persistence.EnumType;
 import javax.persistence.Enumerated;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
 import javax.persistence.Table;
 import javax.persistence.Temporal;
 import javax.persistence.TemporalType;
@@ -35,15 +39,24 @@ import com.cloud.utils.db.GenericDao;
 
 @Entity
 @Table(name="async_job")
+@Inheritance(strategy=InheritanceType.JOINED)
+@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
 public class AsyncJobVO implements AsyncJob {
-	public static final int CALLBACK_POLLING = 0;
-	public static final int CALLBACK_EMAIL = 1;
 	
 	@Id
     @GeneratedValue(strategy=GenerationType.IDENTITY)
     @Column(name="id")
     private Long id = null;
+	
+    @Column(name="parent_id")
+    private Long parentId;
 
+    @Column(name="job_type", length=32)
+    protected String type;
+    
+    @Column(name="job_dispatcher", length=64)
+    protected String dispatcher;
+    
     @Column(name="user_id")
     private long userId;
     
@@ -56,18 +69,9 @@ public class AsyncJobVO implements AsyncJob {
 	@Column(name="job_cmd_ver")
     private int cmdVersion;
 	
-	@Column(name="job_dispatcher")
-	private String jobDispatcher;
-	
     @Column(name="job_cmd_info", length=65535)
     private String cmdInfo;
   
-    @Column(name="callback_type")
-    private int callbackType;
-    
-    @Column(name="callback_address")
-    private String callbackAddress;
-    
     @Column(name="job_status")
     private int status;
     
@@ -93,6 +97,9 @@ public class AsyncJobVO implements AsyncJob {
     @Column(name="job_complete_msid")
     private Long completeMsid;
     
+    @Column(name="job_executing_msid")
+    private Long executingMsid;
+    
     @Column(name=GenericDao.CREATED_COLUMN)
     private Date created;
     
@@ -111,7 +118,7 @@ public class AsyncJobVO implements AsyncJob {
     private String uuid;
 
     @Transient
-    private SyncQueueItemVO syncSource = null;
+    private SyncQueueItem syncSource = null;
 
     @Transient
     private boolean fromPreviousSession = false;
@@ -125,21 +132,13 @@ public class AsyncJobVO implements AsyncJob {
 		this.accountId = accountId;
 		this.cmd = cmd;
 		this.cmdInfo = cmdInfo;
-	    this.callbackType = CALLBACK_POLLING;
 	    this.uuid = UUID.randomUUID().toString();
 	    this.instanceId = instanceId;
+	    this.instanceType = instanceType;
+	    
+	    this.type ="AsyncJobVO";
     }
 
-    public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo,
-		int callbackType, String callbackAddress, Long instanceId, Type instanceType) {
-	
-		this(userId, accountId, cmd, cmdInfo, instanceId, instanceType);
-		this.callbackType = callbackType;
-		this.callbackAddress = callbackAddress;
-	    this.uuid = UUID.randomUUID().toString();
-    }
-
-
     @Override
     public long getId() {
 		return id;
@@ -148,6 +147,33 @@ public class AsyncJobVO implements AsyncJob {
 	public void setId(Long id) {
 		this.id = id;
 	}
+	
+	@Override
+	public Long getParentId() {
+		return this.parentId;
+	}
+	
+	public void setParentId(Long parentId) {
+		this.parentId = parentId;
+	}
+	
+	@Override
+	public String getType() {
+		return this.type;
+	}
+	
+	public void setType(String type) {
+		this.type = type;
+	}
+	
+	@Override
+	public String getDispatcher() {
+		return this.dispatcher;
+	}
+	
+	public void setDispatcher(String dispatcher) {
+		this.dispatcher = dispatcher;
+	}
 
 	@Override
     public long getUserId() {
@@ -195,33 +221,6 @@ public class AsyncJobVO implements AsyncJob {
 	}
 	
 	@Override
-	public String getDispatcher() {
-		return this.jobDispatcher;
-	}
-	
-	public void setDispatcher(String dispatcher) {
-		this.jobDispatcher = dispatcher;
-	}
-
-	@Override
-    public int getCallbackType() {
-		return callbackType;
-	}
-
-	public void setCallbackType(int callbackType) {
-		this.callbackType = callbackType;
-	}
-
-	@Override
-    public String getCallbackAddress() {
-		return callbackAddress;
-	}
-
-	public void setCallbackAddress(String callbackAddress) {
-		this.callbackAddress = callbackAddress;
-	}
-
-	@Override
     public int getStatus() {
 		return status;
 	}
@@ -262,15 +261,26 @@ public class AsyncJobVO implements AsyncJob {
 		return initMsid;
 	}
 
+	@Override
 	public void setInitMsid(Long initMsid) {
 		this.initMsid = initMsid;
 	}
+	
+	@Override
+	public Long getExecutingMsid() {
+		return this.executingMsid;
+	}
+	
+	public void setExecutingMsid(Long executingMsid) {
+		this.executingMsid = executingMsid;
+	}
 
 	@Override
     public Long getCompleteMsid() {
 		return completeMsid;
 	}
 
+	@Override
 	public void setCompleteMsid(Long completeMsid) {
 		this.completeMsid = completeMsid;
 	}
@@ -330,11 +340,12 @@ public class AsyncJobVO implements AsyncJob {
 	}
 	
 	@Override
-    public SyncQueueItemVO getSyncSource() {
+    public SyncQueueItem getSyncSource() {
         return syncSource;
     }
     
-    public void setSyncSource(SyncQueueItemVO syncSource) {
+	@Override
+    public void setSyncSource(SyncQueueItem syncSource) {
         this.syncSource = syncSource;
     }
     
@@ -367,8 +378,6 @@ public class AsyncJobVO implements AsyncJob {
 		sb.append(", cmd: ").append(getCmd());
 		sb.append(", cmdInfo: ").append(getCmdInfo());
 		sb.append(", cmdVersion: ").append(getCmdVersion());
-		sb.append(", callbackType: ").append(getCallbackType());
-		sb.append(", callbackAddress: ").append(getCallbackAddress());
 		sb.append(", status: ").append(getStatus());
 		sb.append(", processStatus: ").append(getProcessStatus());
 		sb.append(", resultCode: ").append(getResultCode());

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/core/src/com/cloud/async/SyncQueueItemVO.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/async/SyncQueueItemVO.java b/core/src/com/cloud/async/SyncQueueItemVO.java
index b0546a7..21fbb91 100644
--- a/core/src/com/cloud/async/SyncQueueItemVO.java
+++ b/core/src/com/cloud/async/SyncQueueItemVO.java
@@ -68,6 +68,7 @@ public class SyncQueueItemVO implements SyncQueueItem, InternalIdentity {
         this.id = id;
     }
 
+    @Override
     public Long getQueueId() {
         return queueId;
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
index 98a7747..9c87378 100644
--- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -27,10 +27,10 @@ import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.ExceptionResponse;
 import org.apache.log4j.Logger;
 
+import com.cloud.async.AsyncJob;
 import com.cloud.async.AsyncJobDispatcher;
 import com.cloud.async.AsyncJobManager;
 import com.cloud.async.AsyncJobResult;
-import com.cloud.async.AsyncJobVO;
 import com.cloud.async.SyncQueueManager;
 import com.cloud.user.Account;
 import com.cloud.user.UserContext;
@@ -53,7 +53,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
     }
     
 	@Override
-	public void RunJob(AsyncJobVO job) {
+	public void RunJob(AsyncJob job) {
         BaseAsyncCmd cmdObj = null;
         try {
             Class<?> cmdClass = Class.forName(job.getCmd());

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/AsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobDispatcher.java b/server/src/com/cloud/async/AsyncJobDispatcher.java
index 80e6254..44b8603 100644
--- a/server/src/com/cloud/async/AsyncJobDispatcher.java
+++ b/server/src/com/cloud/async/AsyncJobDispatcher.java
@@ -19,5 +19,5 @@ package com.cloud.async;
 import com.cloud.utils.component.Adapter;
 
 public interface AsyncJobDispatcher extends Adapter {
-	void RunJob(AsyncJobVO job);
+	void RunJob(AsyncJob job);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java
index a09604b..f71a391 100644
--- a/server/src/com/cloud/async/AsyncJobExecutionContext.java
+++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java
@@ -17,26 +17,26 @@
 package com.cloud.async;
 
 public class AsyncJobExecutionContext  {
-	private AsyncJobVO _job;
+	private AsyncJob _job;
 	
 	private static ThreadLocal<AsyncJobExecutionContext> s_currentExectionContext = new ThreadLocal<AsyncJobExecutionContext>();
 
 	public AsyncJobExecutionContext() {
 	}
 	
-	public AsyncJobExecutionContext(AsyncJobVO job) {
+	public AsyncJobExecutionContext(AsyncJob job) {
 		_job = job;
 	}
 	
-	public SyncQueueItemVO getSyncSource() {
+	public SyncQueueItem getSyncSource() {
 		return _job.getSyncSource();
 	}
 	
-	public AsyncJobVO getJob() {
+	public AsyncJob getJob() {
 		return _job;
 	}
 	
-	public void setJob(AsyncJobVO job) {
+	public void setJob(AsyncJob job) {
 		_job = job;
 	}
 	

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobMBeanImpl.java b/server/src/com/cloud/async/AsyncJobMBeanImpl.java
index 282dace..fa579f3 100644
--- a/server/src/com/cloud/async/AsyncJobMBeanImpl.java
+++ b/server/src/com/cloud/async/AsyncJobMBeanImpl.java
@@ -24,32 +24,32 @@ import javax.management.StandardMBean;
 import com.cloud.utils.DateUtil;
 
 public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
-	private AsyncJobVO _jobVo;
+	private AsyncJob _job;
 	
-	public AsyncJobMBeanImpl(AsyncJobVO jobVo) {
+	public AsyncJobMBeanImpl(AsyncJob job) {
 		super(AsyncJobMBean.class, false);
 		
-		_jobVo = jobVo;
+		_job = job;
 	}
 	
 	public long getAccountId() {
-		return _jobVo.getAccountId();
+		return _job.getAccountId();
 	}
 	
 	public long getUserId() {
-		return _jobVo.getUserId();
+		return _job.getUserId();
 	}
 	
 	public String getCmd() {
-		return _jobVo.getCmd();
+		return _job.getCmd();
 	}
 	
 	public String getCmdInfo() {
-		return _jobVo.getCmdInfo();
+		return _job.getCmdInfo();
 	}
 	
 	public String getStatus() {
-		int jobStatus = _jobVo.getStatus();
+		int jobStatus = _job.getStatus();
 		switch(jobStatus) {
 		case AsyncJobResult.STATUS_SUCCEEDED :
 			return "Completed";
@@ -65,52 +65,52 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 	}
 	
 	public int getProcessStatus() {
-		return _jobVo.getProcessStatus();
+		return _job.getProcessStatus();
 	}
 	
 	public int getResultCode() {
-		return _jobVo.getResultCode();
+		return _job.getResultCode();
 	}
 	
 	public String getResult() {
-		return _jobVo.getResult();
+		return _job.getResult();
 	}
 	
 	public String getInstanceType() {
-		if(_jobVo.getInstanceType() != null)
-			return _jobVo.getInstanceType().toString();
+		if(_job.getInstanceType() != null)
+			return _job.getInstanceType().toString();
 		return "N/A";
 	}
 	
 	public String getInstanceId() {
-		if(_jobVo.getInstanceId() != null)
-			return String.valueOf(_jobVo.getInstanceId());
+		if(_job.getInstanceId() != null)
+			return String.valueOf(_job.getInstanceId());
 		return "N/A";
 	}
 	
 	public String getInitMsid() {
-		if(_jobVo.getInitMsid() != null) {
-			return String.valueOf(_jobVo.getInitMsid());
+		if(_job.getInitMsid() != null) {
+			return String.valueOf(_job.getInitMsid());
 		}
 		return "N/A";
 	}
 	
 	public String getCreateTime() {
-		Date time = _jobVo.getCreated();
+		Date time = _job.getCreated();
 		if(time != null)
 			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
 		return "N/A";
 	}
 	
 	public String getLastUpdateTime() {
-		Date time = _jobVo.getLastUpdated();
+		Date time = _job.getLastUpdated();
 		if(time != null)
 			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
 		return "N/A";
 	}
 	
 	public String getLastPollTime() {
-		Date time = _jobVo.getLastPolled();
+		Date time = _job.getLastPolled();
 	
 		if(time != null)
 			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
@@ -118,7 +118,7 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 	}
 	
 	public String getSyncQueueId() {
-		SyncQueueItemVO item = _jobVo.getSyncSource();
+		SyncQueueItem item = _job.getSyncSource();
 		if(item != null && item.getQueueId() != null) {
 			return String.valueOf(item.getQueueId());
 		}
@@ -126,7 +126,7 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 	}
 	
 	public String getSyncQueueContentType() {
-		SyncQueueItemVO item = _jobVo.getSyncSource();
+		SyncQueueItem item = _job.getSyncSource();
 		if(item != null) {
 			return item.getContentType();
 		}
@@ -134,11 +134,10 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
 	}
 	
 	public String getSyncQueueContentId() {
-		SyncQueueItemVO item = _jobVo.getSyncSource();
+		SyncQueueItem item = _job.getSyncSource();
 		if(item != null && item.getContentId() != null) {
 			return String.valueOf(item.getContentId());
 		}
 		return "N/A";
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java
index f7ffc5a..95f7157 100644
--- a/server/src/com/cloud/async/AsyncJobManager.java
+++ b/server/src/com/cloud/async/AsyncJobManager.java
@@ -28,8 +28,8 @@ public interface AsyncJobManager extends Manager {
 	
 	public List<? extends AsyncJob> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId);
 	
-	public long submitAsyncJob(AsyncJobVO job);
-	public long submitAsyncJob(AsyncJobVO job, boolean scheduleJobExecutionInContext);
+	public long submitAsyncJob(AsyncJob job);
+	public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
 	public AsyncJobResult queryAsyncJobResult(long jobId);    
 	
     public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 586f40e..d2f4912 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -59,6 +59,8 @@ import com.cloud.utils.PropertiesUtil;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
 import com.cloud.utils.db.DB;
+import com.cloud.utils.db.GenericDao;
+import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.GlobalLock;
 import com.cloud.utils.db.Transaction;
 import com.cloud.utils.exception.CloudRuntimeException;
@@ -114,17 +116,21 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     @Override
-    public long submitAsyncJob(AsyncJobVO job) {
+    public long submitAsyncJob(AsyncJob job) {
         return submitAsyncJob(job, false);
     }
 
-    @Override @DB
-    public long submitAsyncJob(AsyncJobVO job, boolean scheduleJobExecutionInContext) {
+    @SuppressWarnings("unchecked")
+	@Override @DB
+    public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) {
         Transaction txt = Transaction.currentTxn();
         try {
+        	@SuppressWarnings("rawtypes")
+			GenericDao dao = GenericDaoBase.getDao(job.getClass());
+        	
             txt.start();
             job.setInitMsid(getMsid());
-            _jobDao.persist(job);
+            dao.persist(job);
             txt.commit();
 
             // no sync source originally
@@ -353,7 +359,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         scheduleExecution(job, false);
     }
 
-    private void scheduleExecution(final AsyncJobVO job, boolean executeInContext) {
+    private void scheduleExecution(final AsyncJob job, boolean executeInContext) {
         Runnable runnable = getExecutorRunnable(this, job);
         if (executeInContext) {
             runnable.run();
@@ -375,7 +381,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     	return null;
     }
     
-    private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJobVO job) {
+    private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) {
         return new Runnable() {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java
index bde77a0..eaa97af 100644
--- a/server/src/com/cloud/async/SyncQueueManagerImpl.java
+++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java
@@ -20,11 +20,8 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
-import javax.ejb.Local;
 import javax.inject.Inject;
-
 import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 import com.cloud.async.dao.SyncQueueDao;
 import com.cloud.async.dao.SyncQueueItemDao;
@@ -34,8 +31,6 @@ import com.cloud.utils.db.DB;
 import com.cloud.utils.db.Transaction;
 import com.cloud.utils.exception.CloudRuntimeException;
 
-@Component
-@Local(value={SyncQueueManager.class})
 public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
     public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
 
@@ -185,10 +180,10 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
 
                 _syncQueueItemDao.expunge(itemVO.getId());
 
-                //if item is active, reset queue information
+                // if item is active, reset queue information
                 if (itemVO.getLastProcessMsid() != null) {
                     queueVO.setLastUpdated(DateUtil.currentGMTTime());
-                    //decrement the count
+                    // decrement the count
                     assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
                     queueVO.setQueueSize(queueVO.getQueueSize() - 1);
                     _syncQueueDao.update(queueVO.getId(), queueVO);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
index 4793a6e..0eee76b 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
@@ -21,10 +21,7 @@ import java.sql.SQLException;
 import java.util.Date;
 import java.util.List;
 
-import javax.ejb.Local;
-
 import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 import com.cloud.async.AsyncJob;
 import com.cloud.async.AsyncJobResult;
@@ -36,8 +33,6 @@ import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.Transaction;
 
-@Component
-@Local(value = { AsyncJobDao.class })
 public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {
     private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
 	

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
index 7b4c182..a1dc90a 100644
--- a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java
@@ -22,10 +22,7 @@ import java.sql.SQLException;
 import java.util.Date;
 import java.util.TimeZone;
 
-import javax.ejb.Local;
-
 import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 import com.cloud.async.SyncQueueVO;
 import com.cloud.utils.DateUtil;
@@ -34,12 +31,18 @@ import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.Transaction;
 
-@Component
-@Local(value = { SyncQueueDao.class })
 public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implements SyncQueueDao {
     private static final Logger s_logger = Logger.getLogger(SyncQueueDaoImpl.class.getName());
     
     SearchBuilder<SyncQueueVO> TypeIdSearch = createSearchBuilder();
+
+    public SyncQueueDaoImpl() {
+	    super();
+	    TypeIdSearch = createSearchBuilder();
+        TypeIdSearch.and("syncObjType", TypeIdSearch.entity().getSyncObjType(), SearchCriteria.Op.EQ);
+        TypeIdSearch.and("syncObjId", TypeIdSearch.entity().getSyncObjId(), SearchCriteria.Op.EQ);
+        TypeIdSearch.done();
+	}
 	
 	@Override
 	public void ensureQueue(String syncObjType, long syncObjId) {
@@ -71,11 +74,4 @@ public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implemen
         return findOneBy(sc);
 	}
 
-	protected SyncQueueDaoImpl() {
-	    super();
-	    TypeIdSearch = createSearchBuilder();
-        TypeIdSearch.and("syncObjType", TypeIdSearch.entity().getSyncObjType(), SearchCriteria.Op.EQ);
-        TypeIdSearch.and("syncObjId", TypeIdSearch.entity().getSyncObjId(), SearchCriteria.Op.EQ);
-        TypeIdSearch.done();
-	}
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
index d2d2929..829b3bb 100644
--- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
@@ -25,11 +25,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.TimeZone;
 
-import javax.ejb.Local;
-import javax.inject.Inject;
-
 import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 import com.cloud.async.SyncQueueItemVO;
 import com.cloud.utils.DateUtil;
@@ -42,8 +38,6 @@ import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.Transaction;
 
-@Component
-@Local(value = { SyncQueueItemDao.class })
 @DB
 public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
     private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
@@ -57,7 +51,6 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
         queueIdSearch.selectField(queueIdSearch.entity().getId());
         queueIdSearch.done();
     }
-    
 
 	@Override
 	public SyncQueueItemVO getNextQueueItem(long queueId) {
@@ -128,7 +121,6 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
     		return lockRows(sc, filter, true);
         return listBy(sc, filter);
 	}
-	
 
     @Override
     public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
@@ -150,7 +142,6 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
         return listBy(sc, null);
     }
 
-
     @Override
     public Long getQueueItemIdByContentIdAndType(long contentId, String contentType) {
         SearchCriteria<Long> sc = queueIdSearch.create();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/async/executor/ExtractJobResultObject.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/executor/ExtractJobResultObject.java b/server/src/com/cloud/async/executor/ExtractJobResultObject.java
deleted file mode 100644
index 772f074..0000000
--- a/server/src/com/cloud/async/executor/ExtractJobResultObject.java
+++ /dev/null
@@ -1,183 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.async.executor;
-
-import java.util.Date;
-
-import com.cloud.async.AsyncInstanceCreateStatus;
-import com.cloud.serializer.Param;
-import com.cloud.storage.Volume.Type;
-import com.cloud.storage.upload.UploadState;
-
-public class ExtractJobResultObject {
-	
-	public ExtractJobResultObject(Long accountId, String typeName, String currState, int uploadPercent, Long uploadId){
-		this.accountId = accountId;
-		this.name = typeName;
-		this.state = currState;
-		this.id = uploadId;
-		this.uploadPercent = uploadPercent;
-	}
-
-    public ExtractJobResultObject(Long accountId, String typeName, String currState, Long uploadId, String url){
-        this.accountId = accountId;
-        this.name = typeName;
-        this.state = currState;
-        this.id = uploadId;
-        this.url = url;
-    }	
-    
-	public ExtractJobResultObject(){		
-	}
-	
-	@Param(name="id")
-	private long id;
-    
-	@Param(name="name")
-	private String name;
-	
-    @Param(name="uploadPercentage")
-	private int uploadPercent;
-    
-    @Param(name="uploadStatus")
-	private String uploadStatus;
-    
-    @Param(name="accountid")
-    long accountId;    
- 
-    @Param(name="result_string")
-    String result_string;    
-
-    @Param(name="created")
-    private Date createdDate;
-
-    @Param(name="state")
-    private String state;
-    
-    @Param(name="storagetype")
-	String storageType;
-    
-    @Param(name="storage")
-    private String storage;
-    
-    @Param(name="zoneid")
-    private Long zoneId;
-
-    @Param(name="zonename")
-    private String zoneName;
-
-    @Param(name="url")
-    private String url;
-    
-    public String getUrl() {
-        return url;
-    }
-
-    public void setUrl(String url) {
-        this.url = url;
-    }
-
-    public int getUploadPercent() {
-        return uploadPercent;
-    }
-
-    public void setUploadPercent(int i) {
-        this.uploadPercent = i;
-    }
-
-    public String getUploadStatus() {
-        return uploadStatus;
-    }
-
-    public void setUploadStatus(String uploadStatus) {
-        this.uploadStatus = uploadStatus;
-    }
-
-    public String getResult_string() {
-        return result_string;
-    }
-
-    public void setResult_string(String resultString) {
-        result_string = resultString;
-    }
-
-    
-    public Long getZoneId() {
-		return zoneId;
-	}
-
-	public void setZoneId(Long zoneId) {
-		this.zoneId = zoneId;
-	}
-
-	public String getZoneName() {
-		return zoneName;
-	}
-
-	public void setZoneName(String zoneName) {
-		this.zoneName = zoneName;
-	}
-
-	public String getStorage() {
-		return storage;
-	}
-
-	public void setStorage(String storage) {
-		this.storage = storage;
-	}
-
-	public void setId(long id) {
-		this.id = id;
-	}
-	
-	public long getId() {
-		return id;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	public String getName() {
-		return name;
-	}
-          
-    public void setCreatedDate(Date createdDate) {
-        this.createdDate = createdDate;
-    }
-
-    public Date getCreatedDate() {
-        return createdDate;
-    }
-
-    public void setState(String status) {
-        this.state = status;
-    }
-
-    public String getState() {
-        return state;
-    }
-   
-    public void setStorageType (String storageType) {
-    	this.storageType = storageType;
-    }
-    
-    public String getStorageType() {
-    	return storageType;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/server/ManagementServerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java
index f030ac6..163bbca 100755
--- a/server/src/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/com/cloud/server/ManagementServerImpl.java
@@ -2732,7 +2732,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
         Long volumeId = cmd.getId();
         String url = cmd.getUrl();
         Long zoneId = cmd.getZoneId();
-        AsyncJobVO job = null; // FIXME: cmd.getJob();
+        AsyncJob job = null; // FIXME: cmd.getJob();
         String mode = cmd.getMode();
         Account account = UserContext.current().getCaller();
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/server/src/com/cloud/storage/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeManagerImpl.java b/server/src/com/cloud/storage/VolumeManagerImpl.java
index 24d568b..5160c26 100644
--- a/server/src/com/cloud/storage/VolumeManagerImpl.java
+++ b/server/src/com/cloud/storage/VolumeManagerImpl.java
@@ -71,8 +71,8 @@ import com.cloud.agent.api.AttachVolumeCommand;
 import com.cloud.agent.api.to.VolumeTO;
 import com.cloud.alert.AlertManager;
 import com.cloud.api.ApiDBUtils;
+import com.cloud.async.AsyncJob;
 import com.cloud.async.AsyncJobManager;
-import com.cloud.async.AsyncJobVO;
 import com.cloud.async.AsyncJobExecutionContext;
 import com.cloud.capacity.CapacityManager;
 import com.cloud.capacity.dao.CapacityDao;
@@ -1756,7 +1756,7 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager {
         AsyncJobExecutionContext asyncExecutionContext = AsyncJobExecutionContext.getCurrentExecutionContext();
                
         if (asyncExecutionContext != null) {
-            AsyncJobVO job = asyncExecutionContext.getJob();
+            AsyncJob job = asyncExecutionContext.getJob();
 
             if (s_logger.isInfoEnabled()) {
                 s_logger.info("Trying to attaching volume " + volumeId
@@ -1838,7 +1838,7 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager {
 
         AsyncJobExecutionContext asyncExecutionContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (asyncExecutionContext != null) {
-            AsyncJobVO job = asyncExecutionContext.getJob();
+            AsyncJob job = asyncExecutionContext.getJob();
 
             if (s_logger.isInfoEnabled()) {
                 s_logger.info("Trying to attaching volume " + volumeId

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/setup/db/db/schema-410to420.sql
----------------------------------------------------------------------
diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql
index c4e91eb..ea31b78 100644
--- a/setup/db/db/schema-410to420.sql
+++ b/setup/db/db/schema-410to420.sql
@@ -408,7 +408,13 @@ INSERT INTO `cloud`.`vm_template` (id, unique_name, name, public, created, type,
 
 ALTER TABLE `cloud`.`async_job` DROP COLUMN `session_key`;
 ALTER TABLE `cloud`.`async_job` DROP COLUMN `job_cmd_originator`;
+ALTER TABLE `cloud`.`async_job` DROP COLUMN `callback_type`;
+ALTER TABLE `cloud`.`async_job` DROP COLUMN `callback_address`;
+
+ALTER TABLE `cloud`.`async_job` ADD COLUMN `parent_id` bigint;
+ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_type` VARCHAR(32);
 ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_dispatcher` VARCHAR(64);
+ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_executing_msid` bigint;
 
 ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `last_event` VARCHAR(64) DEFAULT 'OperationNop';
 ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `last_event_args` VARCHAR(256);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c51fae0b/utils/src/com/cloud/utils/EnumUtils.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/EnumUtils.java b/utils/src/com/cloud/utils/EnumUtils.java
index 807964b..ecde7e7 100644
--- a/utils/src/com/cloud/utils/EnumUtils.java
+++ b/utils/src/com/cloud/utils/EnumUtils.java
@@ -26,4 +26,17 @@ public class EnumUtils {
         b.append("]");
         return b.toString();
     }
+    
+    public static <T extends Enum<T>> T fromString(Class<T> clz, String value, T defaultVal) {
+    	assert(clz != null);
+    	
+        if(value != null) {
+            try {
+                return Enum.valueOf(clz, value.trim());
+            } catch(IllegalArgumentException ex) {
+            	assert(false);
+            }
+        }
+        return defaultVal;
+    }
 }