You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/05 00:29:54 UTC

[1/3] further refactored jobs

Updated Branches:
  refs/heads/vmsync 51f533e97 -> bd0c239f6


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
new file mode 100644
index 0000000..95f01e5
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+import java.util.TimeZone;
+
+import javax.management.StandardMBean;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJobMBean;
+
+import com.cloud.utils.DateUtil;
+
+public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
+	private AsyncJob _job;
+	
+	public AsyncJobMBeanImpl(AsyncJob job) {
+		super(AsyncJobMBean.class, false);
+		
+		_job = job;
+	}
+	
+	public long getAccountId() {
+		return _job.getAccountId();
+	}
+	
+	public long getUserId() {
+		return _job.getUserId();
+	}
+	
+	public String getCmd() {
+		return _job.getCmd();
+	}
+	
+	public String getCmdInfo() {
+		return _job.getCmdInfo();
+	}
+	
+	public String getStatus() { // human-readable label for the wrapped job's numeric status code
+		// Translate the AsyncJobConstants status code reported by the job into display text.
+		switch(_job.getStatus()) {
+		case AsyncJobConstants.STATUS_SUCCEEDED:
+			return "Completed";
+
+		case AsyncJobConstants.STATUS_IN_PROGRESS:
+			return "In progress";
+
+		case AsyncJobConstants.STATUS_FAILED:
+			return "failed";
+		}
+		// Any status code not matched by the switch ends up here.
+		return "Unknown";
+	}
+	
+	public int getProcessStatus() {
+		return _job.getProcessStatus();
+	}
+	
+	public int getResultCode() {
+		return _job.getResultCode();
+	}
+	
+	public String getResult() {
+		return _job.getResult();
+	}
+	
+	public String getInstanceType() {
+		if(_job.getInstanceType() != null)
+			return _job.getInstanceType().toString();
+		return "N/A";
+	}
+	
+	public String getInstanceId() {
+		if(_job.getInstanceId() != null)
+			return String.valueOf(_job.getInstanceId());
+		return "N/A";
+	}
+	
+	public String getInitMsid() {
+		if(_job.getInitMsid() != null) {
+			return String.valueOf(_job.getInitMsid());
+		}
+		return "N/A";
+	}
+	
+	public String getCreateTime() {
+		Date time = _job.getCreated();
+		if(time != null)
+			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+		return "N/A";
+	}
+	
+	public String getLastUpdateTime() {
+		Date time = _job.getLastUpdated();
+		if(time != null)
+			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+		return "N/A";
+	}
+	
+	public String getLastPollTime() {
+		Date time = _job.getLastPolled();
+	
+		if(time != null)
+			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+		return "N/A";
+	}
+	
+	public String getSyncQueueId() {
+		SyncQueueItem item = _job.getSyncSource();
+		if(item != null && item.getQueueId() != null) {
+			return String.valueOf(item.getQueueId());
+		}
+		return "N/A";
+	}
+	
+	public String getSyncQueueContentType() { // content type of the job's sync-queue item, or "N/A" when unavailable
+		SyncQueueItem item = _job.getSyncSource();
+		if(item != null && item.getContentType() != null) { // null check mirrors getSyncQueueId/getSyncQueueContentId
+			return item.getContentType();
+		}
+		return "N/A";
+	}
+	
+	public String getSyncQueueContentId() {
+		SyncQueueItem item = _job.getSyncSource();
+		if(item != null && item.getContentId() != null) {
+			return String.valueOf(item.getContentId());
+		}
+		return "N/A";
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
new file mode 100644
index 0000000..7a11195
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJob.Topics;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
+
+import com.cloud.utils.component.ManagerBase;
+
+public class AsyncJobMonitor extends ManagerBase {
+    public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
+    
+    @Inject private MessageBus _messageBus;
+	
+	private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
+	private final Timer _timer = new Timer();
+	
+	private volatile int _activePoolThreads = 0;
+	private volatile int _activeInplaceThreads = 0;
+	
+	// configuration
+	private long _inactivityCheckIntervalMs = 60000;
+	private long _inactivityWarningThresholdMs = 90000;
+	
+	public AsyncJobMonitor() {
+	}
+	
+	public long getInactivityCheckIntervalMs() {
+		return _inactivityCheckIntervalMs;
+	}
+	
+	public void setInactivityCheckIntervalMs(long intervalMs) {
+		_inactivityCheckIntervalMs = intervalMs;
+	}
+	
+	public long getInactivityWarningThresholdMs() {
+		return _inactivityWarningThresholdMs;
+	}
+	
+	public void setInactivityWarningThresholdMs(long thresholdMs) {
+		_inactivityWarningThresholdMs = thresholdMs;
+	}
+	
+    @MessageHandler(topic = AsyncJob.Topics.JOB_HEARTBEAT)
+	public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
+		if(!(args instanceof Long)) // instanceof is false for null, so a missing payload is rejected here too
+			return;
+		synchronized(this) {
+			ActiveTaskRecord task = _activeTasks.get(args); // the Long payload is used as the job id key
+			if(task != null) {
+				task.updateJobHeartbeatTick(); // restart the inactivity clock for this job
+			}
+		}
+	}
+	
+	private void heartbeat() { // logs a warning for every active task whose last heartbeat is older than the threshold
+		synchronized(this) {
+			for(ActiveTaskRecord task : _activeTasks.values()) {
+				if(task.millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
+					s_logger.warn("Task (job-" + task.getJobId() + ") has been pending for "
+						+ task.millisSinceLastJobHeartbeat()/1000 + " seconds");
+				}
+			}
+		}
+	}
+	
+	@Override
+	public boolean configure(String name, Map<String, Object> params)
+			throws ConfigurationException {
+		
+        _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
+		_timer.scheduleAtFixedRate(new TimerTask() {
+
+			@Override
+			public void run() {
+				heartbeat();
+			}
+			
+		}, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
+		return true;
+	}
+	
+	public void registerActiveTask(long jobId) { // records the calling thread as the one executing jobId
+		synchronized(this) {
+			assert(_activeTasks.get(jobId) == null);
+			// A thread whose name contains the job-pool prefix counts as a pool thread, any other as in-place.
+			long threadId = Thread.currentThread().getId();
+			boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
+			ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread); // ctor order is (jobId, threadId, fromPoolThread); the ids used to be passed swapped
+			_activeTasks.put(jobId, record);
+			if(fromPoolThread)
+				_activePoolThreads++;
+			else
+				_activeInplaceThreads++;
+		}
+	}
+	
+	public void unregisterActiveTask(long jobId) { // forgets jobId and gives back its slot in the thread counters
+		synchronized(this) {
+			// remove() returns the record that was stored for this job, or null when nothing was registered.
+			ActiveTaskRecord removed = _activeTasks.remove(jobId);
+			assert(removed != null);
+			if(removed == null)
+				return;
+			// Decrement the counter that matches the kind of thread the record was created from.
+			if(removed.isPoolThread())
+				_activePoolThreads--;
+			else
+				_activeInplaceThreads--;
+		}
+	}
+	
+	public int getActivePoolThreads() {
+		return _activePoolThreads;
+	}
+	
+	public int getActiveInplaceThread() {
+		return _activeInplaceThreads;
+	}
+	
+	private static class ActiveTaskRecord {
+		long _jobId;
+		long _threadId;
+		boolean _fromPoolThread;
+		long _jobLastHeartbeatTick;
+		
+		public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
+			_threadId = threadId;
+			_jobId = jobId;
+			_fromPoolThread = fromPoolThread;
+			_jobLastHeartbeatTick = System.currentTimeMillis();
+		}
+		
+		public long getThreadId() {
+			return _threadId;
+		}
+		
+		public long getJobId() {
+			return _jobId;
+		}
+		
+		public boolean isPoolThread() {
+			return _fromPoolThread;
+		}
+		
+		public void updateJobHeartbeatTick() {
+			_jobLastHeartbeatTick = System.currentTimeMillis();
+		}
+		
+		public long millisSinceLastJobHeartbeat() {
+			return System.currentTimeMillis() - _jobLastHeartbeatTick;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
new file mode 100644
index 0000000..f2ea4ac
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+import java.util.UUID;
+
+import javax.persistence.Column;
+import javax.persistence.DiscriminatorColumn;
+import javax.persistence.DiscriminatorType;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+import javax.persistence.Transient;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.jobs.Job;
+
+import com.cloud.utils.UuidUtils;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job")
+@Inheritance(strategy=InheritanceType.JOINED)
+@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
+public class AsyncJobVO implements AsyncJob, Job {
+	
+	@Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id = null;
+	
+    @Column(name="job_type", length=32)
+    protected String type;
+    
+    @Column(name="job_dispatcher", length=64)
+    protected String dispatcher;
+    
+    @Column(name="job_pending_signals")
+    protected int pendingSignals;
+    
+    @Column(name="user_id")
+    private long userId;
+    
+    @Column(name="account_id")
+    private long accountId;
+    
+	@Column(name="job_cmd")
+    private String cmd;
+
+	@Column(name="job_cmd_ver")
+    private int cmdVersion;
+	
+    @Column(name="job_cmd_info", length=65535)
+    private String cmdInfo;
+  
+    @Column(name="job_status")
+    private int status;
+    
+    @Column(name="job_process_status")
+    private int processStatus;
+    
+    @Column(name="job_result_code")
+    private int resultCode;
+    
+    @Column(name="job_result", length=65535)
+    private String result;
+    
+    @Column(name="instance_type", length=64)
+    private String instanceType;
+    
+	@Column(name="instance_id", length=64)
+    private Long instanceId;
+    
+    @Column(name="job_init_msid")
+    private Long initMsid;
+
+    @Column(name="job_complete_msid")
+    private Long completeMsid;
+    
+    @Column(name="job_executing_msid")
+    private Long executingMsid;
+
+    @Column(name=GenericDao.CREATED_COLUMN)
+    private Date created;
+    
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+    
+    @Column(name="last_polled")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastPolled;
+    
+    @Column(name=GenericDao.REMOVED_COLUMN)
+    private Date removed;
+    
+    @Column(name="uuid")
+    private String uuid;
+
+    @Transient
+    private SyncQueueItem syncSource = null;
+
+    public AsyncJobVO() {
+        uuid = UUID.randomUUID().toString();
+    }
+
+    public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, String instanceType) {
+		this.userId = userId;
+		this.accountId = accountId;
+		this.cmd = cmd;
+		this.cmdInfo = cmdInfo;
+	    uuid = UUID.randomUUID().toString();
+	    this.instanceId = instanceId;
+	    this.instanceType = instanceType;
+    }
+
+    @Override
+    public long getId() {
+		return id;
+	}
+
+	public void setId(Long id) {
+		this.id = id;
+	}
+	
+    @Override
+    public String getShortUuid() {
+        return UuidUtils.first(uuid);
+    }
+
+	@Override
+	public String getType() {
+		return type;
+	}
+	
+	public void setType(String type) {
+		this.type = type;
+	}
+	
+	@Override
+	public String getDispatcher() {
+		return dispatcher;
+	}
+	
+	public void setDispatcher(String dispatcher) {
+		this.dispatcher = dispatcher;
+	}
+	
+	@Override
+	public int getPendingSignals() {
+		return pendingSignals;
+	}
+	
+	public void setPendingSignals(int signals) {
+		pendingSignals = signals;
+	}
+
+	@Override
+    public long getUserId() {
+		return userId;
+	}
+
+	public void setUserId(long userId) {
+		this.userId = userId;
+	}
+
+	@Override
+    public long getAccountId() {
+		return accountId;
+	}
+
+	public void setAccountId(long accountId) {
+		this.accountId = accountId;
+	}
+
+	@Override
+    public String getCmd() {
+		return cmd;
+	}
+
+	public void setCmd(String cmd) {
+		this.cmd = cmd;
+	}
+	
+	@Override
+    public int getCmdVersion() {
+		return cmdVersion;
+	}
+	
+	public void setCmdVersion(int version) {
+		cmdVersion = version;
+	}
+
+	@Override
+    public String getCmdInfo() {
+		return cmdInfo;
+	}
+
+	public void setCmdInfo(String cmdInfo) {
+		this.cmdInfo = cmdInfo;
+	}
+	
+	@Override
+    public int getStatus() {
+		return status;
+	}
+
+	public void setStatus(int status) {
+		this.status = status;
+	}
+	
+	@Override
+    public int getProcessStatus() {
+		return processStatus;
+	}
+	
+	public void setProcessStatus(int status) {
+		processStatus = status;
+	}
+	
+	@Override
+    public int getResultCode() {
+		return resultCode;
+	}
+	
+	public void setResultCode(int resultCode) {
+		this.resultCode = resultCode;
+	}
+
+	@Override
+    public String getResult() {
+		return result;
+	}
+
+	public void setResult(String result) {
+		this.result = result;
+	}
+
+	@Override
+    public Long getInitMsid() {
+		return initMsid;
+	}
+
+	@Override
+	public void setInitMsid(Long initMsid) {
+		this.initMsid = initMsid;
+	}
+	
+	@Override
+	public Long getExecutingMsid() {
+		return executingMsid;
+	}
+	
+	public void setExecutingMsid(Long executingMsid) {
+		this.executingMsid = executingMsid;
+	}
+
+	@Override
+    public Long getCompleteMsid() {
+		return completeMsid;
+	}
+
+	@Override
+	public void setCompleteMsid(Long completeMsid) {
+		this.completeMsid = completeMsid;
+	}
+
+	@Override
+    public Date getCreated() {
+		return created;
+	}
+
+	public void setCreated(Date created) {
+		this.created = created;
+	}
+
+	@Override
+    public Date getLastUpdated() {
+		return lastUpdated;
+	}
+
+	public void setLastUpdated(Date lastUpdated) {
+		this.lastUpdated = lastUpdated;
+	}
+
+	@Override
+    public Date getLastPolled() {
+		return lastPolled;
+	}
+
+	public void setLastPolled(Date lastPolled) {
+		this.lastPolled = lastPolled;
+	}
+
+	@Override
+    public Date getRemoved() {
+		return removed;
+	}
+
+	public void setRemoved(Date removed) {
+		this.removed = removed;
+	}
+	
+    @Override
+    public String getInstanceType() {
+		return instanceType;
+	}
+
+	public void setInstanceType(String instanceType) {
+		this.instanceType = instanceType;
+	}
+
+	@Override
+    public Long getInstanceId() {
+		return instanceId;
+	}
+
+	public void setInstanceId(Long instanceId) {
+		this.instanceId = instanceId;
+	}
+	
+	@Override
+    public SyncQueueItem getSyncSource() {
+        return syncSource;
+    }
+    
+	@Override
+    public void setSyncSource(SyncQueueItem syncSource) {
+        this.syncSource = syncSource;
+    }
+    
+    @Override
+    public String getUuid() {
+    	return uuid;
+    }
+    
+    public void setUuid(String uuid) {
+    	this.uuid = uuid;
+    }
+    
+	@Override
+    public String toString() {
+		// Append the id field itself rather than getId(): getId() unboxes to long and throws
+		// a NullPointerException while the id is still null (its initial value).
+		StringBuilder sb = new StringBuilder("AsyncJobVO {id:").append(id);
+		sb.append(", userId: ").append(getUserId());
+		sb.append(", accountId: ").append(getAccountId());
+		sb.append(", instanceType: ").append(getInstanceType());
+		sb.append(", instanceId: ").append(getInstanceId());
+		sb.append(", cmd: ").append(getCmd());
+		sb.append(", cmdInfo: ").append(getCmdInfo());
+		sb.append(", cmdVersion: ").append(getCmdVersion());
+		sb.append(", status: ").append(getStatus());
+		sb.append(", processStatus: ").append(getProcessStatus());
+		sb.append(", resultCode: ").append(getResultCode());
+		sb.append(", result: ").append(getResult());
+		sb.append(", initMsid: ").append(getInitMsid());
+		sb.append(", completeMsid: ").append(getCompleteMsid());
+		sb.append(", lastUpdated: ").append(getLastUpdated());
+		sb.append(", lastPolled: ").append(getLastPolled());
+		sb.append(", created: ").append(getCreated());
+		return sb.append("}").toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
new file mode 100644
index 0000000..17dd1cc
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import com.cloud.utils.exception.CloudRuntimeException;
+
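+/**
+ * Converts job objects to strings and back: Gson JSON prefixed with the class name, or Base64-encoded Java serialization. The earlier note about toPairList/appendPairList does not apply here; this class defines neither.
+ */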
+public class JobSerializerHelper {
+    private static final Logger s_logger = Logger.getLogger(JobSerializerHelper.class);
+    public static String token = "/";
+
+    private static Gson s_gson;
+    static {
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        gsonBuilder.setVersion(1.5);
+        s_logger.debug("Job GSON Builder initialized.");
+        s_gson = gsonBuilder.create();
+    }
+
+    public static String toSerializedString(Object result) { // "<class name><token><Gson JSON>", or null for a null input
+        if(result == null) {
+            return null;
+        }
+        String className = result.getClass().getName();
+        return className + token + s_gson.toJson(result);
+    }
+
+    public static Object fromSerializedString(String result) { // parses "<class name><token><JSON>" back into an object; null if it cannot be resolved
+        try {
+            if(result != null && !result.isEmpty()) {
+                // NOTE(review): with more than one '/' in the text (e.g. JSON containing a slash) the segment after the class name is treated as a name field and skipped, so Gson only sees what follows the second '/'.
+                String[] serializedParts = result.split(token);
+
+                if (serializedParts.length < 2) {
+                    return null;
+                }
+                String clzName = serializedParts[0];
+                String nameField = null;
+                String content = null;
+                if (serializedParts.length == 2) {
+                    content = serializedParts[1];
+                } else { // three or more parts: "<class>/<name>/<content...>"
+                    nameField = serializedParts[1];
+                    int index = result.indexOf(token + nameField + token);
+                    content = result.substring(index + nameField.length() + 2); // +2 skips the two token characters around the name
+                }
+
+                Class<?> clz;
+                try {
+                    clz = Class.forName(clzName);
+                } catch (ClassNotFoundException e) { // unknown class name yields null rather than an exception
+                    return null;
+                }
+
+                Object obj = s_gson.fromJson(content, clz);
+                return obj;
+            }
+            return null;
+        } catch(RuntimeException e) { // any runtime failure above is rethrown with the original text attached
+            throw new CloudRuntimeException("Unable to deserialize: " + result, e);
+        }
+    }
+    
+    public static String toObjectSerializedString(Serializable object) {
+    	assert(object != null);
+    	
+    	ByteArrayOutputStream bs = new ByteArrayOutputStream();
+    	try {
+    		ObjectOutputStream os = new ObjectOutputStream(bs);
+    		os.writeObject(object);
+    		os.close();
+    		bs.close();
+    		
+    		return Base64.encodeBase64URLSafeString(bs.toByteArray());
+    	} catch(IOException e) {
+            throw new CloudRuntimeException("Unable to serialize: " + object, e);
+    	}
+    }
+    
+    public static Object fromObjectSerializedString(String base64EncodedString) { // Base64-decodes the text and reads one object back; null input gives null
+    	if(base64EncodedString == null)
+    		return null;
+    	// NOTE(review): this is native Java deserialization; it presumably only ever sees strings this system produced itself - confirm no untrusted input reaches it.
+    	byte[] content = Base64.decodeBase64(base64EncodedString);
+    	ByteArrayInputStream bs = new ByteArrayInputStream(content);
+    	try {
+    		ObjectInputStream is = new ObjectInputStream(bs);
+    		Object obj = is.readObject();
+    		is.close();
+    		bs.close();
+    		return obj;
+    	} catch(IOException e) {
+            throw new CloudRuntimeException("Unable to deserialize: " + base64EncodedString, e); // message used to say "serialize" on this read path
+        } catch (ClassNotFoundException e) {
+            throw new CloudRuntimeException("Unable to deserialize: " + base64EncodedString, e);
+    	}
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
new file mode 100644
index 0000000..04519e7
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+public interface SyncQueueItem {
+    public final String AsyncJobContentType = "AsyncJob";
+
+    /**
+     * @return queue item id
+     */
+    long getId();
+
+    /**
+     * @return queue id
+     */
+    Long getQueueId();
+
+    /**
+     * @return subject object type pointed by the queue item
+     */
+    String getContentType();
+    
+    /**
+     * @return subject object id pointed by the queue item
+     */
+    Long getContentId();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
new file mode 100644
index 0000000..f8bba02
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue_item")
+public class SyncQueueItemVO implements SyncQueueItem, InternalIdentity {
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id = null;
+    
+    @Column(name="queue_id")
+    private Long queueId;
+    
+    @Column(name="content_type")
+    private String contentType;
+    
+    @Column(name="content_id")
+    private Long contentId;
+    
+    @Column(name="queue_proc_msid")
+    private Long lastProcessMsid;
+
+    @Column(name="queue_proc_number")
+    private Long lastProcessNumber;
+    
+    @Column(name="queue_proc_time")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastProcessTime;
+    
+    @Column(name="created")
+    private Date created;
+    
+    public long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    @Override
+    public Long getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(Long queueId) {
+        this.queueId = queueId;
+    }
+
+    @Override
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+    
+    @Override
+    public Long getContentId() {
+        return contentId;
+    }
+
+    public void setContentId(Long contentId) {
+        this.contentId = contentId;
+    }
+
+    public Long getLastProcessMsid() {
+        return lastProcessMsid;
+    }
+
+    public void setLastProcessMsid(Long lastProcessMsid) {
+        this.lastProcessMsid = lastProcessMsid;
+    }
+
+    public Long getLastProcessNumber() {
+        return lastProcessNumber;
+    }
+
+    public void setLastProcessNumber(Long lastProcessNumber) {
+        this.lastProcessNumber = lastProcessNumber;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public String toString() {
+        // Append the id field itself: getId() unboxes to long and throws a NullPointerException while id is still null.
+        StringBuilder sb = new StringBuilder("SyncQueueItemVO {id:").append(id);
+        sb.append(", queueId: ").append(getQueueId());
+        sb.append(", contentType: ").append(getContentType());
+        sb.append(", contentId: ").append(getContentId());
+        sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
+        sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
+        sb.append(", lastProcessTime: ").append(getLastProcessTime());
+        sb.append(", created: ").append(getCreated());
+        return sb.append("}").toString();
+    }
+
+       public Date getLastProcessTime() {
+            return lastProcessTime;
+        }
+
+        public void setLastProcessTime(Date lastProcessTime) {
+            this.lastProcessTime = lastProcessTime;
+        }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
new file mode 100644
index 0000000..202a704
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.List;
+
+import com.cloud.utils.component.Manager;
+
+/**
+ * Operations on synchronization queues and the items placed in them.
+ * The notes on each method describe what SyncQueueManagerImpl in this package does.
+ */
+public interface SyncQueueManager extends Manager {
+    /**
+     * Ensures a queue exists for the sync object identified by {@code syncObjType} and
+     * {@code syncObjId}, stores {@code queueSizeLimit} on it and appends an item that
+     * references {@code itemType}/{@code itemId}.
+     *
+     * @return the queue; SyncQueueManagerImpl returns {@code null} when the operation fails
+     */
+    SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit);
+
+    /**
+     * Takes the next item of queue {@code queueId} and marks it as picked up by {@code msid}.
+     *
+     * @return the item, or {@code null} when nothing was dequeued
+     */
+    SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
+
+    /**
+     * Takes up to {@code maxItems} pending items across queues and marks them as picked
+     * up by {@code msid}.
+     *
+     * @return the dequeued items; SyncQueueManagerImpl returns {@code null} on failure
+     */
+    List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
+
+    /** Removes the queue item with id {@code queueItemId}, if it exists. */
+    void purgeItem(long queueItemId);
+
+    /** Clears the pick-up bookkeeping of item {@code queueItemId} so it is pending again. */
+    void returnItem(long queueItemId);
+
+    /** Lists active queue items for {@code msid}; the lookup is delegated to SyncQueueItemDao. */
+    List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
+
+    /** Lists blocked queue items for {@code thresholdMs}; the lookup is delegated to SyncQueueItemDao. */
+    List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+
+    /** Purges the queue item, if any, whose content is the async job {@code asyncJobId}. */
+    void purgeAsyncJobQueueItemId(long asyncJobId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
new file mode 100644
index 0000000..b9b5d6b
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
+    public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
+
+    @Inject private SyncQueueDao _syncQueueDao;
+    @Inject private SyncQueueItemDao _syncQueueItemDao;
+
+    /**
+     * Ensures a queue exists for the given sync object, stores {@code queueSizeLimit}
+     * on it and persists a new item referencing {@code itemType}/{@code itemId}, all
+     * within one transaction.
+     *
+     * @return the queue the item was added to, or {@code null} when any step fails;
+     *         failures are logged and the transaction is rolled back, nothing is rethrown
+     */
+    @Override
+    @DB
+    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
+        Transaction txn = Transaction.currentTxn();
+        try {
+            txn.start();
+
+            _syncQueueDao.ensureQueue(syncObjType, syncObjId);
+            SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
+            // NOTE: this exception is caught by the catch block below, so callers see null, not the exception
+            if(queueVO == null)
+                throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
+
+            // the limit passed by the latest caller overwrites whatever was stored before
+            queueVO.setQueueSizeLimit(queueSizeLimit);
+            _syncQueueDao.update(queueVO.getId(), queueVO);
+
+            Date dt = DateUtil.currentGMTTime();
+            SyncQueueItemVO item = new SyncQueueItemVO();
+            item.setQueueId(queueVO.getId());
+            item.setContentType(itemType);
+            item.setContentId(itemId);
+            item.setCreated(dt);
+
+            _syncQueueItemDao.persist(item);
+            txn.commit();
+
+            return queueVO;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txn.rollback();
+        }
+        return null;
+    }
+
+    /**
+     * Locks queue {@code queueId}, takes its next item and stamps both the queue and the
+     * item with a new process number, {@code msid} and the current GMT time.
+     *
+     * @return the dequeued item, or {@code null} when the queue does not exist, is not
+     *         ready, is empty, or an error occurred (logged and rolled back)
+     */
+    @Override
+    @DB
+    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
+            if (queueVO == null) {
+                s_logger.error("Sync queue(id: " + queueId + ") does not exist");
+                txt.commit();
+                return null;
+            }
+
+            if (!queueReadyToProcess(queueVO)) {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
+                }
+                txt.commit();
+                return null;
+            }
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
+            if (itemVO == null) {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Sync queue (" + queueId + ") is currently empty");
+                }
+                txt.commit();
+                return null;
+            }
+
+            // first dequeue on this queue starts numbering at 1
+            Long previous = queueVO.getLastProcessNumber();
+            Long processNumber = (previous == null) ? Long.valueOf(1) : Long.valueOf(previous.longValue() + 1);
+            Date now = DateUtil.currentGMTTime();
+
+            queueVO.setLastProcessNumber(processNumber);
+            queueVO.setLastUpdated(now);
+            queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+            _syncQueueDao.update(queueVO.getId(), queueVO);
+
+            itemVO.setLastProcessMsid(msid);
+            itemVO.setLastProcessNumber(processNumber);
+            itemVO.setLastProcessTime(now);
+            _syncQueueItemDao.update(itemVO.getId(), itemVO);
+
+            txt.commit();
+            return itemVO;
+        } catch (Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+
+        return null;
+    }
+
+    /**
+     * Fetches up to {@code maxItems} candidate items, then locks each candidate's queue
+     * and item row and, when the item has not been picked up yet, stamps both with a new
+     * process number, {@code msid} and the current GMT time. All candidates are handled
+     * in one transaction.
+     *
+     * @return the items that were dequeued, carrying the freshly written process
+     *         bookkeeping (possibly empty), or {@code null} when an error occurred
+     *         (logged and rolled back)
+     */
+    @Override
+    @DB
+    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
+
+        List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            List<SyncQueueItemVO> candidates = _syncQueueItemDao.getNextQueueItems(maxItems);
+            if (candidates != null) {
+                for (SyncQueueItemVO item : candidates) {
+                    SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
+                    SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
+                    if (queueVO == null || itemVO == null) {
+                        // the row vanished between the candidate query and the lock; skip it
+                        // instead of failing (and rolling back) the whole batch with an NPE
+                        continue;
+                    }
+
+                    // re-check on the locked row: someone else may have picked the item up already
+                    if (!queueReadyToProcess(queueVO) || itemVO.getLastProcessNumber() != null) {
+                        continue;
+                    }
+
+                    Long previous = queueVO.getLastProcessNumber();
+                    Long processNumber = (previous == null) ? Long.valueOf(1) : Long.valueOf(previous.longValue() + 1);
+
+                    Date dt = DateUtil.currentGMTTime();
+                    queueVO.setLastProcessNumber(processNumber);
+                    queueVO.setLastUpdated(dt);
+                    queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+
+                    itemVO.setLastProcessMsid(msid);
+                    itemVO.setLastProcessNumber(processNumber);
+                    itemVO.setLastProcessTime(dt);
+                    _syncQueueItemDao.update(item.getId(), itemVO);
+
+                    // hand back the updated row, not the stale candidate read before the lock
+                    resultList.add(itemVO);
+                }
+            }
+            txt.commit();
+            return resultList;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+        return null;
+    }
+
+    /**
+     * Deletes queue item {@code queueItemId} if it exists. When the item had been picked
+     * up (its {@code lastProcessMsid} is set), the owning queue's size counter is
+     * decremented and its last-updated time refreshed. Errors are logged and the
+     * transaction is rolled back; nothing is rethrown.
+     */
+    @Override
+    @DB
+    public void purgeItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                _syncQueueItemDao.expunge(itemVO.getId());
+
+                // if item is active, reset queue information; a missing queue row must not
+                // trigger an NPE, which would roll back the expunge above
+                if (queueVO != null && itemVO.getLastProcessMsid() != null) {
+                    queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                    // decrement the count
+                    assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+                    queueVO.setQueueSize(queueVO.getQueueSize() - 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+                }
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    /**
+     * Puts item {@code queueItemId} back into the pending state by clearing its
+     * {@code lastProcessMsid}, {@code lastProcessNumber} and {@code lastProcessTime},
+     * and refreshes the owning queue's last-updated time. Does nothing when the item
+     * does not exist. Errors are logged and the transaction is rolled back.
+     */
+    @Override
+    @DB
+    public void returnItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                itemVO.setLastProcessMsid(null);
+                itemVO.setLastProcessNumber(null);
+                itemVO.setLastProcessTime(null);
+                _syncQueueItemDao.update(queueItemId, itemVO);
+
+                // a missing queue row must not trigger an NPE, which would roll back the reset above
+                if (queueVO != null) {
+                    queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+                }
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    /**
+     * Returns whatever SyncQueueItemDao.getActiveQueueItems yields for the same
+     * arguments; no additional filtering or transaction handling happens here.
+     */
+    @Override
+    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+        return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
+    }
+
+    /**
+     * Returns whatever SyncQueueItemDao.getBlockedQueueItems yields for the same
+     * arguments; no additional filtering or transaction handling happens here.
+     */
+    @Override
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
+        return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
+    }
+
+    /**
+     * Tells whether another item may be dequeued from {@code queueVO}. Currently this
+     * always answers {@code true}; the queue-size check is switched off.
+     */
+    private boolean queueReadyToProcess(SyncQueueVO queueVO) {
+        //
+        // TODO
+        //
+        // Concurrency control at queue level is disabled because the job wake-up
+        // dispatching task needs to be supported.
+        //
+        // Concurrency control is better done at a higher level, leaving job
+        // scheduling/serializing simpler.
+        //
+        // Disabled check: return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+        //
+        return true;
+    }
+
+    /**
+     * Looks up the queue item whose content is the async job {@code asyncJobId} and
+     * purges it; does nothing when no such item is found.
+     */
+    @Override
+    public void purgeAsyncJobQueueItemId(long asyncJobId) {
+        Long queueItemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+        if (queueItemId == null) {
+            return;
+        }
+        purgeItem(queueItemId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
new file mode 100644
index 0000000..4fd4740
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+import java.util.Date;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+/**
+ * Row of the {@code sync_queue} table: one queue per sync object
+ * ({@code sync_objtype} / {@code sync_objid}) together with its process counter,
+ * size, size limit and timestamps.
+ */
+@Entity
+@Table(name="sync_queue")
+public class SyncQueueVO implements InternalIdentity {
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id;
+
+    @Column(name="sync_objtype")
+    private String syncObjType;
+
+    @Column(name="sync_objid")
+    private Long syncObjId;
+
+    @Column(name="queue_proc_number")
+    private Long lastProcessNumber;
+
+    @Column(name="created")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date created;
+
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+
+    @Column(name="queue_size")
+    private long queueSize = 0;
+
+    @Column(name="queue_size_limit")
+    private long queueSizeLimit = 0;
+
+    /**
+     * Returns the generated id. The field is a nullable {@code Long}, so calling this
+     * on an instance that has no id yet throws a NullPointerException on unboxing.
+     */
+    public long getId() {
+        return id;
+    }
+
+    /** Returns the type of the object this queue synchronizes on. */
+    public String getSyncObjType() {
+        return syncObjType;
+    }
+
+    public void setSyncObjType(String syncObjType) {
+        this.syncObjType = syncObjType;
+    }
+
+    /** Returns the id of the object this queue synchronizes on. */
+    public Long getSyncObjId() {
+        return syncObjId;
+    }
+
+    public void setSyncObjId(Long syncObjId) {
+        this.syncObjId = syncObjId;
+    }
+
+    /** Returns the last process number stored on this queue, or {@code null} if none is set. */
+    public Long getLastProcessNumber() {
+        return lastProcessNumber;
+    }
+
+    public void setLastProcessNumber(Long number) {
+        lastProcessNumber = number;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public Date getLastUpdated() {
+        return lastUpdated;
+    }
+
+    public void setLastUpdated(Date lastUpdated) {
+        this.lastUpdated = lastUpdated;
+    }
+
+    /**
+     * Builds a single-line description of this queue. The id field is read directly
+     * rather than through {@link #getId()} so that an instance without an id prints
+     * {@code id:null} instead of throwing a NullPointerException.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("SyncQueueVO {id:").append(id);
+        sb.append(", syncObjType: ").append(getSyncObjType());
+        sb.append(", syncObjId: ").append(getSyncObjId());
+        sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
+        sb.append(", lastUpdated: ").append(getLastUpdated());
+        sb.append(", created: ").append(getCreated());
+        sb.append(", count: ").append(getQueueSize());
+        sb.append("}");
+        return sb.toString();
+    }
+
+    /** Returns the value of the {@code queue_size} column. */
+    public long getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(long queueSize) {
+        this.queueSize = queueSize;
+    }
+
+    /** Returns the value of the {@code queue_size_limit} column. */
+    public long getQueueSizeLimit() {
+        return queueSizeLimit;
+    }
+
+    public void setQueueSizeLimit(long queueSizeLimit) {
+        this.queueSizeLimit = queueSizeLimit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/api/ApiGsonHelper.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiGsonHelper.java b/server/src/com/cloud/api/ApiGsonHelper.java
index 6163860..c24808b 100644
--- a/server/src/com/cloud/api/ApiGsonHelper.java
+++ b/server/src/com/cloud/api/ApiGsonHelper.java
@@ -16,12 +16,27 @@
 // under the License.
 package com.cloud.api;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
 import com.google.gson.GsonBuilder;
+
 import org.apache.cloudstack.api.ResponseObject;
 
-import java.util.Map;
+import com.cloud.serializer.Param;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.Pair;
 
 public class ApiGsonHelper {
+    private static final Logger s_logger = Logger.getLogger(ApiGsonHelper.class);
     private static final GsonBuilder s_gBuilder;
     static {
         s_gBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
@@ -33,4 +48,107 @@ public class ApiGsonHelper {
     public static GsonBuilder getBuilder() {
         return s_gBuilder;
     }
+
+    /**
+     * Convenience form of {@link #appendPairList}: starts from a fresh, empty list.
+     *
+     * @param o    the object to describe; may be {@code null}, which yields an empty list
+     * @param name the pair name used when {@code o} is a Number subclass, String or Date
+     * @return a new list holding the pairs produced for {@code o}
+     */
+    public static List<Pair<String, Object>> toPairList(Object o, String name) {
+        return appendPairList(new ArrayList<Pair<String, Object>>(), o, name);
+    }
+
+    /**
+     * Appends name/value pairs describing {@code o} to {@code l}.
+     * <p>
+     * A Number, String or {@code java.util.Date} is added as one pair
+     * ({@code name}, {@code o.toString()}). For any other object, every non-static
+     * field declared directly on its class and annotated with {@link Param} is read
+     * through its public {@code get}/{@code is} accessor; non-null values are added
+     * under the annotation's name (falling back to the property name). Date-typed
+     * fields are formatted with DateUtil.getOutputString, everything else with
+     * {@code toString()}. Fields without an accessor, or whose accessor fails, are skipped.
+     *
+     * @param l    the list to append to
+     * @param o    the object to describe; {@code null} leaves {@code l} untouched
+     * @param name the pair name used for the single-value case
+     * @return {@code l}
+     */
+    public static List<Pair<String, Object>> appendPairList(List<Pair<String, Object>> l, Object o, String name) {
+        if (o == null) {
+            return l;
+        }
+
+        // o.getClass() is never a primitive type (primitives arrive boxed), so the scalar
+        // case is recognised by type; instanceof also covers indirect Number subclasses.
+        if (o instanceof Number || o instanceof String || o.getClass() == Date.class) {
+            l.add(new Pair<String, Object>(name, o.toString()));
+            return l;
+        }
+
+        for (Field f : o.getClass().getDeclaredFields()) {
+            if (Modifier.isStatic(f.getModifiers())) {
+                continue;
+            }
+
+            Param param = f.getAnnotation(Param.class);
+            if (param == null) {
+                continue;
+            }
+
+            // property name: annotation override first, field name otherwise
+            String propName = f.getName();
+            if (!param.propName().isEmpty()) {
+                propName = param.propName();
+            }
+
+            // output name: annotation name first, property name otherwise
+            String paramName = param.name();
+            if (paramName.isEmpty()) {
+                paramName = propName;
+            }
+
+            Method method = getGetMethod(o, propName);
+            if (method == null) {
+                continue;
+            }
+
+            try {
+                Object fieldValue = method.invoke(o);
+                if (fieldValue != null) {
+                    if (f.getType() == Date.class) {
+                        l.add(new Pair<String, Object>(paramName, DateUtil.getOutputString((Date)fieldValue)));
+                    } else {
+                        l.add(new Pair<String, Object>(paramName, fieldValue.toString()));
+                    }
+                }
+            } catch (IllegalArgumentException e) {
+                // keep going with the remaining fields, but do not lose the cause
+                s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName, e);
+            } catch (IllegalAccessException e) {
+                s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName, e);
+            } catch (InvocationTargetException e) {
+                s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName, e);
+            }
+        }
+        return l;
+    }
+
+    /**
+     * Finds the public no-argument accessor for {@code propName} on {@code o}'s class,
+     * trying the get-prefixed name first and the is-prefixed name second.
+     *
+     * @return the accessor, or {@code null} when neither lookup succeeds (the failure is logged)
+     */
+    private static Method getGetMethod(Object o, String propName) {
+        String getterName = getGetMethodName("get", propName);
+        try {
+            return o.getClass().getMethod(getterName);
+        } catch (SecurityException e1) {
+            s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
+        } catch (NoSuchMethodException e1) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("POJO " + o.getClass().getName() + " does not have " + getterName + "() method for property: " + propName
+                        + ", will check is-prefixed method to see if it is boolean property");
+            }
+        }
+
+        // fall back to the boolean-style accessor
+        String booleanGetterName = getGetMethodName("is", propName);
+        try {
+            return o.getClass().getMethod(booleanGetterName);
+        } catch (SecurityException e1) {
+            s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
+        } catch (NoSuchMethodException e1) {
+            s_logger.warn("POJO " + o.getClass().getName() + " does not have " + booleanGetterName + "() method for property: " + propName);
+        }
+        return null;
+    }
+
+    /**
+     * Derives an accessor name from a property name.
+     * <p>
+     * A property name that already starts with {@code prefix} is returned unchanged
+     * (e.g. {@code "is"} + {@code "isDefault"} gives {@code "isDefault"}). Otherwise the
+     * first character is upper-cased and the prefix is prepended
+     * ({@code "get"} + {@code "name"} gives {@code "getName"}). An empty property name
+     * yields just the prefix instead of throwing.
+     *
+     * @param prefix    accessor prefix, {@code "get"} or {@code "is"}
+     * @param fieldName property name
+     * @return the accessor method name to look up
+     */
+    private static String getGetMethodName(String prefix, String fieldName) {
+        if (fieldName.startsWith(prefix)) {
+            return fieldName;
+        }
+        if (fieldName.isEmpty()) {
+            return prefix;
+        }
+
+        StringBuilder sb = new StringBuilder(prefix);
+        // Locale.ROOT keeps the result independent of the JVM default locale
+        // (a Turkish default locale would otherwise upper-case 'i' to a dotted capital I)
+        sb.append(fieldName.substring(0, 1).toUpperCase(java.util.Locale.ROOT));
+        sb.append(fieldName.substring(1));
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index 175c8b8..4db9bb8 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -116,7 +116,7 @@ import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.region.RegionManager;
 
 import com.cloud.api.response.ApiResponseSerializer;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/async/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java
index e6efd03..9ca38a9 100644
--- a/server/src/com/cloud/async/AsyncJobExecutionContext.java
+++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java
@@ -20,15 +20,15 @@ import javax.inject.Inject;
 
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueItem;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
 
 import com.cloud.exception.ConcurrentOperationException;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.serializer.SerializerHelper;
 import com.cloud.utils.component.ComponentContext;
 
 public class AsyncJobExecutionContext  {
@@ -109,7 +109,7 @@ public class AsyncJobExecutionContext  {
     	
     	AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
     	if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) {
-    		Object exception = SerializerHelper.fromObjectSerializedString(record.getJoinResult());
+    		Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
     		if(exception != null && exception instanceof Exception) {
     			if(exception instanceof InsufficientCapacityException)
     				throw (InsufficientCapacityException)exception;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 42cbae8..f147bb0 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -43,19 +43,19 @@ import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
-import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
-import org.apache.cloudstack.framework.jobs.AsyncJobMBeanImpl;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobMonitor;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
-import org.apache.cloudstack.framework.jobs.SyncQueueItem;
-import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
-import org.apache.cloudstack.framework.jobs.SyncQueueManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueVO;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMBeanImpl;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
 import org.apache.cloudstack.framework.messagebus.MessageDetector;
 import org.apache.cloudstack.framework.messagebus.PublishScope;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
index edc4216..32d1c19 100644
--- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
+++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
@@ -35,8 +35,8 @@ import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotCmd;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 
 import com.cloud.api.ApiDispatcher;
 import com.cloud.api.ApiGsonHelper;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/template/TemplateManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/template/TemplateManagerImpl.java b/server/src/com/cloud/template/TemplateManagerImpl.java
index c811b5e..3317fa1 100755
--- a/server/src/com/cloud/template/TemplateManagerImpl.java
+++ b/server/src/com/cloud/template/TemplateManagerImpl.java
@@ -68,7 +68,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
 import org.apache.cloudstack.framework.async.AsyncCallFuture;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
index a5eceec..8109128 100644
--- a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
+++ b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
@@ -29,8 +29,8 @@ import org.apache.log4j.Logger;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
 import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/test/com/cloud/vm/VmWorkTest.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTest.java b/server/test/com/cloud/vm/VmWorkTest.java
index 3cb78da..d236cc3 100644
--- a/server/test/com/cloud/vm/VmWorkTest.java
+++ b/server/test/com/cloud/vm/VmWorkTest.java
@@ -41,7 +41,6 @@ import com.cloud.deploy.DeploymentPlan;
 import com.cloud.deploy.DeploymentPlanner.ExcludeList;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.InsufficientStorageCapacityException;
-import com.cloud.serializer.SerializerHelper;
 import com.cloud.utils.LogUtils;
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.ComponentContext;
@@ -49,7 +48,8 @@ import com.cloud.utils.db.Transaction;
 import com.google.gson.Gson;
 
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
 import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
 import org.apache.cloudstack.vm.jobs.VmWorkJobVO.Step;
@@ -167,10 +167,10 @@ public class VmWorkTest extends TestCase {
 	public void testExceptionSerialization() {
 		InsufficientCapacityException exception = new InsufficientStorageCapacityException("foo", VmWorkJobVO.class, 1L);
 		
-		String encodedString = SerializerHelper.toObjectSerializedString(exception);
+		String encodedString = JobSerializerHelper.toObjectSerializedString(exception);
 		System.out.println(encodedString);
 
-		exception = (InsufficientCapacityException)SerializerHelper.fromObjectSerializedString(encodedString);
+		exception = (InsufficientCapacityException)JobSerializerHelper.fromObjectSerializedString(encodedString);
 		Assert.assertTrue(exception.getScope() == VmWorkJobVO.class);
 		Assert.assertTrue(exception.getMessage().equals("foo"));
 	}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/test/com/cloud/vm/VmWorkTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
index b436b16..cd0dc2c 100644
--- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java
+++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
@@ -20,9 +20,6 @@ import org.mockito.Mockito;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobMonitor;
-import org.apache.cloudstack.framework.jobs.SyncQueueManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueManagerImpl;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDaoImpl;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
@@ -33,6 +30,9 @@ import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueDaoImpl;
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDaoImpl;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManagerImpl;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDaoImpl;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/utils/pom.xml
----------------------------------------------------------------------
diff --git a/utils/pom.xml b/utils/pom.xml
index 0690c35..878e91d 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -64,6 +64,11 @@
       <version>${cs.codec.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>${cs.gson.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-collections</groupId>
       <artifactId>commons-collections</artifactId>
       <version>${cs.collections.version}</version>


[3/3] git commit: updated refs/heads/vmsync to bd0c239

Posted by ah...@apache.org.
Added missing call context


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/bd0c239f
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/bd0c239f
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/bd0c239f

Branch: refs/heads/vmsync
Commit: bd0c239f6826c9fa0892e01d2499619a960641be
Parents: dd11254
Author: Alex Huang <al...@gmail.com>
Authored: Tue Jun 4 15:32:39 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Tue Jun 4 15:32:39 2013 -0700

----------------------------------------------------------------------
 server/src/com/cloud/vm/SystemVmLoadScanner.java |    9 +++++++++
 1 files changed, 9 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/bd0c239f/server/src/com/cloud/vm/SystemVmLoadScanner.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/SystemVmLoadScanner.java b/server/src/com/cloud/vm/SystemVmLoadScanner.java
index 4251b40..a220757 100644
--- a/server/src/com/cloud/vm/SystemVmLoadScanner.java
+++ b/server/src/com/cloud/vm/SystemVmLoadScanner.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
+import org.apache.cloudstack.context.CallContext;
+
+import com.cloud.exception.CloudAuthenticationException;
 import com.cloud.utils.Pair;
 import com.cloud.utils.concurrency.NamedThreadFactory;
 import com.cloud.utils.db.GlobalLock;
@@ -67,6 +70,12 @@ public class SystemVmLoadScanner<T> {
 
             @Override
             public void run() {
+                try {
+                    CallContext.registerOnceOnly();
+                } catch (CloudAuthenticationException e) {
+                    s_logger.error("Unable to start the capacity scan task", e);
+                    System.exit(1);
+                }
                 Transaction txn = Transaction.open(Transaction.CLOUD_DB);
                 try {
                     reallyRun();


[2/3] git commit: updated refs/heads/vmsync to bd0c239

Posted by ah...@apache.org.
further refactored jobs


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/dd112540
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/dd112540
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/dd112540

Branch: refs/heads/vmsync
Commit: dd11254087fbe2a46bb7a09467ab124f48204849
Parents: 51f533e
Author: Alex Huang <al...@gmail.com>
Authored: Tue Jun 4 13:54:33 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Tue Jun 4 13:54:33 2013 -0700

----------------------------------------------------------------------
 client/tomcatconf/applicationContext.xml.in        |    4 +-
 .../src/com/cloud/serializer/SerializerHelper.java |  233 ---------
 .../org/apache/cloudstack/vm/jobs/VmWorkJobVO.java |    2 +-
 .../apache/cloudstack/framework/jobs/AsyncJob.java |    1 +
 .../framework/jobs/AsyncJobJoinMapVO.java          |  210 --------
 .../framework/jobs/AsyncJobJournalVO.java          |  108 ----
 .../framework/jobs/AsyncJobMBeanImpl.java          |  143 ------
 .../cloudstack/framework/jobs/AsyncJobManager.java |    1 +
 .../cloudstack/framework/jobs/AsyncJobMonitor.java |  179 -------
 .../cloudstack/framework/jobs/AsyncJobVO.java      |  384 --------------
 .../cloudstack/framework/jobs/SyncQueueItem.java   |   41 --
 .../cloudstack/framework/jobs/SyncQueueItemVO.java |  143 ------
 .../framework/jobs/SyncQueueManager.java           |   34 --
 .../framework/jobs/SyncQueueManagerImpl.java       |  258 ----------
 .../cloudstack/framework/jobs/SyncQueueVO.java     |  137 -----
 .../cloudstack/framework/jobs/dao/AsyncJobDao.java |    2 +-
 .../framework/jobs/dao/AsyncJobDaoImpl.java        |    2 +-
 .../framework/jobs/dao/AsyncJobJoinMapDao.java     |    2 +-
 .../framework/jobs/dao/AsyncJobJoinMapDaoImpl.java |    2 +-
 .../framework/jobs/dao/AsyncJobJournalDao.java     |    2 +-
 .../framework/jobs/dao/AsyncJobJournalDaoImpl.java |    2 +-
 .../framework/jobs/dao/SyncQueueDao.java           |    2 +-
 .../framework/jobs/dao/SyncQueueDaoImpl.java       |    2 +-
 .../framework/jobs/dao/SyncQueueItemDao.java       |    2 +-
 .../framework/jobs/dao/SyncQueueItemDaoImpl.java   |    2 +-
 .../framework/jobs/impl/AsyncJobJoinMapVO.java     |  210 ++++++++
 .../framework/jobs/impl/AsyncJobJournalVO.java     |  111 +++++
 .../framework/jobs/impl/AsyncJobMBeanImpl.java     |  147 ++++++
 .../framework/jobs/impl/AsyncJobMonitor.java       |  182 +++++++
 .../cloudstack/framework/jobs/impl/AsyncJobVO.java |  385 +++++++++++++++
 .../framework/jobs/impl/JobSerializerHelper.java   |  127 +++++
 .../framework/jobs/impl/SyncQueueItem.java         |   41 ++
 .../framework/jobs/impl/SyncQueueItemVO.java       |  143 ++++++
 .../framework/jobs/impl/SyncQueueManager.java      |   34 ++
 .../framework/jobs/impl/SyncQueueManagerImpl.java  |  258 ++++++++++
 .../framework/jobs/impl/SyncQueueVO.java           |  137 +++++
 server/src/com/cloud/api/ApiGsonHelper.java        |  120 +++++-
 server/src/com/cloud/api/ApiServer.java            |    2 +-
 .../com/cloud/async/AsyncJobExecutionContext.java  |    8 +-
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   18 +-
 .../storage/snapshot/SnapshotSchedulerImpl.java    |    2 +-
 .../com/cloud/template/TemplateManagerImpl.java    |    2 +-
 .../com/cloud/vm/VmWorkJobWakeupDispatcher.java    |    2 +-
 server/test/com/cloud/vm/VmWorkTest.java           |    8 +-
 .../test/com/cloud/vm/VmWorkTestConfiguration.java |    6 +-
 utils/pom.xml                                      |    5 +
 46 files changed, 1938 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/client/tomcatconf/applicationContext.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in
index 1c98346..80f380b 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -809,10 +809,10 @@
   <bean id="asyncJobJournalDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDaoImpl" />
   <bean id="asyncJobJoinMapDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDaoImpl" />
   <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl"/>
-  <bean id="asyncJobMonitor" class="org.apache.cloudstack.framework.jobs.AsyncJobMonitor"/>
+  <bean id="asyncJobMonitor" class="org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor"/>
   <bean id="syncQueueDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueDaoImpl" />
   <bean id="syncQueueItemDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDaoImpl" />
-  <bean id="syncQueueManagerImpl" class="org.apache.cloudstack.framework.jobs.SyncQueueManagerImpl" />
+  <bean id="syncQueueManagerImpl" class="org.apache.cloudstack.framework.jobs.impl.SyncQueueManagerImpl" />
 
   <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher">
     <property name="name" value="ApiAsyncJobDispatcher" />

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/core/src/com/cloud/serializer/SerializerHelper.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/serializer/SerializerHelper.java b/core/src/com/cloud/serializer/SerializerHelper.java
deleted file mode 100644
index e3449f6..0000000
--- a/core/src/com/cloud/serializer/SerializerHelper.java
+++ /dev/null
@@ -1,233 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.serializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.log4j.Logger;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.Pair;
-import com.google.gson.Gson;
-
-/**
- * Note: toPairList and appendPairList only support simple POJO objects currently
- */
-public class SerializerHelper {
-    public static final Logger s_logger = Logger.getLogger(SerializerHelper.class.getName());
-    public static String token = "/";
-
-    public static String toSerializedString(Object result) {
-        if(result != null) {
-            Class<?> clz = result.getClass();
-            Gson gson = GsonHelper.getGson();
-            return clz.getName() + token + gson.toJson(result);
-        }
-        return null;
-    }
-
-    public static Object fromSerializedString(String result) {
-        try {
-            if(result != null && !result.isEmpty()) {
-
-                String[] serializedParts = result.split(token);
-
-                if (serializedParts.length < 2) {
-                    return null;
-                }
-                String clzName = serializedParts[0];
-                String nameField = null;
-                String content = null;
-                if (serializedParts.length == 2) {
-                    content = serializedParts[1];
-                } else {
-                    nameField = serializedParts[1];
-                    int index = result.indexOf(token + nameField + token);
-                    content = result.substring(index + nameField.length() + 2);
-                }
-
-                Class<?> clz;
-                try {
-                    clz = Class.forName(clzName);
-                } catch (ClassNotFoundException e) {
-                    return null;
-                }
-
-                Gson gson = GsonHelper.getGson();
-                Object obj = gson.fromJson(content, clz);
-                return obj;
-            }
-            return null;
-        } catch(RuntimeException e) {
-            s_logger.error("Caught runtime exception when doing GSON deserialization on: " + result);
-            throw e;
-        }
-    }
-    
-    public static String toObjectSerializedString(Serializable object) {
-    	assert(object != null);
-    	
-    	ByteArrayOutputStream bs = new ByteArrayOutputStream();
-    	try {
-    		ObjectOutputStream os = new ObjectOutputStream(bs);
-    		os.writeObject(object);
-    		os.close();
-    		bs.close();
-    		
-    		return Base64.encodeBase64URLSafeString(bs.toByteArray());
-    	} catch(IOException e) {
-    		s_logger.error("Unexpected exception", e);
-    	}
-    	return null;
-    }
-    
-    public static Object fromObjectSerializedString(String base64EncodedString) {
-    	if(base64EncodedString == null)
-    		return null;
-    	
-    	byte[] content = Base64.decodeBase64(base64EncodedString);
-    	ByteArrayInputStream bs = new ByteArrayInputStream(content);
-    	try {
-    		ObjectInputStream is = new ObjectInputStream(bs);
-    		Object obj = is.readObject();
-    		is.close();
-    		bs.close();
-    		return obj;
-    	} catch(IOException e) {
-    		s_logger.error("Unexpected exception", e);
-    	} catch(ClassNotFoundException e) {
-    		s_logger.error("Unexpected exception", e);
-    	}
-    	
-    	return null;
-    }
-
-    public static List<Pair<String, Object>> toPairList(Object o, String name) {
-        List<Pair<String, Object>> l = new ArrayList<Pair<String, Object>>();
-        return appendPairList(l, o, name);
-    }
-
-    public static List<Pair<String, Object>> appendPairList(List<Pair<String, Object>> l, Object o, String name) {
-        if(o != null) {
-            Class<?> clz = o.getClass();
-
-            if(clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) {
-                l.add(new Pair<String, Object>(name, o.toString()));
-                return l;
-            }
-
-            for(Field f : clz.getDeclaredFields()) {
-                if((f.getModifiers() & Modifier.STATIC) != 0) {
-                    continue;
-                }
-
-                Param param = f.getAnnotation(Param.class);
-                if(param == null) {
-                    continue;
-                }
-
-                String propName = f.getName();
-                if(!param.propName().isEmpty()) {
-                    propName = param.propName();
-                }
-
-                String paramName = param.name();
-                if(paramName.isEmpty()) {
-                    paramName = propName;
-                }
-
-                Method method = getGetMethod(o, propName);
-                if(method != null) {
-                    try {
-                        Object fieldValue = method.invoke(o);
-                        if(fieldValue != null) {
-                            if (f.getType() == Date.class) {
-                                l.add(new Pair<String, Object>(paramName, DateUtil.getOutputString((Date)fieldValue)));
-                            } else {
-                                l.add(new Pair<String, Object>(paramName, fieldValue.toString()));
-                            }
-                        }
-                        //else
-                        //	l.add(new Pair<String, Object>(paramName, ""));
-                    } catch (IllegalArgumentException e) {
-                        s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
-
-                    } catch (IllegalAccessException e) {
-                        s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
-                    } catch (InvocationTargetException e) {
-                        s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
-                    }
-                }
-            }
-        }
-        return l;
-    }
-
-    private static Method getGetMethod(Object o, String propName) {
-        Method method = null;
-        String methodName = getGetMethodName("get", propName);
-        try {
-            method = o.getClass().getMethod(methodName);
-        } catch (SecurityException e1) {
-            s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
-        } catch (NoSuchMethodException e1) {
-            if(s_logger.isTraceEnabled()) {
-                s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName + ", will check is-prefixed method to see if it is boolean property");
-            }
-        }
-
-        if(method != null) {
-            return method;
-        }
-
-        methodName = getGetMethodName("is", propName);
-        try {
-            method = o.getClass().getMethod(methodName);
-        } catch (SecurityException e1) {
-            s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
-        } catch (NoSuchMethodException e1) {
-            s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName);
-        }
-        return method;
-    }
-
-    private static String getGetMethodName(String prefix, String fieldName) {
-        StringBuffer sb = new StringBuffer(prefix);
-
-        if(fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) {
-            return fieldName;
-        } else {
-            sb.append(fieldName.substring(0, 1).toUpperCase());
-            sb.append(fieldName.substring(1));
-        }
-
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
index bdf8301..f4dd395 100644
--- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
+++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
@@ -24,7 +24,7 @@ import javax.persistence.Enumerated;
 import javax.persistence.PrimaryKeyJoinColumn;
 import javax.persistence.Table;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 
 import com.cloud.vm.VirtualMachine;
 import com.cloud.vm.VirtualMachine.Type;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index 7a898e8..92c89ee 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -18,6 +18,7 @@ package org.apache.cloudstack.framework.jobs;
 
 import java.util.Date;
 
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
 import org.apache.cloudstack.jobs.Job;
 
 public interface AsyncJob extends Job {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java
deleted file mode 100644
index 56eddf9..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.db.GenericDao;
-
-@Entity
-@Table(name="async_job_join_map")
-public class AsyncJobJoinMapVO {
-	@Id
-    @GeneratedValue(strategy=GenerationType.IDENTITY)
-    @Column(name="id")
-    private Long id = null;
-
-    @Column(name="job_id")
-	private long jobId;
-    
-    @Column(name="join_job_id")
-	private long joinJobId;
-
-    @Column(name="join_status")
-    private int joinStatus;
-    
-    @Column(name="join_result", length=1024)
-    private String joinResult;
-
-    @Column(name="join_msid")
-    private long joinMsid;
-    
-    @Column(name="complete_msid")
-    private Long completeMsid;
-    
-    @Column(name="sync_source_id")
-    private Long syncSourceId;
-    
-    @Column(name="wakeup_handler")
-    private String wakeupHandler;
-
-    @Column(name="wakeup_dispatcher")
-    private String wakeupDispatcher;
-
-    @Column(name="wakeup_interval")
-    private long wakeupInterval;
-    
-    @Column(name=GenericDao.CREATED_COLUMN)
-    private Date created;
-    
-    @Column(name="last_updated")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date lastUpdated;
-
-    @Column(name="next_wakeup")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date nextWakeupTime;
-    
-    @Column(name="expiration")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date expiration;
-
-    public AsyncJobJoinMapVO() {
-    	created = DateUtil.currentGMTTime();
-    	lastUpdated = DateUtil.currentGMTTime();
-    }
-    
-	public Long getId() {
-		return id;
-	}
-
-	public void setId(Long id) {
-		this.id = id;
-	}
-
-	public long getJobId() {
-		return jobId;
-	}
-
-	public void setJobId(long jobId) {
-		this.jobId = jobId;
-	}
-
-	public long getJoinJobId() {
-		return joinJobId;
-	}
-
-	public void setJoinJobId(long joinJobId) {
-		this.joinJobId = joinJobId;
-	}
-
-	public int getJoinStatus() {
-		return joinStatus;
-	}
-
-	public void setJoinStatus(int joinStatus) {
-		this.joinStatus = joinStatus;
-	}
-
-	public String getJoinResult() {
-		return joinResult;
-	}
-
-	public void setJoinResult(String joinResult) {
-		this.joinResult = joinResult;
-	}
-
-	public long getJoinMsid() {
-		return joinMsid;
-	}
-
-	public void setJoinMsid(long joinMsid) {
-		this.joinMsid = joinMsid;
-	}
-
-	public Long getCompleteMsid() {
-		return completeMsid;
-	}
-
-	public void setCompleteMsid(Long completeMsid) {
-		this.completeMsid = completeMsid;
-	}
-
-	public Date getCreated() {
-		return created;
-	}
-
-	public void setCreated(Date created) {
-		this.created = created;
-	}
-
-	public Date getLastUpdated() {
-		return lastUpdated;
-	}
-
-	public void setLastUpdated(Date lastUpdated) {
-		this.lastUpdated = lastUpdated;
-	}
-
-	public Long getSyncSourceId() {
-		return syncSourceId;
-	}
-
-	public void setSyncSourceId(Long syncSourceId) {
-		this.syncSourceId = syncSourceId;
-	}
-
-	public String getWakeupHandler() {
-		return wakeupHandler;
-	}
-
-	public void setWakeupHandler(String wakeupHandler) {
-		this.wakeupHandler = wakeupHandler;
-	}
-
-	public String getWakeupDispatcher() {
-		return wakeupDispatcher;
-	}
-
-	public void setWakeupDispatcher(String wakeupDispatcher) {
-		this.wakeupDispatcher = wakeupDispatcher;
-	}
-
-	public long getWakeupInterval() {
-		return wakeupInterval;
-	}
-
-	public void setWakeupInterval(long wakeupInterval) {
-		this.wakeupInterval = wakeupInterval;
-	}
-
-	public Date getNextWakeupTime() {
-		return nextWakeupTime;
-	}
-
-	public void setNextWakeupTime(Date nextWakeupTime) {
-		this.nextWakeupTime = nextWakeupTime;
-	}
-
-	public Date getExpiration() {
-		return expiration;
-	}
-
-	public void setExpiration(Date expiration) {
-		this.expiration = expiration;
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java
deleted file mode 100644
index 8cb5078..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.EnumType;
-import javax.persistence.Enumerated;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.db.GenericDao;
-
-@Entity
-@Table(name="async_job_journal")
-public class AsyncJobJournalVO {
-	@Id
-    @GeneratedValue(strategy=GenerationType.IDENTITY)
-    @Column(name="id")
-    private Long id = null;
-
-    @Column(name="job_id")
-	private long jobId;
-    
-    @Column(name="journal_type", updatable=false, nullable=false, length=32)
-    @Enumerated(value=EnumType.STRING)
-    private AsyncJob.JournalType journalType;
-    
-    @Column(name="journal_text", length=1024)
-    private String journalText;
-
-    @Column(name="journal_obj", length=1024)
-    private String journalObjJsonString;
-    
-    @Column(name=GenericDao.CREATED_COLUMN)
-    protected Date created;
-
-    public AsyncJobJournalVO() {
-    	created = DateUtil.currentGMTTime();
-    }
-    
-	public Long getId() {
-		return id;
-	}
-
-	public void setId(Long id) {
-		this.id = id;
-	}
-
-	public long getJobId() {
-		return jobId;
-	}
-
-	public void setJobId(long jobId) {
-		this.jobId = jobId;
-	}
-
-	public AsyncJob.JournalType getJournalType() {
-		return journalType;
-	}
-
-	public void setJournalType(AsyncJob.JournalType journalType) {
-		this.journalType = journalType;
-	}
-
-	public String getJournalText() {
-		return journalText;
-	}
-
-	public void setJournalText(String journalText) {
-		this.journalText = journalText;
-	}
-
-	public String getJournalObjJsonString() {
-		return journalObjJsonString;
-	}
-
-	public void setJournalObjJsonString(String journalObjJsonString) {
-		this.journalObjJsonString = journalObjJsonString;
-	}
-
-	public Date getCreated() {
-		return created;
-	}
-
-	public void setCreated(Date created) {
-		this.created = created;
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java
deleted file mode 100644
index e420a90..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-import java.util.TimeZone;
-
-import javax.management.StandardMBean;
-
-import com.cloud.utils.DateUtil;
-
-public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
-	private AsyncJob _job;
-	
-	public AsyncJobMBeanImpl(AsyncJob job) {
-		super(AsyncJobMBean.class, false);
-		
-		_job = job;
-	}
-	
-	public long getAccountId() {
-		return _job.getAccountId();
-	}
-	
-	public long getUserId() {
-		return _job.getUserId();
-	}
-	
-	public String getCmd() {
-		return _job.getCmd();
-	}
-	
-	public String getCmdInfo() {
-		return _job.getCmdInfo();
-	}
-	
-	public String getStatus() {
-		int jobStatus = _job.getStatus();
-		switch(jobStatus) {
-		case AsyncJobConstants.STATUS_SUCCEEDED :
-			return "Completed";
-		
-		case AsyncJobConstants.STATUS_IN_PROGRESS:
-			return "In preogress";
-			
-		case AsyncJobConstants.STATUS_FAILED:
-			return "failed";
-		}
-		
-		return "Unknow";
-	}
-	
-	public int getProcessStatus() {
-		return _job.getProcessStatus();
-	}
-	
-	public int getResultCode() {
-		return _job.getResultCode();
-	}
-	
-	public String getResult() {
-		return _job.getResult();
-	}
-	
-	public String getInstanceType() {
-		if(_job.getInstanceType() != null)
-			return _job.getInstanceType().toString();
-		return "N/A";
-	}
-	
-	public String getInstanceId() {
-		if(_job.getInstanceId() != null)
-			return String.valueOf(_job.getInstanceId());
-		return "N/A";
-	}
-	
-	public String getInitMsid() {
-		if(_job.getInitMsid() != null) {
-			return String.valueOf(_job.getInitMsid());
-		}
-		return "N/A";
-	}
-	
-	public String getCreateTime() {
-		Date time = _job.getCreated();
-		if(time != null)
-			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
-		return "N/A";
-	}
-	
-	public String getLastUpdateTime() {
-		Date time = _job.getLastUpdated();
-		if(time != null)
-			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
-		return "N/A";
-	}
-	
-	public String getLastPollTime() {
-		Date time = _job.getLastPolled();
-	
-		if(time != null)
-			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
-		return "N/A";
-	}
-	
-	public String getSyncQueueId() {
-		SyncQueueItem item = _job.getSyncSource();
-		if(item != null && item.getQueueId() != null) {
-			return String.valueOf(item.getQueueId());
-		}
-		return "N/A";
-	}
-	
-	public String getSyncQueueContentType() {
-		SyncQueueItem item = _job.getSyncSource();
-		if(item != null) {
-			return item.getContentType();
-		}
-		return "N/A";
-	}
-	
-	public String getSyncQueueContentId() {
-		SyncQueueItem item = _job.getSyncSource();
-		if(item != null && item.getContentId() != null) {
-			return String.valueOf(item.getContentId());
-		}
-		return "N/A";
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index 86e48ed..66c56e2 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -19,6 +19,7 @@ package org.apache.cloudstack.framework.jobs;
 import java.util.List;
 
 import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.Manager;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java
deleted file mode 100644
index ed2c943..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.inject.Inject;
-import javax.naming.ConfigurationException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cloudstack.framework.messagebus.MessageBus;
-import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
-import org.apache.cloudstack.framework.messagebus.MessageHandler;
-
-import com.cloud.utils.component.ManagerBase;
-
-public class AsyncJobMonitor extends ManagerBase {
-    public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
-    
-    @Inject private MessageBus _messageBus;
-	
-	private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
-	private final Timer _timer = new Timer();
-	
-	private volatile int _activePoolThreads = 0;
-	private volatile int _activeInplaceThreads = 0;
-	
-	// configuration
-	private long _inactivityCheckIntervalMs = 60000;
-	private long _inactivityWarningThresholdMs = 90000;
-	
-	public AsyncJobMonitor() {
-	}
-	
-	public long getInactivityCheckIntervalMs() {
-		return _inactivityCheckIntervalMs;
-	}
-	
-	public void setInactivityCheckIntervalMs(long intervalMs) {
-		_inactivityCheckIntervalMs = intervalMs;
-	}
-	
-	public long getInactivityWarningThresholdMs() {
-		return _inactivityWarningThresholdMs;
-	}
-	
-	public void setInactivityWarningThresholdMs(long thresholdMs) {
-		_inactivityWarningThresholdMs = thresholdMs;
-	}
-	
-    @MessageHandler(topic = AsyncJob.Topics.JOB_HEARTBEAT)
-	public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
-		if(args != null && args instanceof Long) {
-			synchronized(this) {
-				ActiveTaskRecord record = _activeTasks.get(args);
-				if(record != null) {
-					record.updateJobHeartbeatTick();
-				}
-			}
-		}
-	}
-	
-	private void heartbeat() {
-		synchronized(this) {
-			for(Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
-				if(entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
-					s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
-						+ entry.getValue().millisSinceLastJobHeartbeat()/1000 + " seconds");
-				}
-			}
-		}
-	}
-	
-	@Override
-	public boolean configure(String name, Map<String, Object> params)
-			throws ConfigurationException {
-		
-        _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
-		_timer.scheduleAtFixedRate(new TimerTask() {
-
-			@Override
-			public void run() {
-				heartbeat();
-			}
-			
-		}, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
-		return true;
-	}
-	
-	public void registerActiveTask(long jobId) {
-		synchronized(this) {
-			assert(_activeTasks.get(jobId) == null);
-			
-			long threadId = Thread.currentThread().getId();
-			boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
-			ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
-			_activeTasks.put(jobId, record);
-			if(fromPoolThread)
-				_activePoolThreads++;
-			else
-				_activeInplaceThreads++;
-		}
-	}
-	
-	public void unregisterActiveTask(long jobId) {
-		synchronized(this) {
-			ActiveTaskRecord record = _activeTasks.get(jobId);
-			assert(record != null);
-			if(record != null) {
-				if(record.isPoolThread())
-					_activePoolThreads--;
-				else
-					_activeInplaceThreads--;
-				
-				_activeTasks.remove(jobId);
-			}
-		}
-	}
-	
-	public int getActivePoolThreads() {
-		return _activePoolThreads;
-	}
-	
-	public int getActiveInplaceThread() {
-		return _activeInplaceThreads;
-	}
-	
-	private static class ActiveTaskRecord {
-		long _jobId;
-		long _threadId;
-		boolean _fromPoolThread;
-		long _jobLastHeartbeatTick;
-		
-		public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
-			_threadId = threadId;
-			_jobId = jobId;
-			_fromPoolThread = fromPoolThread;
-			_jobLastHeartbeatTick = System.currentTimeMillis();
-		}
-		
-		public long getThreadId() {
-			return _threadId;
-		}
-		
-		public long getJobId() {
-			return _jobId;
-		}
-		
-		public boolean isPoolThread() {
-			return _fromPoolThread;
-		}
-		
-		public void updateJobHeartbeatTick() {
-			_jobLastHeartbeatTick = System.currentTimeMillis();
-		}
-		
-		public long millisSinceLastJobHeartbeat() {
-			return System.currentTimeMillis() - _jobLastHeartbeatTick;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java
deleted file mode 100644
index 9c9001a..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java
+++ /dev/null
@@ -1,384 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-import java.util.UUID;
-
-import javax.persistence.Column;
-import javax.persistence.DiscriminatorColumn;
-import javax.persistence.DiscriminatorType;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Inheritance;
-import javax.persistence.InheritanceType;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-import javax.persistence.Transient;
-
-import org.apache.cloudstack.jobs.Job;
-
-import com.cloud.utils.UuidUtils;
-import com.cloud.utils.db.GenericDao;
-
-@Entity
-@Table(name="async_job")
-@Inheritance(strategy=InheritanceType.JOINED)
-@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
-public class AsyncJobVO implements AsyncJob, Job {
-	
-	@Id
-    @GeneratedValue(strategy=GenerationType.IDENTITY)
-    @Column(name="id")
-    private Long id = null;
-	
-    @Column(name="job_type", length=32)
-    protected String type;
-    
-    @Column(name="job_dispatcher", length=64)
-    protected String dispatcher;
-    
-    @Column(name="job_pending_signals")
-    protected int pendingSignals;
-    
-    @Column(name="user_id")
-    private long userId;
-    
-    @Column(name="account_id")
-    private long accountId;
-    
-	@Column(name="job_cmd")
-    private String cmd;
-
-	@Column(name="job_cmd_ver")
-    private int cmdVersion;
-	
-    @Column(name="job_cmd_info", length=65535)
-    private String cmdInfo;
-  
-    @Column(name="job_status")
-    private int status;
-    
-    @Column(name="job_process_status")
-    private int processStatus;
-    
-    @Column(name="job_result_code")
-    private int resultCode;
-    
-    @Column(name="job_result", length=65535)
-    private String result;
-    
-    @Column(name="instance_type", length=64)
-    private String instanceType;
-    
-	@Column(name="instance_id", length=64)
-    private Long instanceId;
-    
-    @Column(name="job_init_msid")
-    private Long initMsid;
-
-    @Column(name="job_complete_msid")
-    private Long completeMsid;
-    
-    @Column(name="job_executing_msid")
-    private Long executingMsid;
-
-    @Column(name=GenericDao.CREATED_COLUMN)
-    private Date created;
-    
-    @Column(name="last_updated")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date lastUpdated;
-    
-    @Column(name="last_polled")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date lastPolled;
-    
-    @Column(name=GenericDao.REMOVED_COLUMN)
-    private Date removed;
-    
-    @Column(name="uuid")
-    private String uuid;
-
-    @Transient
-    private SyncQueueItem syncSource = null;
-
-    public AsyncJobVO() {
-        uuid = UUID.randomUUID().toString();
-    }
-
-    public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, String instanceType) {
-		this.userId = userId;
-		this.accountId = accountId;
-		this.cmd = cmd;
-		this.cmdInfo = cmdInfo;
-	    uuid = UUID.randomUUID().toString();
-	    this.instanceId = instanceId;
-	    this.instanceType = instanceType;
-    }
-
-    @Override
-    public long getId() {
-		return id;
-	}
-
-	public void setId(Long id) {
-		this.id = id;
-	}
-	
-    @Override
-    public String getShortUuid() {
-        return UuidUtils.first(uuid);
-    }
-
-	@Override
-	public String getType() {
-		return type;
-	}
-	
-	public void setType(String type) {
-		this.type = type;
-	}
-	
-	@Override
-	public String getDispatcher() {
-		return dispatcher;
-	}
-	
-	public void setDispatcher(String dispatcher) {
-		this.dispatcher = dispatcher;
-	}
-	
-	@Override
-	public int getPendingSignals() {
-		return pendingSignals;
-	}
-	
-	public void setPendingSignals(int signals) {
-		pendingSignals = signals;
-	}
-
-	@Override
-    public long getUserId() {
-		return userId;
-	}
-
-	public void setUserId(long userId) {
-		this.userId = userId;
-	}
-
-	@Override
-    public long getAccountId() {
-		return accountId;
-	}
-
-	public void setAccountId(long accountId) {
-		this.accountId = accountId;
-	}
-
-	@Override
-    public String getCmd() {
-		return cmd;
-	}
-
-	public void setCmd(String cmd) {
-		this.cmd = cmd;
-	}
-	
-	@Override
-    public int getCmdVersion() {
-		return cmdVersion;
-	}
-	
-	public void setCmdVersion(int version) {
-		cmdVersion = version;
-	}
-
-	@Override
-    public String getCmdInfo() {
-		return cmdInfo;
-	}
-
-	public void setCmdInfo(String cmdInfo) {
-		this.cmdInfo = cmdInfo;
-	}
-	
-	@Override
-    public int getStatus() {
-		return status;
-	}
-
-	public void setStatus(int status) {
-		this.status = status;
-	}
-	
-	@Override
-    public int getProcessStatus() {
-		return processStatus;
-	}
-	
-	public void setProcessStatus(int status) {
-		processStatus = status;
-	}
-	
-	@Override
-    public int getResultCode() {
-		return resultCode;
-	}
-	
-	public void setResultCode(int resultCode) {
-		this.resultCode = resultCode;
-	}
-
-	@Override
-    public String getResult() {
-		return result;
-	}
-
-	public void setResult(String result) {
-		this.result = result;
-	}
-
-	@Override
-    public Long getInitMsid() {
-		return initMsid;
-	}
-
-	@Override
-	public void setInitMsid(Long initMsid) {
-		this.initMsid = initMsid;
-	}
-	
-	@Override
-	public Long getExecutingMsid() {
-		return executingMsid;
-	}
-	
-	public void setExecutingMsid(Long executingMsid) {
-		this.executingMsid = executingMsid;
-	}
-
-	@Override
-    public Long getCompleteMsid() {
-		return completeMsid;
-	}
-
-	@Override
-	public void setCompleteMsid(Long completeMsid) {
-		this.completeMsid = completeMsid;
-	}
-
-	@Override
-    public Date getCreated() {
-		return created;
-	}
-
-	public void setCreated(Date created) {
-		this.created = created;
-	}
-
-	@Override
-    public Date getLastUpdated() {
-		return lastUpdated;
-	}
-
-	public void setLastUpdated(Date lastUpdated) {
-		this.lastUpdated = lastUpdated;
-	}
-
-	@Override
-    public Date getLastPolled() {
-		return lastPolled;
-	}
-
-	public void setLastPolled(Date lastPolled) {
-		this.lastPolled = lastPolled;
-	}
-
-	@Override
-    public Date getRemoved() {
-		return removed;
-	}
-
-	public void setRemoved(Date removed) {
-		this.removed = removed;
-	}
-	
-    @Override
-    public String getInstanceType() {
-		return instanceType;
-	}
-
-	public void setInstanceType(String instanceType) {
-		this.instanceType = instanceType;
-	}
-
-	@Override
-    public Long getInstanceId() {
-		return instanceId;
-	}
-
-	public void setInstanceId(Long instanceId) {
-		this.instanceId = instanceId;
-	}
-	
-	@Override
-    public SyncQueueItem getSyncSource() {
-        return syncSource;
-    }
-    
-	@Override
-    public void setSyncSource(SyncQueueItem syncSource) {
-        this.syncSource = syncSource;
-    }
-    
-    @Override
-    public String getUuid() {
-    	return uuid;
-    }
-    
-    public void setUuid(String uuid) {
-    	this.uuid = uuid;
-    }
-    
-	@Override
-    public String toString() {
-		StringBuffer sb = new StringBuffer();
-		sb.append("AsyncJobVO {id:").append(getId());
-		sb.append(", userId: ").append(getUserId());
-		sb.append(", accountId: ").append(getAccountId());
-		sb.append(", instanceType: ").append(getInstanceType());
-		sb.append(", instanceId: ").append(getInstanceId());
-		sb.append(", cmd: ").append(getCmd());
-		sb.append(", cmdInfo: ").append(getCmdInfo());
-		sb.append(", cmdVersion: ").append(getCmdVersion());
-		sb.append(", status: ").append(getStatus());
-		sb.append(", processStatus: ").append(getProcessStatus());
-		sb.append(", resultCode: ").append(getResultCode());
-		sb.append(", result: ").append(getResult());
-		sb.append(", initMsid: ").append(getInitMsid());
-		sb.append(", completeMsid: ").append(getCompleteMsid());
-		sb.append(", lastUpdated: ").append(getLastUpdated());
-		sb.append(", lastPolled: ").append(getLastPolled());
-		sb.append(", created: ").append(getCreated());
-		sb.append("}");
-		return sb.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java
deleted file mode 100644
index 43aa0f6..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-public interface SyncQueueItem {
-    public final String AsyncJobContentType = "AsyncJob";
-
-    /**
-     * @return queue item id
-     */
-    long getId();
-
-    /**
-     * @return queue id
-     */
-    Long getQueueId();
-
-    /**
-     * @return subject object type pointed by the queue item
-     */
-    String getContentType();
-    
-    /**
-     * @return subject object id pointed by the queue item
-     */
-    Long getContentId();
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java
deleted file mode 100644
index 7a5d15f..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import org.apache.cloudstack.api.InternalIdentity;
-
-
-import java.util.Date;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-
-@Entity
-@Table(name="sync_queue_item")
-public class SyncQueueItemVO implements SyncQueueItem, InternalIdentity {
-
-    @Id
-    @GeneratedValue(strategy=GenerationType.IDENTITY)
-    @Column(name="id")
-    private Long id = null;
-    
-    @Column(name="queue_id")
-    private Long queueId;
-    
-    @Column(name="content_type")
-    private String contentType;
-    
-    @Column(name="content_id")
-    private Long contentId;
-    
-    @Column(name="queue_proc_msid")
-    private Long lastProcessMsid;
-
-    @Column(name="queue_proc_number")
-    private Long lastProcessNumber;
-    
-    @Column(name="queue_proc_time")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date lastProcessTime;
-    
-    @Column(name="created")
-    private Date created;
-    
-    public long getId() {
-        return id;
-    }
-
-    public void setId(Long id) {
-        this.id = id;
-    }
-
-    @Override
-    public Long getQueueId() {
-        return queueId;
-    }
-
-    public void setQueueId(Long queueId) {
-        this.queueId = queueId;
-    }
-
-    @Override
-    public String getContentType() {
-        return contentType;
-    }
-
-    public void setContentType(String contentType) {
-        this.contentType = contentType;
-    }
-    
-    @Override
-    public Long getContentId() {
-        return contentId;
-    }
-
-    public void setContentId(Long contentId) {
-        this.contentId = contentId;
-    }
-
-    public Long getLastProcessMsid() {
-        return lastProcessMsid;
-    }
-
-    public void setLastProcessMsid(Long lastProcessMsid) {
-        this.lastProcessMsid = lastProcessMsid;
-    }
-
-    public Long getLastProcessNumber() {
-        return lastProcessNumber;
-    }
-
-    public void setLastProcessNumber(Long lastProcessNumber) {
-        this.lastProcessNumber = lastProcessNumber;
-    }
-
-    public Date getCreated() {
-        return created;
-    }
-
-    public void setCreated(Date created) {
-        this.created = created;
-    }
-
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId());
-        sb.append(", contentType: ").append(getContentType());
-        sb.append(", contentId: ").append(getContentId());
-        sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
-        sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
-        sb.append(", lastProcessTime: ").append(getLastProcessTime());
-        sb.append(", created: ").append(getCreated());
-        sb.append("}");
-        return sb.toString();
-    }
-
-       public Date getLastProcessTime() {
-            return lastProcessTime;
-        }
-
-        public void setLastProcessTime(Date lastProcessTime) {
-            this.lastProcessTime = lastProcessTime;
-        }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java
deleted file mode 100644
index f0516c8..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.List;
-
-import com.cloud.utils.component.Manager;
-
-public interface SyncQueueManager extends Manager {
-    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit);
-    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
-    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
-    public void purgeItem(long queueItemId);
-    public void returnItem(long queueItemId);
-
-	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
-    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
-
-    void purgeAsyncJobQueueItemId(long asyncJobId);
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
deleted file mode 100644
index 7c57076..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
+++ /dev/null
@@ -1,258 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.inject.Inject;
-import org.apache.log4j.Logger;
-
-import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
-import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.component.ManagerBase;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.Transaction;
-import com.cloud.utils.exception.CloudRuntimeException;
-
-public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
-    public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
-
-    @Inject private SyncQueueDao _syncQueueDao;
-    @Inject private SyncQueueItemDao _syncQueueItemDao;
-
-    @Override
-    @DB
-    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
-        Transaction txn = Transaction.currentTxn();
-        try {
-            txn.start();
-
-            _syncQueueDao.ensureQueue(syncObjType, syncObjId);
-            SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
-            if(queueVO == null)
-                throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
-
-            queueVO.setQueueSizeLimit(queueSizeLimit);
-            _syncQueueDao.update(queueVO.getId(), queueVO);
-
-            Date dt = DateUtil.currentGMTTime();
-            SyncQueueItemVO item = new SyncQueueItemVO();
-            item.setQueueId(queueVO.getId());
-            item.setContentType(itemType);
-            item.setContentId(itemId);
-            item.setCreated(dt);
-
-            _syncQueueItemDao.persist(item);
-            txn.commit();
-
-            return queueVO;
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception: ", e);
-            txn.rollback();
-        }
-        return null;
-    }
-
-    @Override
-    @DB
-    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-
-            SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
-            if(queueVO == null) {
-                s_logger.error("Sync queue(id: " + queueId + ") does not exist");
-                txt.commit();
-                return null;
-            }
-
-            if(queueReadyToProcess(queueVO)) {
-                SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
-                if(itemVO != null) {
-                    Long processNumber = queueVO.getLastProcessNumber();
-                    if(processNumber == null)
-                        processNumber = new Long(1);
-                    else
-                        processNumber = processNumber + 1;
-                    Date dt = DateUtil.currentGMTTime();
-                    queueVO.setLastProcessNumber(processNumber);
-                    queueVO.setLastUpdated(dt);
-                    queueVO.setQueueSize(queueVO.getQueueSize() + 1);
-                    _syncQueueDao.update(queueVO.getId(), queueVO);
-
-                    itemVO.setLastProcessMsid(msid);
-                    itemVO.setLastProcessNumber(processNumber);
-                    itemVO.setLastProcessTime(dt);
-                    _syncQueueItemDao.update(itemVO.getId(), itemVO);
-
-                    txt.commit();
-                    return itemVO;
-                } else {
-                    if(s_logger.isDebugEnabled())
-                        s_logger.debug("Sync queue (" + queueId + ") is currently empty");
-                }
-            } else {
-                if(s_logger.isDebugEnabled())
-                    s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
-            }
-            txt.commit();
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception: ", e);
-            txt.rollback();
-        }
-
-        return null;
-    }
-
-    @Override
-    @DB
-    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
-
-        List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-
-            List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
-            if(l != null && l.size() > 0) {
-                for(SyncQueueItemVO item : l) {
-                    SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
-                    SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
-                    if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
-                        Long processNumber = queueVO.getLastProcessNumber();
-                        if(processNumber == null)
-                            processNumber = new Long(1);
-                        else
-                            processNumber = processNumber + 1;
-
-                        Date dt = DateUtil.currentGMTTime();
-                        queueVO.setLastProcessNumber(processNumber);
-                        queueVO.setLastUpdated(dt);
-                        queueVO.setQueueSize(queueVO.getQueueSize() + 1);
-                        _syncQueueDao.update(queueVO.getId(), queueVO);
-
-                        itemVO.setLastProcessMsid(msid);
-                        itemVO.setLastProcessNumber(processNumber);
-                        itemVO.setLastProcessTime(dt);
-                        _syncQueueItemDao.update(item.getId(), itemVO);
-
-                        resultList.add(item);
-                    }
-                }
-            }
-            txt.commit();
-            return resultList;
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception: ", e);
-            txt.rollback();
-        }
-        return null;
-    }
-
-    @Override
-    @DB
-    public void purgeItem(long queueItemId) {
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-
-            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
-            if(itemVO != null) {
-                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
-
-                _syncQueueItemDao.expunge(itemVO.getId());
-
-                // if item is active, reset queue information
-                if (itemVO.getLastProcessMsid() != null) {
-                    queueVO.setLastUpdated(DateUtil.currentGMTTime());
-                    // decrement the count
-                    assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
-                    queueVO.setQueueSize(queueVO.getQueueSize() - 1);
-                    _syncQueueDao.update(queueVO.getId(), queueVO);
-                }
-            }
-            txt.commit();
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception: ", e);
-            txt.rollback();
-        }
-    }
-
-    @Override
-    @DB
-    public void returnItem(long queueItemId) {
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-
-            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
-            if(itemVO != null) {
-                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
-
-                itemVO.setLastProcessMsid(null);
-                itemVO.setLastProcessNumber(null);
-                itemVO.setLastProcessTime(null);
-                _syncQueueItemDao.update(queueItemId, itemVO);
-
-                queueVO.setLastUpdated(DateUtil.currentGMTTime());
-                _syncQueueDao.update(queueVO.getId(), queueVO);
-            }
-            txt.commit();
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception: ", e);
-            txt.rollback();
-        }
-    }
-
-    @Override
-    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
-        return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
-    }
-
-    @Override
-    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
-        return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
-    }
-
-    private boolean queueReadyToProcess(SyncQueueVO queueVO) {
-    	return true;
-    	
-    	//
-    	// TODO
-    	//
-    	// Need to disable concurrency disable at queue level due to the need to support
-    	// job wake-up dispatching task
-    	//
-    	// Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
-    	//
-    	
-        // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
-    }
-
-    @Override
-    public void purgeAsyncJobQueueItemId(long asyncJobId) {
-        Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
-        if (itemId != null) {
-            purgeItem(itemId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
deleted file mode 100644
index cad3462..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.cloudstack.framework.jobs;
-
-import org.apache.cloudstack.api.InternalIdentity;
-
-import java.util.Date;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-
-@Entity
-@Table(name="sync_queue")
-public class SyncQueueVO implements InternalIdentity {
-
-    @Id
-    @GeneratedValue(strategy=GenerationType.IDENTITY)
-    @Column(name="id")
-    private Long id;
-
-    @Column(name="sync_objtype")
-
-    private String syncObjType;
-
-    @Column(name="sync_objid")
-    private Long syncObjId;
-
-    @Column(name="queue_proc_number")
-    private Long lastProcessNumber;
-
-    @Column(name="created")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date created;
-
-    @Column(name="last_updated")
-    @Temporal(TemporalType.TIMESTAMP)
-    private Date lastUpdated;
-
-    @Column(name="queue_size")
-    private long queueSize = 0;
-
-    @Column(name="queue_size_limit")
-    private long queueSizeLimit = 0;
-
-    public long getId() {
-        return id;
-    }
-
-    public String getSyncObjType() {
-        return syncObjType;
-    }
-
-    public void setSyncObjType(String syncObjType) {
-        this.syncObjType = syncObjType;
-    }
-
-    public Long getSyncObjId() {
-        return syncObjId;
-    }
-
-    public void setSyncObjId(Long syncObjId) {
-        this.syncObjId = syncObjId;
-    }
-    
-    public Long getLastProcessNumber() {
-        return lastProcessNumber;
-    }
-    
-    public void setLastProcessNumber(Long number) {
-        lastProcessNumber = number;
-    }
-
-    public Date getCreated() {
-        return created;
-    }
-
-    public void setCreated(Date created) {
-        this.created = created;
-    }
-
-    public Date getLastUpdated() {
-        return lastUpdated;
-    }
-
-    public void setLastUpdated(Date lastUpdated) {
-        this.lastUpdated = lastUpdated;
-    }
-
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("SyncQueueVO {id:").append(getId());
-        sb.append(", syncObjType: ").append(getSyncObjType());
-        sb.append(", syncObjId: ").append(getSyncObjId());
-        sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
-        sb.append(", lastUpdated: ").append(getLastUpdated());
-        sb.append(", created: ").append(getCreated());
-        sb.append(", count: ").append(getQueueSize());
-        sb.append("}");
-        return sb.toString();
-    }
-
-    public long getQueueSize() {
-        return queueSize;
-    }
-
-    public void setQueueSize(long queueSize) {
-        this.queueSize = queueSize;
-    }
-
-    public long getQueueSizeLimit() {
-        return queueSizeLimit;
-    }
-
-    public void setQueueSizeLimit(long queueSizeLimit) {
-        this.queueSizeLimit = queueSizeLimit;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
index 4c442c8..cfcd173 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
@@ -19,7 +19,7 @@ package org.apache.cloudstack.framework.jobs.dao;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 
 import com.cloud.utils.db.GenericDao;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index 276323c..c30dbde 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.log4j.Logger;
 
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 
 import com.cloud.utils.db.DB;
 import com.cloud.utils.db.Filter;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
index acf1324..a9e82a7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
 
 import java.util.List;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
 
 import com.cloud.utils.db.GenericDao;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index 0b41953..4cc2218 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -27,7 +27,7 @@ import java.util.TimeZone;
 import org.apache.log4j.Logger;
 
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
 
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.GenericDaoBase;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
index e8d8287..fb6a242 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
 
 import java.util.List;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
 
 import com.cloud.utils.db.GenericDao;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
index 9d02307..d26e6ed 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
 
 import java.util.List;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
 
 import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
index f45245a..fa617ad 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
@@ -16,7 +16,7 @@
 // under the License.
 package org.apache.cloudstack.framework.jobs.dao;
 
-import org.apache.cloudstack.framework.jobs.SyncQueueVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
 
 import com.cloud.utils.db.GenericDao;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
index 05e2a73..f7d9d72 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
@@ -24,7 +24,7 @@ import java.util.TimeZone;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cloudstack.framework.jobs.SyncQueueVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
 
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.GenericDaoBase;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
index b78ccf7..61670bf 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
 
 import java.util.List;
 
-import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
 
 import com.cloud.utils.db.GenericDao;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 08b777c..ccb7f10 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -27,7 +27,7 @@ import java.util.TimeZone;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
 
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.DB;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
new file mode 100644
index 0000000..0bcdc3b
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job_join_map")
+public class AsyncJobJoinMapVO { // JPA entity for one row of the async_job_join_map table; plain field holder with getters/setters
+	@Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id = null; // identity-generated key; stays null until a value is assigned
+
+    @Column(name="job_id")
+	private long jobId; // primitive: cannot represent "unset" (defaults to 0)
+    
+    @Column(name="join_job_id")
+	private long joinJobId; // primitive: cannot represent "unset" (defaults to 0)
+
+    @Column(name="join_status")
+    private int joinStatus; // raw int code; the meaning of each value is not defined in this class
+    
+    @Column(name="join_result", length=1024)
+    private String joinResult; // length=1024 is declared on the column only; the setter does not truncate or validate
+
+    @Column(name="join_msid")
+    private long joinMsid; // primitive, unlike completeMsid below
+    
+    @Column(name="complete_msid")
+    private Long completeMsid; // boxed: may be null
+    
+    @Column(name="sync_source_id")
+    private Long syncSourceId; // boxed: may be null
+    
+    @Column(name="wakeup_handler")
+    private String wakeupHandler;
+
+    @Column(name="wakeup_dispatcher")
+    private String wakeupDispatcher;
+
+    @Column(name="wakeup_interval")
+    private long wakeupInterval; // units are not shown in this class -- TODO confirm against callers
+    
+    @Column(name=GenericDao.CREATED_COLUMN)
+    private Date created; // NOTE(review): no @Temporal here, unlike the other Date columns -- confirm intentional
+    
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+
+    @Column(name="next_wakeup")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date nextWakeupTime; // not initialised by the constructor, so null until set
+    
+    @Column(name="expiration")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date expiration; // not initialised by the constructor, so null until set
+
+    public AsyncJobJoinMapVO() { // only the two timestamps below are initialised; every other field keeps its Java default
+    	created = DateUtil.currentGMTTime(); // two separate calls, so created and lastUpdated are distinct Date objects
+    	lastUpdated = DateUtil.currentGMTTime();
+    }
+    
+	public Long getId() { // returns the boxed id, which is null until one has been assigned
+		return id;
+	}
+
+	public void setId(Long id) {
+		this.id = id;
+	}
+
+	public long getJobId() {
+		return jobId;
+	}
+
+	public void setJobId(long jobId) {
+		this.jobId = jobId;
+	}
+
+	public long getJoinJobId() {
+		return joinJobId;
+	}
+
+	public void setJoinJobId(long joinJobId) {
+		this.joinJobId = joinJobId;
+	}
+
+	public int getJoinStatus() {
+		return joinStatus;
+	}
+
+	public void setJoinStatus(int joinStatus) {
+		this.joinStatus = joinStatus;
+	}
+
+	public String getJoinResult() {
+		return joinResult;
+	}
+
+	public void setJoinResult(String joinResult) { // stored as given; no length check against the column's 1024 limit
+		this.joinResult = joinResult;
+	}
+
+	public long getJoinMsid() {
+		return joinMsid;
+	}
+
+	public void setJoinMsid(long joinMsid) {
+		this.joinMsid = joinMsid;
+	}
+
+	public Long getCompleteMsid() {
+		return completeMsid;
+	}
+
+	public void setCompleteMsid(Long completeMsid) {
+		this.completeMsid = completeMsid;
+	}
+
+	public Date getCreated() { // hands back the stored Date reference itself, not a copy
+		return created;
+	}
+
+	public void setCreated(Date created) { // keeps the caller's Date reference, not a copy
+		this.created = created;
+	}
+
+	public Date getLastUpdated() { // hands back the stored Date reference itself, not a copy
+		return lastUpdated;
+	}
+
+	public void setLastUpdated(Date lastUpdated) { // no setter in this class refreshes lastUpdated automatically
+		this.lastUpdated = lastUpdated;
+	}
+
+	public Long getSyncSourceId() {
+		return syncSourceId;
+	}
+
+	public void setSyncSourceId(Long syncSourceId) {
+		this.syncSourceId = syncSourceId;
+	}
+
+	public String getWakeupHandler() {
+		return wakeupHandler;
+	}
+
+	public void setWakeupHandler(String wakeupHandler) {
+		this.wakeupHandler = wakeupHandler;
+	}
+
+	public String getWakeupDispatcher() {
+		return wakeupDispatcher;
+	}
+
+	public void setWakeupDispatcher(String wakeupDispatcher) {
+		this.wakeupDispatcher = wakeupDispatcher;
+	}
+
+	public long getWakeupInterval() {
+		return wakeupInterval;
+	}
+
+	public void setWakeupInterval(long wakeupInterval) {
+		this.wakeupInterval = wakeupInterval;
+	}
+
+	public Date getNextWakeupTime() { // may return null: the constructor does not set this field
+		return nextWakeupTime;
+	}
+
+	public void setNextWakeupTime(Date nextWakeupTime) {
+		this.nextWakeupTime = nextWakeupTime;
+	}
+
+	public Date getExpiration() { // may return null: the constructor does not set this field
+		return expiration;
+	}
+
+	public void setExpiration(Date expiration) {
+		this.expiration = expiration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java
new file mode 100644
index 0000000..b78a7e0
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJob.JournalType;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDao;
+
+/**
+ * Persistent value object mapped to the {@code async_job_journal} table.
+ *
+ * <p>Each instance carries a job id, a {@link JournalType}, a text field and a
+ * second string field whose name suggests a JSON payload (nothing in this
+ * class parses or validates it). The accessors are plain field reads and
+ * writes; the column lengths declared in the mapping are not enforced here.
+ */
+@Entity
+@Table(name = "async_job_journal")
+public class AsyncJobJournalVO {
+
+    /** Primary key; generated with the IDENTITY strategy, so it starts out {@code null}. */
+    @Id
+    @GeneratedValue(strategy = GenerationType.IDENTITY)
+    @Column(name = "id")
+    private Long id;
+
+    /** Value of the {@code job_id} column. */
+    @Column(name = "job_id")
+    private long jobId;
+
+    /** Journal type, stored as the enum constant's name; mapped as non-null and non-updatable. */
+    @Column(name = "journal_type", updatable = false, nullable = false, length = 32)
+    @Enumerated(EnumType.STRING)
+    private JournalType journalType;
+
+    /** Value of the {@code journal_text} column (declared length 1024). */
+    @Column(name = "journal_text", length = 1024)
+    private String journalText;
+
+    /** Value of the {@code journal_obj} column (declared length 1024); presumably JSON, not checked here. */
+    @Column(name = "journal_obj", length = 1024)
+    private String journalObjJsonString;
+
+    /** Creation timestamp; assigned in the constructor. */
+    @Column(name = GenericDao.CREATED_COLUMN)
+    protected Date created;
+
+    /**
+     * Creates an entry whose {@code created} timestamp is taken from
+     * {@code DateUtil.currentGMTTime()}; every other field keeps its default.
+     */
+    public AsyncJobJournalVO() {
+        this.created = DateUtil.currentGMTTime();
+    }
+
+    /**
+     * @return the entry id, or {@code null} while none has been assigned
+     */
+    public Long getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id value to store as the entry id
+     */
+    public void setId(final Long id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the stored job id
+     */
+    public long getJobId() {
+        return this.jobId;
+    }
+
+    /**
+     * @param jobId value to store as the job id
+     */
+    public void setJobId(final long jobId) {
+        this.jobId = jobId;
+    }
+
+    /**
+     * @return the stored journal type
+     */
+    public JournalType getJournalType() {
+        return this.journalType;
+    }
+
+    /**
+     * @param journalType value to store as the journal type
+     */
+    public void setJournalType(final JournalType journalType) {
+        this.journalType = journalType;
+    }
+
+    /**
+     * @return the stored journal text
+     */
+    public String getJournalText() {
+        return this.journalText;
+    }
+
+    /**
+     * @param journalText value to store as the journal text; not length-checked here
+     */
+    public void setJournalText(final String journalText) {
+        this.journalText = journalText;
+    }
+
+    /**
+     * @return the stored payload string, exactly as it was set
+     */
+    public String getJournalObjJsonString() {
+        return this.journalObjJsonString;
+    }
+
+    /**
+     * @param journalObjJsonString value to store as the payload string; not parsed or length-checked here
+     */
+    public void setJournalObjJsonString(final String journalObjJsonString) {
+        this.journalObjJsonString = journalObjJsonString;
+    }
+
+    /**
+     * @return the stored creation {@code Date} reference itself (no copy is made)
+     */
+    public Date getCreated() {
+        return this.created;
+    }
+
+    /**
+     * @param created value to store as the creation timestamp (the reference is kept, not copied)
+     */
+    public void setCreated(final Date created) {
+        this.created = created;
+    }
+}