You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/05 00:29:54 UTC
[1/3] further refactored jobs
Updated Branches:
refs/heads/vmsync 51f533e97 -> bd0c239f6
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
new file mode 100644
index 0000000..95f01e5
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+import java.util.TimeZone;
+
+import javax.management.StandardMBean;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJobMBean;
+
+import com.cloud.utils.DateUtil;
+
+public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
+ private AsyncJob _job;
+
+ public AsyncJobMBeanImpl(AsyncJob job) {
+ super(AsyncJobMBean.class, false);
+
+ _job = job;
+ }
+
+ public long getAccountId() {
+ return _job.getAccountId();
+ }
+
+ public long getUserId() {
+ return _job.getUserId();
+ }
+
+ public String getCmd() {
+ return _job.getCmd();
+ }
+
+ public String getCmdInfo() {
+ return _job.getCmdInfo();
+ }
+
+ public String getStatus() {
+ int jobStatus = _job.getStatus();
+ switch(jobStatus) {
+ case AsyncJobConstants.STATUS_SUCCEEDED :
+ return "Completed";
+
+ case AsyncJobConstants.STATUS_IN_PROGRESS:
+ return "In preogress";
+
+ case AsyncJobConstants.STATUS_FAILED:
+ return "failed";
+ }
+
+ return "Unknow";
+ }
+
+ public int getProcessStatus() {
+ return _job.getProcessStatus();
+ }
+
+ public int getResultCode() {
+ return _job.getResultCode();
+ }
+
+ public String getResult() {
+ return _job.getResult();
+ }
+
+ public String getInstanceType() {
+ if(_job.getInstanceType() != null)
+ return _job.getInstanceType().toString();
+ return "N/A";
+ }
+
+ public String getInstanceId() {
+ if(_job.getInstanceId() != null)
+ return String.valueOf(_job.getInstanceId());
+ return "N/A";
+ }
+
+ public String getInitMsid() {
+ if(_job.getInitMsid() != null) {
+ return String.valueOf(_job.getInitMsid());
+ }
+ return "N/A";
+ }
+
+ public String getCreateTime() {
+ Date time = _job.getCreated();
+ if(time != null)
+ return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+ return "N/A";
+ }
+
+ public String getLastUpdateTime() {
+ Date time = _job.getLastUpdated();
+ if(time != null)
+ return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+ return "N/A";
+ }
+
+ public String getLastPollTime() {
+ Date time = _job.getLastPolled();
+
+ if(time != null)
+ return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+ return "N/A";
+ }
+
+ public String getSyncQueueId() {
+ SyncQueueItem item = _job.getSyncSource();
+ if(item != null && item.getQueueId() != null) {
+ return String.valueOf(item.getQueueId());
+ }
+ return "N/A";
+ }
+
+ public String getSyncQueueContentType() {
+ SyncQueueItem item = _job.getSyncSource();
+ if(item != null) {
+ return item.getContentType();
+ }
+ return "N/A";
+ }
+
+ public String getSyncQueueContentId() {
+ SyncQueueItem item = _job.getSyncSource();
+ if(item != null && item.getContentId() != null) {
+ return String.valueOf(item.getContentId());
+ }
+ return "N/A";
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
new file mode 100644
index 0000000..7a11195
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJob.Topics;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
+
+import com.cloud.utils.component.ManagerBase;
+
+public class AsyncJobMonitor extends ManagerBase {
+ public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
+
+ @Inject private MessageBus _messageBus;
+
+ private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
+ private final Timer _timer = new Timer();
+
+ private volatile int _activePoolThreads = 0;
+ private volatile int _activeInplaceThreads = 0;
+
+ // configuration
+ private long _inactivityCheckIntervalMs = 60000;
+ private long _inactivityWarningThresholdMs = 90000;
+
+ public AsyncJobMonitor() {
+ }
+
+ public long getInactivityCheckIntervalMs() {
+ return _inactivityCheckIntervalMs;
+ }
+
+ public void setInactivityCheckIntervalMs(long intervalMs) {
+ _inactivityCheckIntervalMs = intervalMs;
+ }
+
+ public long getInactivityWarningThresholdMs() {
+ return _inactivityWarningThresholdMs;
+ }
+
+ public void setInactivityWarningThresholdMs(long thresholdMs) {
+ _inactivityWarningThresholdMs = thresholdMs;
+ }
+
+ @MessageHandler(topic = AsyncJob.Topics.JOB_HEARTBEAT)
+ public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
+ if(args != null && args instanceof Long) {
+ synchronized(this) {
+ ActiveTaskRecord record = _activeTasks.get(args);
+ if(record != null) {
+ record.updateJobHeartbeatTick();
+ }
+ }
+ }
+ }
+
+ private void heartbeat() {
+ synchronized(this) {
+ for(Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
+ if(entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
+ s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
+ + entry.getValue().millisSinceLastJobHeartbeat()/1000 + " seconds");
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean configure(String name, Map<String, Object> params)
+ throws ConfigurationException {
+
+ _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
+ _timer.scheduleAtFixedRate(new TimerTask() {
+
+ @Override
+ public void run() {
+ heartbeat();
+ }
+
+ }, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
+ return true;
+ }
+
+ public void registerActiveTask(long jobId) {
+ synchronized(this) {
+ assert(_activeTasks.get(jobId) == null);
+
+ long threadId = Thread.currentThread().getId();
+ boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
+ ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
+ _activeTasks.put(jobId, record);
+ if(fromPoolThread)
+ _activePoolThreads++;
+ else
+ _activeInplaceThreads++;
+ }
+ }
+
+ public void unregisterActiveTask(long jobId) {
+ synchronized(this) {
+ ActiveTaskRecord record = _activeTasks.get(jobId);
+ assert(record != null);
+ if(record != null) {
+ if(record.isPoolThread())
+ _activePoolThreads--;
+ else
+ _activeInplaceThreads--;
+
+ _activeTasks.remove(jobId);
+ }
+ }
+ }
+
+ public int getActivePoolThreads() {
+ return _activePoolThreads;
+ }
+
+ public int getActiveInplaceThread() {
+ return _activeInplaceThreads;
+ }
+
+ private static class ActiveTaskRecord {
+ long _jobId;
+ long _threadId;
+ boolean _fromPoolThread;
+ long _jobLastHeartbeatTick;
+
+ public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
+ _threadId = threadId;
+ _jobId = jobId;
+ _fromPoolThread = fromPoolThread;
+ _jobLastHeartbeatTick = System.currentTimeMillis();
+ }
+
+ public long getThreadId() {
+ return _threadId;
+ }
+
+ public long getJobId() {
+ return _jobId;
+ }
+
+ public boolean isPoolThread() {
+ return _fromPoolThread;
+ }
+
+ public void updateJobHeartbeatTick() {
+ _jobLastHeartbeatTick = System.currentTimeMillis();
+ }
+
+ public long millisSinceLastJobHeartbeat() {
+ return System.currentTimeMillis() - _jobLastHeartbeatTick;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
new file mode 100644
index 0000000..f2ea4ac
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+import java.util.UUID;
+
+import javax.persistence.Column;
+import javax.persistence.DiscriminatorColumn;
+import javax.persistence.DiscriminatorType;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+import javax.persistence.Transient;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.jobs.Job;
+
+import com.cloud.utils.UuidUtils;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job")
+@Inheritance(strategy=InheritanceType.JOINED)
+@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
+public class AsyncJobVO implements AsyncJob, Job {
+
+ @Id
+ @GeneratedValue(strategy=GenerationType.IDENTITY)
+ @Column(name="id")
+ private Long id = null;
+
+ @Column(name="job_type", length=32)
+ protected String type;
+
+ @Column(name="job_dispatcher", length=64)
+ protected String dispatcher;
+
+ @Column(name="job_pending_signals")
+ protected int pendingSignals;
+
+ @Column(name="user_id")
+ private long userId;
+
+ @Column(name="account_id")
+ private long accountId;
+
+ @Column(name="job_cmd")
+ private String cmd;
+
+ @Column(name="job_cmd_ver")
+ private int cmdVersion;
+
+ @Column(name="job_cmd_info", length=65535)
+ private String cmdInfo;
+
+ @Column(name="job_status")
+ private int status;
+
+ @Column(name="job_process_status")
+ private int processStatus;
+
+ @Column(name="job_result_code")
+ private int resultCode;
+
+ @Column(name="job_result", length=65535)
+ private String result;
+
+ @Column(name="instance_type", length=64)
+ private String instanceType;
+
+ @Column(name="instance_id", length=64)
+ private Long instanceId;
+
+ @Column(name="job_init_msid")
+ private Long initMsid;
+
+ @Column(name="job_complete_msid")
+ private Long completeMsid;
+
+ @Column(name="job_executing_msid")
+ private Long executingMsid;
+
+ @Column(name=GenericDao.CREATED_COLUMN)
+ private Date created;
+
+ @Column(name="last_updated")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date lastUpdated;
+
+ @Column(name="last_polled")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date lastPolled;
+
+ @Column(name=GenericDao.REMOVED_COLUMN)
+ private Date removed;
+
+ @Column(name="uuid")
+ private String uuid;
+
+ @Transient
+ private SyncQueueItem syncSource = null;
+
+ public AsyncJobVO() {
+ uuid = UUID.randomUUID().toString();
+ }
+
+ public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, String instanceType) {
+ this.userId = userId;
+ this.accountId = accountId;
+ this.cmd = cmd;
+ this.cmdInfo = cmdInfo;
+ uuid = UUID.randomUUID().toString();
+ this.instanceId = instanceId;
+ this.instanceType = instanceType;
+ }
+
+ @Override
+ public long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getShortUuid() {
+ return UuidUtils.first(uuid);
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public String getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(String dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public int getPendingSignals() {
+ return pendingSignals;
+ }
+
+ public void setPendingSignals(int signals) {
+ pendingSignals = signals;
+ }
+
+ @Override
+ public long getUserId() {
+ return userId;
+ }
+
+ public void setUserId(long userId) {
+ this.userId = userId;
+ }
+
+ @Override
+ public long getAccountId() {
+ return accountId;
+ }
+
+ public void setAccountId(long accountId) {
+ this.accountId = accountId;
+ }
+
+ @Override
+ public String getCmd() {
+ return cmd;
+ }
+
+ public void setCmd(String cmd) {
+ this.cmd = cmd;
+ }
+
+ @Override
+ public int getCmdVersion() {
+ return cmdVersion;
+ }
+
+ public void setCmdVersion(int version) {
+ cmdVersion = version;
+ }
+
+ @Override
+ public String getCmdInfo() {
+ return cmdInfo;
+ }
+
+ public void setCmdInfo(String cmdInfo) {
+ this.cmdInfo = cmdInfo;
+ }
+
+ @Override
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ @Override
+ public int getProcessStatus() {
+ return processStatus;
+ }
+
+ public void setProcessStatus(int status) {
+ processStatus = status;
+ }
+
+ @Override
+ public int getResultCode() {
+ return resultCode;
+ }
+
+ public void setResultCode(int resultCode) {
+ this.resultCode = resultCode;
+ }
+
+ @Override
+ public String getResult() {
+ return result;
+ }
+
+ public void setResult(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public Long getInitMsid() {
+ return initMsid;
+ }
+
+ @Override
+ public void setInitMsid(Long initMsid) {
+ this.initMsid = initMsid;
+ }
+
+ @Override
+ public Long getExecutingMsid() {
+ return executingMsid;
+ }
+
+ public void setExecutingMsid(Long executingMsid) {
+ this.executingMsid = executingMsid;
+ }
+
+ @Override
+ public Long getCompleteMsid() {
+ return completeMsid;
+ }
+
+ @Override
+ public void setCompleteMsid(Long completeMsid) {
+ this.completeMsid = completeMsid;
+ }
+
+ @Override
+ public Date getCreated() {
+ return created;
+ }
+
+ public void setCreated(Date created) {
+ this.created = created;
+ }
+
+ @Override
+ public Date getLastUpdated() {
+ return lastUpdated;
+ }
+
+ public void setLastUpdated(Date lastUpdated) {
+ this.lastUpdated = lastUpdated;
+ }
+
+ @Override
+ public Date getLastPolled() {
+ return lastPolled;
+ }
+
+ public void setLastPolled(Date lastPolled) {
+ this.lastPolled = lastPolled;
+ }
+
+ @Override
+ public Date getRemoved() {
+ return removed;
+ }
+
+ public void setRemoved(Date removed) {
+ this.removed = removed;
+ }
+
+ @Override
+ public String getInstanceType() {
+ return instanceType;
+ }
+
+ public void setInstanceType(String instanceType) {
+ this.instanceType = instanceType;
+ }
+
+ @Override
+ public Long getInstanceId() {
+ return instanceId;
+ }
+
+ public void setInstanceId(Long instanceId) {
+ this.instanceId = instanceId;
+ }
+
+ @Override
+ public SyncQueueItem getSyncSource() {
+ return syncSource;
+ }
+
+ @Override
+ public void setSyncSource(SyncQueueItem syncSource) {
+ this.syncSource = syncSource;
+ }
+
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("AsyncJobVO {id:").append(getId());
+ sb.append(", userId: ").append(getUserId());
+ sb.append(", accountId: ").append(getAccountId());
+ sb.append(", instanceType: ").append(getInstanceType());
+ sb.append(", instanceId: ").append(getInstanceId());
+ sb.append(", cmd: ").append(getCmd());
+ sb.append(", cmdInfo: ").append(getCmdInfo());
+ sb.append(", cmdVersion: ").append(getCmdVersion());
+ sb.append(", status: ").append(getStatus());
+ sb.append(", processStatus: ").append(getProcessStatus());
+ sb.append(", resultCode: ").append(getResultCode());
+ sb.append(", result: ").append(getResult());
+ sb.append(", initMsid: ").append(getInitMsid());
+ sb.append(", completeMsid: ").append(getCompleteMsid());
+ sb.append(", lastUpdated: ").append(getLastUpdated());
+ sb.append(", lastPolled: ").append(getLastPolled());
+ sb.append(", created: ").append(getCreated());
+ sb.append("}");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
new file mode 100644
index 0000000..17dd1cc
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import com.cloud.utils.exception.CloudRuntimeException;
+
+/**
+ * Note: toPairList and appendPairList only support simple POJO objects currently
+ */
+public class JobSerializerHelper {
+ private static final Logger s_logger = Logger.getLogger(JobSerializerHelper.class);
+ public static String token = "/";
+
+ private static Gson s_gson;
+ static {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.setVersion(1.5);
+ s_logger.debug("Job GSON Builder initialized.");
+ s_gson = gsonBuilder.create();
+ }
+
+ public static String toSerializedString(Object result) {
+ if(result != null) {
+ Class<?> clz = result.getClass();
+ return clz.getName() + token + s_gson.toJson(result);
+ }
+ return null;
+ }
+
+ public static Object fromSerializedString(String result) {
+ try {
+ if(result != null && !result.isEmpty()) {
+
+ String[] serializedParts = result.split(token);
+
+ if (serializedParts.length < 2) {
+ return null;
+ }
+ String clzName = serializedParts[0];
+ String nameField = null;
+ String content = null;
+ if (serializedParts.length == 2) {
+ content = serializedParts[1];
+ } else {
+ nameField = serializedParts[1];
+ int index = result.indexOf(token + nameField + token);
+ content = result.substring(index + nameField.length() + 2);
+ }
+
+ Class<?> clz;
+ try {
+ clz = Class.forName(clzName);
+ } catch (ClassNotFoundException e) {
+ return null;
+ }
+
+ Object obj = s_gson.fromJson(content, clz);
+ return obj;
+ }
+ return null;
+ } catch(RuntimeException e) {
+ throw new CloudRuntimeException("Unable to deserialize: " + result, e);
+ }
+ }
+
+ public static String toObjectSerializedString(Serializable object) {
+ assert(object != null);
+
+ ByteArrayOutputStream bs = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream os = new ObjectOutputStream(bs);
+ os.writeObject(object);
+ os.close();
+ bs.close();
+
+ return Base64.encodeBase64URLSafeString(bs.toByteArray());
+ } catch(IOException e) {
+ throw new CloudRuntimeException("Unable to serialize: " + object, e);
+ }
+ }
+
+ public static Object fromObjectSerializedString(String base64EncodedString) {
+ if(base64EncodedString == null)
+ return null;
+
+ byte[] content = Base64.decodeBase64(base64EncodedString);
+ ByteArrayInputStream bs = new ByteArrayInputStream(content);
+ try {
+ ObjectInputStream is = new ObjectInputStream(bs);
+ Object obj = is.readObject();
+ is.close();
+ bs.close();
+ return obj;
+ } catch(IOException e) {
+ throw new CloudRuntimeException("Unable to serialize: " + base64EncodedString, e);
+ } catch (ClassNotFoundException e) {
+ throw new CloudRuntimeException("Unable to serialize: " + base64EncodedString, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
new file mode 100644
index 0000000..04519e7
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+public interface SyncQueueItem {
+ public final String AsyncJobContentType = "AsyncJob";
+
+ /**
+ * @return queue item id
+ */
+ long getId();
+
+ /**
+ * @return queue id
+ */
+ Long getQueueId();
+
+ /**
+ * @return subject object type pointed by the queue item
+ */
+ String getContentType();
+
+ /**
+ * @return subject object id pointed by the queue item
+ */
+ Long getContentId();
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
new file mode 100644
index 0000000..f8bba02
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue_item")
+public class SyncQueueItemVO implements SyncQueueItem, InternalIdentity {
+
+ @Id
+ @GeneratedValue(strategy=GenerationType.IDENTITY)
+ @Column(name="id")
+ private Long id = null;
+
+ @Column(name="queue_id")
+ private Long queueId;
+
+ @Column(name="content_type")
+ private String contentType;
+
+ @Column(name="content_id")
+ private Long contentId;
+
+ @Column(name="queue_proc_msid")
+ private Long lastProcessMsid;
+
+ @Column(name="queue_proc_number")
+ private Long lastProcessNumber;
+
+ @Column(name="queue_proc_time")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date lastProcessTime;
+
+ @Column(name="created")
+ private Date created;
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ @Override
+ public Long getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(Long queueId) {
+ this.queueId = queueId;
+ }
+
+ @Override
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ @Override
+ public Long getContentId() {
+ return contentId;
+ }
+
+ public void setContentId(Long contentId) {
+ this.contentId = contentId;
+ }
+
+ public Long getLastProcessMsid() {
+ return lastProcessMsid;
+ }
+
+ public void setLastProcessMsid(Long lastProcessMsid) {
+ this.lastProcessMsid = lastProcessMsid;
+ }
+
+ public Long getLastProcessNumber() {
+ return lastProcessNumber;
+ }
+
+ public void setLastProcessNumber(Long lastProcessNumber) {
+ this.lastProcessNumber = lastProcessNumber;
+ }
+
+ public Date getCreated() {
+ return created;
+ }
+
+ public void setCreated(Date created) {
+ this.created = created;
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId());
+ sb.append(", contentType: ").append(getContentType());
+ sb.append(", contentId: ").append(getContentId());
+ sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
+ sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
+ sb.append(", lastProcessTime: ").append(getLastProcessTime());
+ sb.append(", created: ").append(getCreated());
+ sb.append("}");
+ return sb.toString();
+ }
+
+ public Date getLastProcessTime() {
+ return lastProcessTime;
+ }
+
+ public void setLastProcessTime(Date lastProcessTime) {
+ this.lastProcessTime = lastProcessTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
new file mode 100644
index 0000000..202a704
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.List;
+
+import com.cloud.utils.component.Manager;
+
+public interface SyncQueueManager extends Manager {
+ public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit);
+ public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
+ public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
+ public void purgeItem(long queueItemId);
+ public void returnItem(long queueItemId);
+
+ public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
+ public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+
+ void purgeAsyncJobQueueItemId(long asyncJobId);
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
new file mode 100644
index 0000000..b9b5d6b
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
+ public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
+
+ @Inject private SyncQueueDao _syncQueueDao;
+ @Inject private SyncQueueItemDao _syncQueueItemDao;
+
+ @Override
+ @DB
+ public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
+ Transaction txn = Transaction.currentTxn();
+ try {
+ txn.start();
+
+ _syncQueueDao.ensureQueue(syncObjType, syncObjId);
+ SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
+ if(queueVO == null)
+ throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
+
+ queueVO.setQueueSizeLimit(queueSizeLimit);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+
+ Date dt = DateUtil.currentGMTTime();
+ SyncQueueItemVO item = new SyncQueueItemVO();
+ item.setQueueId(queueVO.getId());
+ item.setContentType(itemType);
+ item.setContentId(itemId);
+ item.setCreated(dt);
+
+ _syncQueueItemDao.persist(item);
+ txn.commit();
+
+ return queueVO;
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txn.rollback();
+ }
+ return null;
+ }
+
+ @Override
+ @DB
+ public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
+ if(queueVO == null) {
+ s_logger.error("Sync queue(id: " + queueId + ") does not exist");
+ txt.commit();
+ return null;
+ }
+
+ if(queueReadyToProcess(queueVO)) {
+ SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
+ if(itemVO != null) {
+ Long processNumber = queueVO.getLastProcessNumber();
+ if(processNumber == null)
+ processNumber = new Long(1);
+ else
+ processNumber = processNumber + 1;
+ Date dt = DateUtil.currentGMTTime();
+ queueVO.setLastProcessNumber(processNumber);
+ queueVO.setLastUpdated(dt);
+ queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+
+ itemVO.setLastProcessMsid(msid);
+ itemVO.setLastProcessNumber(processNumber);
+ itemVO.setLastProcessTime(dt);
+ _syncQueueItemDao.update(itemVO.getId(), itemVO);
+
+ txt.commit();
+ return itemVO;
+ } else {
+ if(s_logger.isDebugEnabled())
+ s_logger.debug("Sync queue (" + queueId + ") is currently empty");
+ }
+ } else {
+ if(s_logger.isDebugEnabled())
+ s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
+ }
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txt.rollback();
+ }
+
+ return null;
+ }
+
+ @Override
+ @DB
+ public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
+
+ List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
+ if(l != null && l.size() > 0) {
+ for(SyncQueueItemVO item : l) {
+ SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
+ SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
+ if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
+ Long processNumber = queueVO.getLastProcessNumber();
+ if(processNumber == null)
+ processNumber = new Long(1);
+ else
+ processNumber = processNumber + 1;
+
+ Date dt = DateUtil.currentGMTTime();
+ queueVO.setLastProcessNumber(processNumber);
+ queueVO.setLastUpdated(dt);
+ queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+
+ itemVO.setLastProcessMsid(msid);
+ itemVO.setLastProcessNumber(processNumber);
+ itemVO.setLastProcessTime(dt);
+ _syncQueueItemDao.update(item.getId(), itemVO);
+
+ resultList.add(item);
+ }
+ }
+ }
+ txt.commit();
+ return resultList;
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txt.rollback();
+ }
+ return null;
+ }
+
+ @Override
+ @DB
+ public void purgeItem(long queueItemId) {
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+ if(itemVO != null) {
+ SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+ _syncQueueItemDao.expunge(itemVO.getId());
+
+ // if item is active, reset queue information
+ if (itemVO.getLastProcessMsid() != null) {
+ queueVO.setLastUpdated(DateUtil.currentGMTTime());
+ // decrement the count
+ assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+ queueVO.setQueueSize(queueVO.getQueueSize() - 1);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+ }
+ }
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txt.rollback();
+ }
+ }
+
+ @Override
+ @DB
+ public void returnItem(long queueItemId) {
+ Transaction txt = Transaction.currentTxn();
+ try {
+ txt.start();
+
+ SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+ if(itemVO != null) {
+ SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+ itemVO.setLastProcessMsid(null);
+ itemVO.setLastProcessNumber(null);
+ itemVO.setLastProcessTime(null);
+ _syncQueueItemDao.update(queueItemId, itemVO);
+
+ queueVO.setLastUpdated(DateUtil.currentGMTTime());
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+ }
+ txt.commit();
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception: ", e);
+ txt.rollback();
+ }
+ }
+
+ @Override
+ public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+ return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
+ }
+
+ @Override
+ public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
+ return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
+ }
+
+ private boolean queueReadyToProcess(SyncQueueVO queueVO) {
+ return true;
+
+ //
+ // TODO
+ //
+ // Need to disable concurrency disable at queue level due to the need to support
+ // job wake-up dispatching task
+ //
+ // Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
+ //
+
+ // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+ }
+
+ @Override
+ public void purgeAsyncJobQueueItemId(long asyncJobId) {
+ Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+ if (itemId != null) {
+ purgeItem(itemId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
new file mode 100644
index 0000000..4fd4740
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+import java.util.Date;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue")
+public class SyncQueueVO implements InternalIdentity {
+
+ @Id
+ @GeneratedValue(strategy=GenerationType.IDENTITY)
+ @Column(name="id")
+ private Long id;
+
+ @Column(name="sync_objtype")
+
+ private String syncObjType;
+
+ @Column(name="sync_objid")
+ private Long syncObjId;
+
+ @Column(name="queue_proc_number")
+ private Long lastProcessNumber;
+
+ @Column(name="created")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date created;
+
+ @Column(name="last_updated")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date lastUpdated;
+
+ @Column(name="queue_size")
+ private long queueSize = 0;
+
+ @Column(name="queue_size_limit")
+ private long queueSizeLimit = 0;
+
+ public long getId() {
+ return id;
+ }
+
+ public String getSyncObjType() {
+ return syncObjType;
+ }
+
+ public void setSyncObjType(String syncObjType) {
+ this.syncObjType = syncObjType;
+ }
+
+ public Long getSyncObjId() {
+ return syncObjId;
+ }
+
+ public void setSyncObjId(Long syncObjId) {
+ this.syncObjId = syncObjId;
+ }
+
+ public Long getLastProcessNumber() {
+ return lastProcessNumber;
+ }
+
+ public void setLastProcessNumber(Long number) {
+ lastProcessNumber = number;
+ }
+
+ public Date getCreated() {
+ return created;
+ }
+
+ public void setCreated(Date created) {
+ this.created = created;
+ }
+
+ public Date getLastUpdated() {
+ return lastUpdated;
+ }
+
+ public void setLastUpdated(Date lastUpdated) {
+ this.lastUpdated = lastUpdated;
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("SyncQueueVO {id:").append(getId());
+ sb.append(", syncObjType: ").append(getSyncObjType());
+ sb.append(", syncObjId: ").append(getSyncObjId());
+ sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
+ sb.append(", lastUpdated: ").append(getLastUpdated());
+ sb.append(", created: ").append(getCreated());
+ sb.append(", count: ").append(getQueueSize());
+ sb.append("}");
+ return sb.toString();
+ }
+
+ public long getQueueSize() {
+ return queueSize;
+ }
+
+ public void setQueueSize(long queueSize) {
+ this.queueSize = queueSize;
+ }
+
+ public long getQueueSizeLimit() {
+ return queueSizeLimit;
+ }
+
+ public void setQueueSizeLimit(long queueSizeLimit) {
+ this.queueSizeLimit = queueSizeLimit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/api/ApiGsonHelper.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiGsonHelper.java b/server/src/com/cloud/api/ApiGsonHelper.java
index 6163860..c24808b 100644
--- a/server/src/com/cloud/api/ApiGsonHelper.java
+++ b/server/src/com/cloud/api/ApiGsonHelper.java
@@ -16,12 +16,27 @@
// under the License.
package com.cloud.api;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
import com.google.gson.GsonBuilder;
+
import org.apache.cloudstack.api.ResponseObject;
-import java.util.Map;
+import com.cloud.serializer.Param;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.Pair;
public class ApiGsonHelper {
+ private static final Logger s_logger = Logger.getLogger(ApiGsonHelper.class);
private static final GsonBuilder s_gBuilder;
static {
s_gBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
@@ -33,4 +48,107 @@ public class ApiGsonHelper {
public static GsonBuilder getBuilder() {
return s_gBuilder;
}
+
+ public static List<Pair<String, Object>> toPairList(Object o, String name) {
+ List<Pair<String, Object>> l = new ArrayList<Pair<String, Object>>();
+ return appendPairList(l, o, name);
+ }
+
+ public static List<Pair<String, Object>> appendPairList(List<Pair<String, Object>> l, Object o, String name) {
+ if (o != null) {
+ Class<?> clz = o.getClass();
+
+ if (clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) {
+ l.add(new Pair<String, Object>(name, o.toString()));
+ return l;
+ }
+
+ for (Field f : clz.getDeclaredFields()) {
+ if ((f.getModifiers() & Modifier.STATIC) != 0) {
+ continue;
+ }
+
+ Param param = f.getAnnotation(Param.class);
+ if (param == null) {
+ continue;
+ }
+
+ String propName = f.getName();
+ if (!param.propName().isEmpty()) {
+ propName = param.propName();
+ }
+
+ String paramName = param.name();
+ if (paramName.isEmpty()) {
+ paramName = propName;
+ }
+
+ Method method = getGetMethod(o, propName);
+ if (method != null) {
+ try {
+ Object fieldValue = method.invoke(o);
+ if (fieldValue != null) {
+ if (f.getType() == Date.class) {
+ l.add(new Pair<String, Object>(paramName, DateUtil.getOutputString((Date)fieldValue)));
+ } else {
+ l.add(new Pair<String, Object>(paramName, fieldValue.toString()));
+ }
+ }
+ //else
+ // l.add(new Pair<String, Object>(paramName, ""));
+ } catch (IllegalArgumentException e) {
+ s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
+
+ } catch (IllegalAccessException e) {
+ s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
+ } catch (InvocationTargetException e) {
+ s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
+ }
+ }
+ }
+ }
+ return l;
+ }
+
+ private static Method getGetMethod(Object o, String propName) {
+ Method method = null;
+ String methodName = getGetMethodName("get", propName);
+ try {
+ method = o.getClass().getMethod(methodName);
+ } catch (SecurityException e1) {
+ s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
+ } catch (NoSuchMethodException e1) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName
+ + ", will check is-prefixed method to see if it is boolean property");
+ }
+ }
+
+ if (method != null) {
+ return method;
+ }
+
+ methodName = getGetMethodName("is", propName);
+ try {
+ method = o.getClass().getMethod(methodName);
+ } catch (SecurityException e1) {
+ s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
+ } catch (NoSuchMethodException e1) {
+ s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName);
+ }
+ return method;
+ }
+
+ private static String getGetMethodName(String prefix, String fieldName) {
+ StringBuffer sb = new StringBuffer(prefix);
+
+ if (fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) {
+ return fieldName;
+ } else {
+ sb.append(fieldName.substring(0, 1).toUpperCase());
+ sb.append(fieldName.substring(1));
+ }
+
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index 175c8b8..4db9bb8 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -116,7 +116,7 @@ import org.apache.cloudstack.api.response.ListResponse;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.region.RegionManager;
import com.cloud.api.response.ApiResponseSerializer;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/async/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java
index e6efd03..9ca38a9 100644
--- a/server/src/com/cloud/async/AsyncJobExecutionContext.java
+++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java
@@ -20,15 +20,15 @@ import javax.inject.Inject;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueItem;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.component.ComponentContext;
public class AsyncJobExecutionContext {
@@ -109,7 +109,7 @@ public class AsyncJobExecutionContext {
AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) {
- Object exception = SerializerHelper.fromObjectSerializedString(record.getJoinResult());
+ Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
if(exception != null && exception instanceof Exception) {
if(exception instanceof InsufficientCapacityException)
throw (InsufficientCapacityException)exception;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 42cbae8..f147bb0 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -43,19 +43,19 @@ import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
-import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
-import org.apache.cloudstack.framework.jobs.AsyncJobMBeanImpl;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobMonitor;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
-import org.apache.cloudstack.framework.jobs.SyncQueueItem;
-import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
-import org.apache.cloudstack.framework.jobs.SyncQueueManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueVO;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMBeanImpl;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDetector;
import org.apache.cloudstack.framework.messagebus.PublishScope;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
index edc4216..32d1c19 100644
--- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
+++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
@@ -35,8 +35,8 @@ import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotCmd;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import com.cloud.api.ApiDispatcher;
import com.cloud.api.ApiGsonHelper;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/template/TemplateManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/template/TemplateManagerImpl.java b/server/src/com/cloud/template/TemplateManagerImpl.java
index c811b5e..3317fa1 100755
--- a/server/src/com/cloud/template/TemplateManagerImpl.java
+++ b/server/src/com/cloud/template/TemplateManagerImpl.java
@@ -68,7 +68,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
import org.apache.cloudstack.framework.async.AsyncCallFuture;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
index a5eceec..8109128 100644
--- a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
+++ b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
@@ -29,8 +29,8 @@ import org.apache.log4j.Logger;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/test/com/cloud/vm/VmWorkTest.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTest.java b/server/test/com/cloud/vm/VmWorkTest.java
index 3cb78da..d236cc3 100644
--- a/server/test/com/cloud/vm/VmWorkTest.java
+++ b/server/test/com/cloud/vm/VmWorkTest.java
@@ -41,7 +41,6 @@ import com.cloud.deploy.DeploymentPlan;
import com.cloud.deploy.DeploymentPlanner.ExcludeList;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InsufficientStorageCapacityException;
-import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.LogUtils;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.ComponentContext;
@@ -49,7 +48,8 @@ import com.cloud.utils.db.Transaction;
import com.google.gson.Gson;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
import org.apache.cloudstack.vm.jobs.VmWorkJobVO.Step;
@@ -167,10 +167,10 @@ public class VmWorkTest extends TestCase {
public void testExceptionSerialization() {
InsufficientCapacityException exception = new InsufficientStorageCapacityException("foo", VmWorkJobVO.class, 1L);
- String encodedString = SerializerHelper.toObjectSerializedString(exception);
+ String encodedString = JobSerializerHelper.toObjectSerializedString(exception);
System.out.println(encodedString);
- exception = (InsufficientCapacityException)SerializerHelper.fromObjectSerializedString(encodedString);
+ exception = (InsufficientCapacityException)JobSerializerHelper.fromObjectSerializedString(encodedString);
Assert.assertTrue(exception.getScope() == VmWorkJobVO.class);
Assert.assertTrue(exception.getMessage().equals("foo"));
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/test/com/cloud/vm/VmWorkTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
index b436b16..cd0dc2c 100644
--- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java
+++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
@@ -20,9 +20,6 @@ import org.mockito.Mockito;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.apache.cloudstack.framework.jobs.AsyncJobMonitor;
-import org.apache.cloudstack.framework.jobs.SyncQueueManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueManagerImpl;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDaoImpl;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
@@ -33,6 +30,9 @@ import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
import org.apache.cloudstack.framework.jobs.dao.SyncQueueDaoImpl;
import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDaoImpl;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManagerImpl;
import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
import org.apache.cloudstack.vm.jobs.VmWorkJobDaoImpl;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/utils/pom.xml
----------------------------------------------------------------------
diff --git a/utils/pom.xml b/utils/pom.xml
index 0690c35..878e91d 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -64,6 +64,11 @@
<version>${cs.codec.version}</version>
</dependency>
<dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${cs.gson.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${cs.collections.version}</version>
[3/3] git commit: updated refs/heads/vmsync to bd0c239
Posted by ah...@apache.org.
Added missing call context
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/bd0c239f
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/bd0c239f
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/bd0c239f
Branch: refs/heads/vmsync
Commit: bd0c239f6826c9fa0892e01d2499619a960641be
Parents: dd11254
Author: Alex Huang <al...@gmail.com>
Authored: Tue Jun 4 15:32:39 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Tue Jun 4 15:32:39 2013 -0700
----------------------------------------------------------------------
server/src/com/cloud/vm/SystemVmLoadScanner.java | 9 +++++++++
1 files changed, 9 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/bd0c239f/server/src/com/cloud/vm/SystemVmLoadScanner.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/SystemVmLoadScanner.java b/server/src/com/cloud/vm/SystemVmLoadScanner.java
index 4251b40..a220757 100644
--- a/server/src/com/cloud/vm/SystemVmLoadScanner.java
+++ b/server/src/com/cloud/vm/SystemVmLoadScanner.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+import org.apache.cloudstack.context.CallContext;
+
+import com.cloud.exception.CloudAuthenticationException;
import com.cloud.utils.Pair;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.GlobalLock;
@@ -67,6 +70,12 @@ public class SystemVmLoadScanner<T> {
@Override
public void run() {
+ try {
+ CallContext.registerOnceOnly();
+ } catch (CloudAuthenticationException e) {
+ s_logger.error("Unable to start the capacity scan task", e);
+ System.exit(1);
+ }
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
reallyRun();
[2/3] git commit: updated refs/heads/vmsync to bd0c239
Posted by ah...@apache.org.
further refactored jobs
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/dd112540
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/dd112540
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/dd112540
Branch: refs/heads/vmsync
Commit: dd11254087fbe2a46bb7a09467ab124f48204849
Parents: 51f533e
Author: Alex Huang <al...@gmail.com>
Authored: Tue Jun 4 13:54:33 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Tue Jun 4 13:54:33 2013 -0700
----------------------------------------------------------------------
client/tomcatconf/applicationContext.xml.in | 4 +-
.../src/com/cloud/serializer/SerializerHelper.java | 233 ---------
.../org/apache/cloudstack/vm/jobs/VmWorkJobVO.java | 2 +-
.../apache/cloudstack/framework/jobs/AsyncJob.java | 1 +
.../framework/jobs/AsyncJobJoinMapVO.java | 210 --------
.../framework/jobs/AsyncJobJournalVO.java | 108 ----
.../framework/jobs/AsyncJobMBeanImpl.java | 143 ------
.../cloudstack/framework/jobs/AsyncJobManager.java | 1 +
.../cloudstack/framework/jobs/AsyncJobMonitor.java | 179 -------
.../cloudstack/framework/jobs/AsyncJobVO.java | 384 --------------
.../cloudstack/framework/jobs/SyncQueueItem.java | 41 --
.../cloudstack/framework/jobs/SyncQueueItemVO.java | 143 ------
.../framework/jobs/SyncQueueManager.java | 34 --
.../framework/jobs/SyncQueueManagerImpl.java | 258 ----------
.../cloudstack/framework/jobs/SyncQueueVO.java | 137 -----
.../cloudstack/framework/jobs/dao/AsyncJobDao.java | 2 +-
.../framework/jobs/dao/AsyncJobDaoImpl.java | 2 +-
.../framework/jobs/dao/AsyncJobJoinMapDao.java | 2 +-
.../framework/jobs/dao/AsyncJobJoinMapDaoImpl.java | 2 +-
.../framework/jobs/dao/AsyncJobJournalDao.java | 2 +-
.../framework/jobs/dao/AsyncJobJournalDaoImpl.java | 2 +-
.../framework/jobs/dao/SyncQueueDao.java | 2 +-
.../framework/jobs/dao/SyncQueueDaoImpl.java | 2 +-
.../framework/jobs/dao/SyncQueueItemDao.java | 2 +-
.../framework/jobs/dao/SyncQueueItemDaoImpl.java | 2 +-
.../framework/jobs/impl/AsyncJobJoinMapVO.java | 210 ++++++++
.../framework/jobs/impl/AsyncJobJournalVO.java | 111 +++++
.../framework/jobs/impl/AsyncJobMBeanImpl.java | 147 ++++++
.../framework/jobs/impl/AsyncJobMonitor.java | 182 +++++++
.../cloudstack/framework/jobs/impl/AsyncJobVO.java | 385 +++++++++++++++
.../framework/jobs/impl/JobSerializerHelper.java | 127 +++++
.../framework/jobs/impl/SyncQueueItem.java | 41 ++
.../framework/jobs/impl/SyncQueueItemVO.java | 143 ++++++
.../framework/jobs/impl/SyncQueueManager.java | 34 ++
.../framework/jobs/impl/SyncQueueManagerImpl.java | 258 ++++++++++
.../framework/jobs/impl/SyncQueueVO.java | 137 +++++
server/src/com/cloud/api/ApiGsonHelper.java | 120 +++++-
server/src/com/cloud/api/ApiServer.java | 2 +-
.../com/cloud/async/AsyncJobExecutionContext.java | 8 +-
.../src/com/cloud/async/AsyncJobManagerImpl.java | 18 +-
.../storage/snapshot/SnapshotSchedulerImpl.java | 2 +-
.../com/cloud/template/TemplateManagerImpl.java | 2 +-
.../com/cloud/vm/VmWorkJobWakeupDispatcher.java | 2 +-
server/test/com/cloud/vm/VmWorkTest.java | 8 +-
.../test/com/cloud/vm/VmWorkTestConfiguration.java | 6 +-
utils/pom.xml | 5 +
46 files changed, 1938 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/client/tomcatconf/applicationContext.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in
index 1c98346..80f380b 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -809,10 +809,10 @@
<bean id="asyncJobJournalDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDaoImpl" />
<bean id="asyncJobJoinMapDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDaoImpl" />
<bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl"/>
- <bean id="asyncJobMonitor" class="org.apache.cloudstack.framework.jobs.AsyncJobMonitor"/>
+ <bean id="asyncJobMonitor" class="org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor"/>
<bean id="syncQueueDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueDaoImpl" />
<bean id="syncQueueItemDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDaoImpl" />
- <bean id="syncQueueManagerImpl" class="org.apache.cloudstack.framework.jobs.SyncQueueManagerImpl" />
+ <bean id="syncQueueManagerImpl" class="org.apache.cloudstack.framework.jobs.impl.SyncQueueManagerImpl" />
<bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher">
<property name="name" value="ApiAsyncJobDispatcher" />
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/core/src/com/cloud/serializer/SerializerHelper.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/serializer/SerializerHelper.java b/core/src/com/cloud/serializer/SerializerHelper.java
deleted file mode 100644
index e3449f6..0000000
--- a/core/src/com/cloud/serializer/SerializerHelper.java
+++ /dev/null
@@ -1,233 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.serializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.log4j.Logger;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.Pair;
-import com.google.gson.Gson;
-
-/**
- * Note: toPairList and appendPairList only support simple POJO objects currently
- */
-public class SerializerHelper {
- public static final Logger s_logger = Logger.getLogger(SerializerHelper.class.getName());
- public static String token = "/";
-
- public static String toSerializedString(Object result) {
- if(result != null) {
- Class<?> clz = result.getClass();
- Gson gson = GsonHelper.getGson();
- return clz.getName() + token + gson.toJson(result);
- }
- return null;
- }
-
- public static Object fromSerializedString(String result) {
- try {
- if(result != null && !result.isEmpty()) {
-
- String[] serializedParts = result.split(token);
-
- if (serializedParts.length < 2) {
- return null;
- }
- String clzName = serializedParts[0];
- String nameField = null;
- String content = null;
- if (serializedParts.length == 2) {
- content = serializedParts[1];
- } else {
- nameField = serializedParts[1];
- int index = result.indexOf(token + nameField + token);
- content = result.substring(index + nameField.length() + 2);
- }
-
- Class<?> clz;
- try {
- clz = Class.forName(clzName);
- } catch (ClassNotFoundException e) {
- return null;
- }
-
- Gson gson = GsonHelper.getGson();
- Object obj = gson.fromJson(content, clz);
- return obj;
- }
- return null;
- } catch(RuntimeException e) {
- s_logger.error("Caught runtime exception when doing GSON deserialization on: " + result);
- throw e;
- }
- }
-
- public static String toObjectSerializedString(Serializable object) {
- assert(object != null);
-
- ByteArrayOutputStream bs = new ByteArrayOutputStream();
- try {
- ObjectOutputStream os = new ObjectOutputStream(bs);
- os.writeObject(object);
- os.close();
- bs.close();
-
- return Base64.encodeBase64URLSafeString(bs.toByteArray());
- } catch(IOException e) {
- s_logger.error("Unexpected exception", e);
- }
- return null;
- }
-
- public static Object fromObjectSerializedString(String base64EncodedString) {
- if(base64EncodedString == null)
- return null;
-
- byte[] content = Base64.decodeBase64(base64EncodedString);
- ByteArrayInputStream bs = new ByteArrayInputStream(content);
- try {
- ObjectInputStream is = new ObjectInputStream(bs);
- Object obj = is.readObject();
- is.close();
- bs.close();
- return obj;
- } catch(IOException e) {
- s_logger.error("Unexpected exception", e);
- } catch(ClassNotFoundException e) {
- s_logger.error("Unexpected exception", e);
- }
-
- return null;
- }
-
- public static List<Pair<String, Object>> toPairList(Object o, String name) {
- List<Pair<String, Object>> l = new ArrayList<Pair<String, Object>>();
- return appendPairList(l, o, name);
- }
-
- public static List<Pair<String, Object>> appendPairList(List<Pair<String, Object>> l, Object o, String name) {
- if(o != null) {
- Class<?> clz = o.getClass();
-
- if(clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) {
- l.add(new Pair<String, Object>(name, o.toString()));
- return l;
- }
-
- for(Field f : clz.getDeclaredFields()) {
- if((f.getModifiers() & Modifier.STATIC) != 0) {
- continue;
- }
-
- Param param = f.getAnnotation(Param.class);
- if(param == null) {
- continue;
- }
-
- String propName = f.getName();
- if(!param.propName().isEmpty()) {
- propName = param.propName();
- }
-
- String paramName = param.name();
- if(paramName.isEmpty()) {
- paramName = propName;
- }
-
- Method method = getGetMethod(o, propName);
- if(method != null) {
- try {
- Object fieldValue = method.invoke(o);
- if(fieldValue != null) {
- if (f.getType() == Date.class) {
- l.add(new Pair<String, Object>(paramName, DateUtil.getOutputString((Date)fieldValue)));
- } else {
- l.add(new Pair<String, Object>(paramName, fieldValue.toString()));
- }
- }
- //else
- // l.add(new Pair<String, Object>(paramName, ""));
- } catch (IllegalArgumentException e) {
- s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
-
- } catch (IllegalAccessException e) {
- s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
- } catch (InvocationTargetException e) {
- s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
- }
- }
- }
- }
- return l;
- }
-
- private static Method getGetMethod(Object o, String propName) {
- Method method = null;
- String methodName = getGetMethodName("get", propName);
- try {
- method = o.getClass().getMethod(methodName);
- } catch (SecurityException e1) {
- s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
- } catch (NoSuchMethodException e1) {
- if(s_logger.isTraceEnabled()) {
- s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName + ", will check is-prefixed method to see if it is boolean property");
- }
- }
-
- if(method != null) {
- return method;
- }
-
- methodName = getGetMethodName("is", propName);
- try {
- method = o.getClass().getMethod(methodName);
- } catch (SecurityException e1) {
- s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
- } catch (NoSuchMethodException e1) {
- s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName);
- }
- return method;
- }
-
- private static String getGetMethodName(String prefix, String fieldName) {
- StringBuffer sb = new StringBuffer(prefix);
-
- if(fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) {
- return fieldName;
- } else {
- sb.append(fieldName.substring(0, 1).toUpperCase());
- sb.append(fieldName.substring(1));
- }
-
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
index bdf8301..f4dd395 100644
--- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
+++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobVO.java
@@ -24,7 +24,7 @@ import javax.persistence.Enumerated;
import javax.persistence.PrimaryKeyJoinColumn;
import javax.persistence.Table;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.Type;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index 7a898e8..92c89ee 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -18,6 +18,7 @@ package org.apache.cloudstack.framework.jobs;
import java.util.Date;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
import org.apache.cloudstack.jobs.Job;
public interface AsyncJob extends Job {
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java
deleted file mode 100644
index 56eddf9..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJoinMapVO.java
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.db.GenericDao;
-
-@Entity
-@Table(name="async_job_join_map")
-public class AsyncJobJoinMapVO {
- @Id
- @GeneratedValue(strategy=GenerationType.IDENTITY)
- @Column(name="id")
- private Long id = null;
-
- @Column(name="job_id")
- private long jobId;
-
- @Column(name="join_job_id")
- private long joinJobId;
-
- @Column(name="join_status")
- private int joinStatus;
-
- @Column(name="join_result", length=1024)
- private String joinResult;
-
- @Column(name="join_msid")
- private long joinMsid;
-
- @Column(name="complete_msid")
- private Long completeMsid;
-
- @Column(name="sync_source_id")
- private Long syncSourceId;
-
- @Column(name="wakeup_handler")
- private String wakeupHandler;
-
- @Column(name="wakeup_dispatcher")
- private String wakeupDispatcher;
-
- @Column(name="wakeup_interval")
- private long wakeupInterval;
-
- @Column(name=GenericDao.CREATED_COLUMN)
- private Date created;
-
- @Column(name="last_updated")
- @Temporal(TemporalType.TIMESTAMP)
- private Date lastUpdated;
-
- @Column(name="next_wakeup")
- @Temporal(TemporalType.TIMESTAMP)
- private Date nextWakeupTime;
-
- @Column(name="expiration")
- @Temporal(TemporalType.TIMESTAMP)
- private Date expiration;
-
- public AsyncJobJoinMapVO() {
- created = DateUtil.currentGMTTime();
- lastUpdated = DateUtil.currentGMTTime();
- }
-
- public Long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- public long getJobId() {
- return jobId;
- }
-
- public void setJobId(long jobId) {
- this.jobId = jobId;
- }
-
- public long getJoinJobId() {
- return joinJobId;
- }
-
- public void setJoinJobId(long joinJobId) {
- this.joinJobId = joinJobId;
- }
-
- public int getJoinStatus() {
- return joinStatus;
- }
-
- public void setJoinStatus(int joinStatus) {
- this.joinStatus = joinStatus;
- }
-
- public String getJoinResult() {
- return joinResult;
- }
-
- public void setJoinResult(String joinResult) {
- this.joinResult = joinResult;
- }
-
- public long getJoinMsid() {
- return joinMsid;
- }
-
- public void setJoinMsid(long joinMsid) {
- this.joinMsid = joinMsid;
- }
-
- public Long getCompleteMsid() {
- return completeMsid;
- }
-
- public void setCompleteMsid(Long completeMsid) {
- this.completeMsid = completeMsid;
- }
-
- public Date getCreated() {
- return created;
- }
-
- public void setCreated(Date created) {
- this.created = created;
- }
-
- public Date getLastUpdated() {
- return lastUpdated;
- }
-
- public void setLastUpdated(Date lastUpdated) {
- this.lastUpdated = lastUpdated;
- }
-
- public Long getSyncSourceId() {
- return syncSourceId;
- }
-
- public void setSyncSourceId(Long syncSourceId) {
- this.syncSourceId = syncSourceId;
- }
-
- public String getWakeupHandler() {
- return wakeupHandler;
- }
-
- public void setWakeupHandler(String wakeupHandler) {
- this.wakeupHandler = wakeupHandler;
- }
-
- public String getWakeupDispatcher() {
- return wakeupDispatcher;
- }
-
- public void setWakeupDispatcher(String wakeupDispatcher) {
- this.wakeupDispatcher = wakeupDispatcher;
- }
-
- public long getWakeupInterval() {
- return wakeupInterval;
- }
-
- public void setWakeupInterval(long wakeupInterval) {
- this.wakeupInterval = wakeupInterval;
- }
-
- public Date getNextWakeupTime() {
- return nextWakeupTime;
- }
-
- public void setNextWakeupTime(Date nextWakeupTime) {
- this.nextWakeupTime = nextWakeupTime;
- }
-
- public Date getExpiration() {
- return expiration;
- }
-
- public void setExpiration(Date expiration) {
- this.expiration = expiration;
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java
deleted file mode 100644
index 8cb5078..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobJournalVO.java
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.EnumType;
-import javax.persistence.Enumerated;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.db.GenericDao;
-
-@Entity
-@Table(name="async_job_journal")
-public class AsyncJobJournalVO {
- @Id
- @GeneratedValue(strategy=GenerationType.IDENTITY)
- @Column(name="id")
- private Long id = null;
-
- @Column(name="job_id")
- private long jobId;
-
- @Column(name="journal_type", updatable=false, nullable=false, length=32)
- @Enumerated(value=EnumType.STRING)
- private AsyncJob.JournalType journalType;
-
- @Column(name="journal_text", length=1024)
- private String journalText;
-
- @Column(name="journal_obj", length=1024)
- private String journalObjJsonString;
-
- @Column(name=GenericDao.CREATED_COLUMN)
- protected Date created;
-
- public AsyncJobJournalVO() {
- created = DateUtil.currentGMTTime();
- }
-
- public Long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- public long getJobId() {
- return jobId;
- }
-
- public void setJobId(long jobId) {
- this.jobId = jobId;
- }
-
- public AsyncJob.JournalType getJournalType() {
- return journalType;
- }
-
- public void setJournalType(AsyncJob.JournalType journalType) {
- this.journalType = journalType;
- }
-
- public String getJournalText() {
- return journalText;
- }
-
- public void setJournalText(String journalText) {
- this.journalText = journalText;
- }
-
- public String getJournalObjJsonString() {
- return journalObjJsonString;
- }
-
- public void setJournalObjJsonString(String journalObjJsonString) {
- this.journalObjJsonString = journalObjJsonString;
- }
-
- public Date getCreated() {
- return created;
- }
-
- public void setCreated(Date created) {
- this.created = created;
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java
deleted file mode 100644
index e420a90..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBeanImpl.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-import java.util.TimeZone;
-
-import javax.management.StandardMBean;
-
-import com.cloud.utils.DateUtil;
-
-public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
- private AsyncJob _job;
-
- public AsyncJobMBeanImpl(AsyncJob job) {
- super(AsyncJobMBean.class, false);
-
- _job = job;
- }
-
- public long getAccountId() {
- return _job.getAccountId();
- }
-
- public long getUserId() {
- return _job.getUserId();
- }
-
- public String getCmd() {
- return _job.getCmd();
- }
-
- public String getCmdInfo() {
- return _job.getCmdInfo();
- }
-
- public String getStatus() {
- int jobStatus = _job.getStatus();
- switch(jobStatus) {
- case AsyncJobConstants.STATUS_SUCCEEDED :
- return "Completed";
-
- case AsyncJobConstants.STATUS_IN_PROGRESS:
- return "In preogress";
-
- case AsyncJobConstants.STATUS_FAILED:
- return "failed";
- }
-
- return "Unknow";
- }
-
- public int getProcessStatus() {
- return _job.getProcessStatus();
- }
-
- public int getResultCode() {
- return _job.getResultCode();
- }
-
- public String getResult() {
- return _job.getResult();
- }
-
- public String getInstanceType() {
- if(_job.getInstanceType() != null)
- return _job.getInstanceType().toString();
- return "N/A";
- }
-
- public String getInstanceId() {
- if(_job.getInstanceId() != null)
- return String.valueOf(_job.getInstanceId());
- return "N/A";
- }
-
- public String getInitMsid() {
- if(_job.getInitMsid() != null) {
- return String.valueOf(_job.getInitMsid());
- }
- return "N/A";
- }
-
- public String getCreateTime() {
- Date time = _job.getCreated();
- if(time != null)
- return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
- return "N/A";
- }
-
- public String getLastUpdateTime() {
- Date time = _job.getLastUpdated();
- if(time != null)
- return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
- return "N/A";
- }
-
- public String getLastPollTime() {
- Date time = _job.getLastPolled();
-
- if(time != null)
- return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
- return "N/A";
- }
-
- public String getSyncQueueId() {
- SyncQueueItem item = _job.getSyncSource();
- if(item != null && item.getQueueId() != null) {
- return String.valueOf(item.getQueueId());
- }
- return "N/A";
- }
-
- public String getSyncQueueContentType() {
- SyncQueueItem item = _job.getSyncSource();
- if(item != null) {
- return item.getContentType();
- }
- return "N/A";
- }
-
- public String getSyncQueueContentId() {
- SyncQueueItem item = _job.getSyncSource();
- if(item != null && item.getContentId() != null) {
- return String.valueOf(item.getContentId());
- }
- return "N/A";
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index 86e48ed..66c56e2 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -19,6 +19,7 @@ package org.apache.cloudstack.framework.jobs;
import java.util.List;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.Manager;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java
deleted file mode 100644
index ed2c943..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMonitor.java
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.inject.Inject;
-import javax.naming.ConfigurationException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cloudstack.framework.messagebus.MessageBus;
-import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
-import org.apache.cloudstack.framework.messagebus.MessageHandler;
-
-import com.cloud.utils.component.ManagerBase;
-
-public class AsyncJobMonitor extends ManagerBase {
- public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
-
- @Inject private MessageBus _messageBus;
-
- private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
- private final Timer _timer = new Timer();
-
- private volatile int _activePoolThreads = 0;
- private volatile int _activeInplaceThreads = 0;
-
- // configuration
- private long _inactivityCheckIntervalMs = 60000;
- private long _inactivityWarningThresholdMs = 90000;
-
- public AsyncJobMonitor() {
- }
-
- public long getInactivityCheckIntervalMs() {
- return _inactivityCheckIntervalMs;
- }
-
- public void setInactivityCheckIntervalMs(long intervalMs) {
- _inactivityCheckIntervalMs = intervalMs;
- }
-
- public long getInactivityWarningThresholdMs() {
- return _inactivityWarningThresholdMs;
- }
-
- public void setInactivityWarningThresholdMs(long thresholdMs) {
- _inactivityWarningThresholdMs = thresholdMs;
- }
-
- @MessageHandler(topic = AsyncJob.Topics.JOB_HEARTBEAT)
- public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
- if(args != null && args instanceof Long) {
- synchronized(this) {
- ActiveTaskRecord record = _activeTasks.get(args);
- if(record != null) {
- record.updateJobHeartbeatTick();
- }
- }
- }
- }
-
- private void heartbeat() {
- synchronized(this) {
- for(Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
- if(entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
- s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
- + entry.getValue().millisSinceLastJobHeartbeat()/1000 + " seconds");
- }
- }
- }
- }
-
- @Override
- public boolean configure(String name, Map<String, Object> params)
- throws ConfigurationException {
-
- _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
- _timer.scheduleAtFixedRate(new TimerTask() {
-
- @Override
- public void run() {
- heartbeat();
- }
-
- }, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
- return true;
- }
-
- public void registerActiveTask(long jobId) {
- synchronized(this) {
- assert(_activeTasks.get(jobId) == null);
-
- long threadId = Thread.currentThread().getId();
- boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
- ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
- _activeTasks.put(jobId, record);
- if(fromPoolThread)
- _activePoolThreads++;
- else
- _activeInplaceThreads++;
- }
- }
-
- public void unregisterActiveTask(long jobId) {
- synchronized(this) {
- ActiveTaskRecord record = _activeTasks.get(jobId);
- assert(record != null);
- if(record != null) {
- if(record.isPoolThread())
- _activePoolThreads--;
- else
- _activeInplaceThreads--;
-
- _activeTasks.remove(jobId);
- }
- }
- }
-
- public int getActivePoolThreads() {
- return _activePoolThreads;
- }
-
- public int getActiveInplaceThread() {
- return _activeInplaceThreads;
- }
-
- private static class ActiveTaskRecord {
- long _jobId;
- long _threadId;
- boolean _fromPoolThread;
- long _jobLastHeartbeatTick;
-
- public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
- _threadId = threadId;
- _jobId = jobId;
- _fromPoolThread = fromPoolThread;
- _jobLastHeartbeatTick = System.currentTimeMillis();
- }
-
- public long getThreadId() {
- return _threadId;
- }
-
- public long getJobId() {
- return _jobId;
- }
-
- public boolean isPoolThread() {
- return _fromPoolThread;
- }
-
- public void updateJobHeartbeatTick() {
- _jobLastHeartbeatTick = System.currentTimeMillis();
- }
-
- public long millisSinceLastJobHeartbeat() {
- return System.currentTimeMillis() - _jobLastHeartbeatTick;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java
deleted file mode 100644
index 9c9001a..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobVO.java
+++ /dev/null
@@ -1,384 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.Date;
-import java.util.UUID;
-
-import javax.persistence.Column;
-import javax.persistence.DiscriminatorColumn;
-import javax.persistence.DiscriminatorType;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Inheritance;
-import javax.persistence.InheritanceType;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-import javax.persistence.Transient;
-
-import org.apache.cloudstack.jobs.Job;
-
-import com.cloud.utils.UuidUtils;
-import com.cloud.utils.db.GenericDao;
-
-@Entity
-@Table(name="async_job")
-@Inheritance(strategy=InheritanceType.JOINED)
-@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
-public class AsyncJobVO implements AsyncJob, Job {
-
- @Id
- @GeneratedValue(strategy=GenerationType.IDENTITY)
- @Column(name="id")
- private Long id = null;
-
- @Column(name="job_type", length=32)
- protected String type;
-
- @Column(name="job_dispatcher", length=64)
- protected String dispatcher;
-
- @Column(name="job_pending_signals")
- protected int pendingSignals;
-
- @Column(name="user_id")
- private long userId;
-
- @Column(name="account_id")
- private long accountId;
-
- @Column(name="job_cmd")
- private String cmd;
-
- @Column(name="job_cmd_ver")
- private int cmdVersion;
-
- @Column(name="job_cmd_info", length=65535)
- private String cmdInfo;
-
- @Column(name="job_status")
- private int status;
-
- @Column(name="job_process_status")
- private int processStatus;
-
- @Column(name="job_result_code")
- private int resultCode;
-
- @Column(name="job_result", length=65535)
- private String result;
-
- @Column(name="instance_type", length=64)
- private String instanceType;
-
- @Column(name="instance_id", length=64)
- private Long instanceId;
-
- @Column(name="job_init_msid")
- private Long initMsid;
-
- @Column(name="job_complete_msid")
- private Long completeMsid;
-
- @Column(name="job_executing_msid")
- private Long executingMsid;
-
- @Column(name=GenericDao.CREATED_COLUMN)
- private Date created;
-
- @Column(name="last_updated")
- @Temporal(TemporalType.TIMESTAMP)
- private Date lastUpdated;
-
- @Column(name="last_polled")
- @Temporal(TemporalType.TIMESTAMP)
- private Date lastPolled;
-
- @Column(name=GenericDao.REMOVED_COLUMN)
- private Date removed;
-
- @Column(name="uuid")
- private String uuid;
-
- @Transient
- private SyncQueueItem syncSource = null;
-
- public AsyncJobVO() {
- uuid = UUID.randomUUID().toString();
- }
-
- public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, String instanceType) {
- this.userId = userId;
- this.accountId = accountId;
- this.cmd = cmd;
- this.cmdInfo = cmdInfo;
- uuid = UUID.randomUUID().toString();
- this.instanceId = instanceId;
- this.instanceType = instanceType;
- }
-
- @Override
- public long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- @Override
- public String getShortUuid() {
- return UuidUtils.first(uuid);
- }
-
- @Override
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- @Override
- public String getDispatcher() {
- return dispatcher;
- }
-
- public void setDispatcher(String dispatcher) {
- this.dispatcher = dispatcher;
- }
-
- @Override
- public int getPendingSignals() {
- return pendingSignals;
- }
-
- public void setPendingSignals(int signals) {
- pendingSignals = signals;
- }
-
- @Override
- public long getUserId() {
- return userId;
- }
-
- public void setUserId(long userId) {
- this.userId = userId;
- }
-
- @Override
- public long getAccountId() {
- return accountId;
- }
-
- public void setAccountId(long accountId) {
- this.accountId = accountId;
- }
-
- @Override
- public String getCmd() {
- return cmd;
- }
-
- public void setCmd(String cmd) {
- this.cmd = cmd;
- }
-
- @Override
- public int getCmdVersion() {
- return cmdVersion;
- }
-
- public void setCmdVersion(int version) {
- cmdVersion = version;
- }
-
- @Override
- public String getCmdInfo() {
- return cmdInfo;
- }
-
- public void setCmdInfo(String cmdInfo) {
- this.cmdInfo = cmdInfo;
- }
-
- @Override
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- @Override
- public int getProcessStatus() {
- return processStatus;
- }
-
- public void setProcessStatus(int status) {
- processStatus = status;
- }
-
- @Override
- public int getResultCode() {
- return resultCode;
- }
-
- public void setResultCode(int resultCode) {
- this.resultCode = resultCode;
- }
-
- @Override
- public String getResult() {
- return result;
- }
-
- public void setResult(String result) {
- this.result = result;
- }
-
- @Override
- public Long getInitMsid() {
- return initMsid;
- }
-
- @Override
- public void setInitMsid(Long initMsid) {
- this.initMsid = initMsid;
- }
-
- @Override
- public Long getExecutingMsid() {
- return executingMsid;
- }
-
- public void setExecutingMsid(Long executingMsid) {
- this.executingMsid = executingMsid;
- }
-
- @Override
- public Long getCompleteMsid() {
- return completeMsid;
- }
-
- @Override
- public void setCompleteMsid(Long completeMsid) {
- this.completeMsid = completeMsid;
- }
-
- @Override
- public Date getCreated() {
- return created;
- }
-
- public void setCreated(Date created) {
- this.created = created;
- }
-
- @Override
- public Date getLastUpdated() {
- return lastUpdated;
- }
-
- public void setLastUpdated(Date lastUpdated) {
- this.lastUpdated = lastUpdated;
- }
-
- @Override
- public Date getLastPolled() {
- return lastPolled;
- }
-
- public void setLastPolled(Date lastPolled) {
- this.lastPolled = lastPolled;
- }
-
- @Override
- public Date getRemoved() {
- return removed;
- }
-
- public void setRemoved(Date removed) {
- this.removed = removed;
- }
-
- @Override
- public String getInstanceType() {
- return instanceType;
- }
-
- public void setInstanceType(String instanceType) {
- this.instanceType = instanceType;
- }
-
- @Override
- public Long getInstanceId() {
- return instanceId;
- }
-
- public void setInstanceId(Long instanceId) {
- this.instanceId = instanceId;
- }
-
- @Override
- public SyncQueueItem getSyncSource() {
- return syncSource;
- }
-
- @Override
- public void setSyncSource(SyncQueueItem syncSource) {
- this.syncSource = syncSource;
- }
-
- @Override
- public String getUuid() {
- return uuid;
- }
-
- public void setUuid(String uuid) {
- this.uuid = uuid;
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("AsyncJobVO {id:").append(getId());
- sb.append(", userId: ").append(getUserId());
- sb.append(", accountId: ").append(getAccountId());
- sb.append(", instanceType: ").append(getInstanceType());
- sb.append(", instanceId: ").append(getInstanceId());
- sb.append(", cmd: ").append(getCmd());
- sb.append(", cmdInfo: ").append(getCmdInfo());
- sb.append(", cmdVersion: ").append(getCmdVersion());
- sb.append(", status: ").append(getStatus());
- sb.append(", processStatus: ").append(getProcessStatus());
- sb.append(", resultCode: ").append(getResultCode());
- sb.append(", result: ").append(getResult());
- sb.append(", initMsid: ").append(getInitMsid());
- sb.append(", completeMsid: ").append(getCompleteMsid());
- sb.append(", lastUpdated: ").append(getLastUpdated());
- sb.append(", lastPolled: ").append(getLastPolled());
- sb.append(", created: ").append(getCreated());
- sb.append("}");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java
deleted file mode 100644
index 43aa0f6..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItem.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-public interface SyncQueueItem {
- public final String AsyncJobContentType = "AsyncJob";
-
- /**
- * @return queue item id
- */
- long getId();
-
- /**
- * @return queue id
- */
- Long getQueueId();
-
- /**
- * @return subject object type pointed by the queue item
- */
- String getContentType();
-
- /**
- * @return subject object id pointed by the queue item
- */
- Long getContentId();
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java
deleted file mode 100644
index 7a5d15f..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueItemVO.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import org.apache.cloudstack.api.InternalIdentity;
-
-
-import java.util.Date;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-
-@Entity
-@Table(name="sync_queue_item")
-public class SyncQueueItemVO implements SyncQueueItem, InternalIdentity {
-
- @Id
- @GeneratedValue(strategy=GenerationType.IDENTITY)
- @Column(name="id")
- private Long id = null;
-
- @Column(name="queue_id")
- private Long queueId;
-
- @Column(name="content_type")
- private String contentType;
-
- @Column(name="content_id")
- private Long contentId;
-
- @Column(name="queue_proc_msid")
- private Long lastProcessMsid;
-
- @Column(name="queue_proc_number")
- private Long lastProcessNumber;
-
- @Column(name="queue_proc_time")
- @Temporal(TemporalType.TIMESTAMP)
- private Date lastProcessTime;
-
- @Column(name="created")
- private Date created;
-
- public long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- @Override
- public Long getQueueId() {
- return queueId;
- }
-
- public void setQueueId(Long queueId) {
- this.queueId = queueId;
- }
-
- @Override
- public String getContentType() {
- return contentType;
- }
-
- public void setContentType(String contentType) {
- this.contentType = contentType;
- }
-
- @Override
- public Long getContentId() {
- return contentId;
- }
-
- public void setContentId(Long contentId) {
- this.contentId = contentId;
- }
-
- public Long getLastProcessMsid() {
- return lastProcessMsid;
- }
-
- public void setLastProcessMsid(Long lastProcessMsid) {
- this.lastProcessMsid = lastProcessMsid;
- }
-
- public Long getLastProcessNumber() {
- return lastProcessNumber;
- }
-
- public void setLastProcessNumber(Long lastProcessNumber) {
- this.lastProcessNumber = lastProcessNumber;
- }
-
- public Date getCreated() {
- return created;
- }
-
- public void setCreated(Date created) {
- this.created = created;
- }
-
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId());
- sb.append(", contentType: ").append(getContentType());
- sb.append(", contentId: ").append(getContentId());
- sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
- sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
- sb.append(", lastProcessTime: ").append(getLastProcessTime());
- sb.append(", created: ").append(getCreated());
- sb.append("}");
- return sb.toString();
- }
-
- public Date getLastProcessTime() {
- return lastProcessTime;
- }
-
- public void setLastProcessTime(Date lastProcessTime) {
- this.lastProcessTime = lastProcessTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java
deleted file mode 100644
index f0516c8..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManager.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.List;
-
-import com.cloud.utils.component.Manager;
-
-public interface SyncQueueManager extends Manager {
- public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit);
- public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
- public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
- public void purgeItem(long queueItemId);
- public void returnItem(long queueItemId);
-
- public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
- public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
-
- void purgeAsyncJobQueueItemId(long asyncJobId);
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
deleted file mode 100644
index 7c57076..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
+++ /dev/null
@@ -1,258 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.inject.Inject;
-import org.apache.log4j.Logger;
-
-import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
-import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
-
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.component.ManagerBase;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.Transaction;
-import com.cloud.utils.exception.CloudRuntimeException;
-
-public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
- public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
-
- @Inject private SyncQueueDao _syncQueueDao;
- @Inject private SyncQueueItemDao _syncQueueItemDao;
-
- @Override
- @DB
- public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
- Transaction txn = Transaction.currentTxn();
- try {
- txn.start();
-
- _syncQueueDao.ensureQueue(syncObjType, syncObjId);
- SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
- if(queueVO == null)
- throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
-
- queueVO.setQueueSizeLimit(queueSizeLimit);
- _syncQueueDao.update(queueVO.getId(), queueVO);
-
- Date dt = DateUtil.currentGMTTime();
- SyncQueueItemVO item = new SyncQueueItemVO();
- item.setQueueId(queueVO.getId());
- item.setContentType(itemType);
- item.setContentId(itemId);
- item.setCreated(dt);
-
- _syncQueueItemDao.persist(item);
- txn.commit();
-
- return queueVO;
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txn.rollback();
- }
- return null;
- }
-
- @Override
- @DB
- public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
- if(queueVO == null) {
- s_logger.error("Sync queue(id: " + queueId + ") does not exist");
- txt.commit();
- return null;
- }
-
- if(queueReadyToProcess(queueVO)) {
- SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
- if(itemVO != null) {
- Long processNumber = queueVO.getLastProcessNumber();
- if(processNumber == null)
- processNumber = new Long(1);
- else
- processNumber = processNumber + 1;
- Date dt = DateUtil.currentGMTTime();
- queueVO.setLastProcessNumber(processNumber);
- queueVO.setLastUpdated(dt);
- queueVO.setQueueSize(queueVO.getQueueSize() + 1);
- _syncQueueDao.update(queueVO.getId(), queueVO);
-
- itemVO.setLastProcessMsid(msid);
- itemVO.setLastProcessNumber(processNumber);
- itemVO.setLastProcessTime(dt);
- _syncQueueItemDao.update(itemVO.getId(), itemVO);
-
- txt.commit();
- return itemVO;
- } else {
- if(s_logger.isDebugEnabled())
- s_logger.debug("Sync queue (" + queueId + ") is currently empty");
- }
- } else {
- if(s_logger.isDebugEnabled())
- s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
- }
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txt.rollback();
- }
-
- return null;
- }
-
- @Override
- @DB
- public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
-
- List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
- if(l != null && l.size() > 0) {
- for(SyncQueueItemVO item : l) {
- SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
- SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
- if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
- Long processNumber = queueVO.getLastProcessNumber();
- if(processNumber == null)
- processNumber = new Long(1);
- else
- processNumber = processNumber + 1;
-
- Date dt = DateUtil.currentGMTTime();
- queueVO.setLastProcessNumber(processNumber);
- queueVO.setLastUpdated(dt);
- queueVO.setQueueSize(queueVO.getQueueSize() + 1);
- _syncQueueDao.update(queueVO.getId(), queueVO);
-
- itemVO.setLastProcessMsid(msid);
- itemVO.setLastProcessNumber(processNumber);
- itemVO.setLastProcessTime(dt);
- _syncQueueItemDao.update(item.getId(), itemVO);
-
- resultList.add(item);
- }
- }
- }
- txt.commit();
- return resultList;
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txt.rollback();
- }
- return null;
- }
-
- @Override
- @DB
- public void purgeItem(long queueItemId) {
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
- if(itemVO != null) {
- SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
-
- _syncQueueItemDao.expunge(itemVO.getId());
-
- // if item is active, reset queue information
- if (itemVO.getLastProcessMsid() != null) {
- queueVO.setLastUpdated(DateUtil.currentGMTTime());
- // decrement the count
- assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
- queueVO.setQueueSize(queueVO.getQueueSize() - 1);
- _syncQueueDao.update(queueVO.getId(), queueVO);
- }
- }
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txt.rollback();
- }
- }
-
- @Override
- @DB
- public void returnItem(long queueItemId) {
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
- if(itemVO != null) {
- SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
-
- itemVO.setLastProcessMsid(null);
- itemVO.setLastProcessNumber(null);
- itemVO.setLastProcessTime(null);
- _syncQueueItemDao.update(queueItemId, itemVO);
-
- queueVO.setLastUpdated(DateUtil.currentGMTTime());
- _syncQueueDao.update(queueVO.getId(), queueVO);
- }
- txt.commit();
- } catch(Exception e) {
- s_logger.error("Unexpected exception: ", e);
- txt.rollback();
- }
- }
-
- @Override
- public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
- return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
- }
-
- @Override
- public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
- return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
- }
-
- private boolean queueReadyToProcess(SyncQueueVO queueVO) {
- return true;
-
- //
- // TODO
- //
- // Need to disable concurrency disable at queue level due to the need to support
- // job wake-up dispatching task
- //
- // Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
- //
-
- // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
- }
-
- @Override
- public void purgeAsyncJobQueueItemId(long asyncJobId) {
- Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
- if (itemId != null) {
- purgeItem(itemId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
deleted file mode 100644
index cad3462..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.cloudstack.framework.jobs;
-
-import org.apache.cloudstack.api.InternalIdentity;
-
-import java.util.Date;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.Temporal;
-import javax.persistence.TemporalType;
-
-@Entity
-@Table(name="sync_queue")
-public class SyncQueueVO implements InternalIdentity {
-
- @Id
- @GeneratedValue(strategy=GenerationType.IDENTITY)
- @Column(name="id")
- private Long id;
-
- @Column(name="sync_objtype")
-
- private String syncObjType;
-
- @Column(name="sync_objid")
- private Long syncObjId;
-
- @Column(name="queue_proc_number")
- private Long lastProcessNumber;
-
- @Column(name="created")
- @Temporal(TemporalType.TIMESTAMP)
- private Date created;
-
- @Column(name="last_updated")
- @Temporal(TemporalType.TIMESTAMP)
- private Date lastUpdated;
-
- @Column(name="queue_size")
- private long queueSize = 0;
-
- @Column(name="queue_size_limit")
- private long queueSizeLimit = 0;
-
- public long getId() {
- return id;
- }
-
- public String getSyncObjType() {
- return syncObjType;
- }
-
- public void setSyncObjType(String syncObjType) {
- this.syncObjType = syncObjType;
- }
-
- public Long getSyncObjId() {
- return syncObjId;
- }
-
- public void setSyncObjId(Long syncObjId) {
- this.syncObjId = syncObjId;
- }
-
- public Long getLastProcessNumber() {
- return lastProcessNumber;
- }
-
- public void setLastProcessNumber(Long number) {
- lastProcessNumber = number;
- }
-
- public Date getCreated() {
- return created;
- }
-
- public void setCreated(Date created) {
- this.created = created;
- }
-
- public Date getLastUpdated() {
- return lastUpdated;
- }
-
- public void setLastUpdated(Date lastUpdated) {
- this.lastUpdated = lastUpdated;
- }
-
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("SyncQueueVO {id:").append(getId());
- sb.append(", syncObjType: ").append(getSyncObjType());
- sb.append(", syncObjId: ").append(getSyncObjId());
- sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
- sb.append(", lastUpdated: ").append(getLastUpdated());
- sb.append(", created: ").append(getCreated());
- sb.append(", count: ").append(getQueueSize());
- sb.append("}");
- return sb.toString();
- }
-
- public long getQueueSize() {
- return queueSize;
- }
-
- public void setQueueSize(long queueSize) {
- this.queueSize = queueSize;
- }
-
- public long getQueueSizeLimit() {
- return queueSizeLimit;
- }
-
- public void setQueueSizeLimit(long queueSizeLimit) {
- this.queueSizeLimit = queueSizeLimit;
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
index 4c442c8..cfcd173 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
@@ -19,7 +19,7 @@ package org.apache.cloudstack.framework.jobs.dao;
import java.util.Date;
import java.util.List;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import com.cloud.utils.db.GenericDao;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index 276323c..c30dbde 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
index acf1324..a9e82a7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
import java.util.List;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import com.cloud.utils.db.GenericDao;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index 0b41953..4cc2218 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -27,7 +27,7 @@ import java.util.TimeZone;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
index e8d8287..fb6a242 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
import java.util.List;
-import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
import com.cloud.utils.db.GenericDao;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
index 9d02307..d26e6ed 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
import java.util.List;
-import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
index f45245a..fa617ad 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
@@ -16,7 +16,7 @@
// under the License.
package org.apache.cloudstack.framework.jobs.dao;
-import org.apache.cloudstack.framework.jobs.SyncQueueVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import com.cloud.utils.db.GenericDao;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
index 05e2a73..f7d9d72 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
@@ -24,7 +24,7 @@ import java.util.TimeZone;
import org.apache.log4j.Logger;
-import org.apache.cloudstack.framework.jobs.SyncQueueVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
index b78ccf7..61670bf 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
@@ -18,7 +18,7 @@ package org.apache.cloudstack.framework.jobs.dao;
import java.util.List;
-import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
import com.cloud.utils.db.GenericDao;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 08b777c..ccb7f10 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -27,7 +27,7 @@ import java.util.TimeZone;
import org.apache.log4j.Logger;
-import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.DB;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
new file mode 100644
index 0000000..0bcdc3b
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job_join_map")
+public class AsyncJobJoinMapVO {
+ @Id
+ @GeneratedValue(strategy=GenerationType.IDENTITY)
+ @Column(name="id")
+ private Long id = null;
+
+ @Column(name="job_id")
+ private long jobId;
+
+ @Column(name="join_job_id")
+ private long joinJobId;
+
+ @Column(name="join_status")
+ private int joinStatus;
+
+ @Column(name="join_result", length=1024)
+ private String joinResult;
+
+ @Column(name="join_msid")
+ private long joinMsid;
+
+ @Column(name="complete_msid")
+ private Long completeMsid;
+
+ @Column(name="sync_source_id")
+ private Long syncSourceId;
+
+ @Column(name="wakeup_handler")
+ private String wakeupHandler;
+
+ @Column(name="wakeup_dispatcher")
+ private String wakeupDispatcher;
+
+ @Column(name="wakeup_interval")
+ private long wakeupInterval;
+
+ @Column(name=GenericDao.CREATED_COLUMN)
+ private Date created;
+
+ @Column(name="last_updated")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date lastUpdated;
+
+ @Column(name="next_wakeup")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date nextWakeupTime;
+
+ @Column(name="expiration")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date expiration;
+
+ public AsyncJobJoinMapVO() {
+ created = DateUtil.currentGMTTime();
+ lastUpdated = DateUtil.currentGMTTime();
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(long jobId) {
+ this.jobId = jobId;
+ }
+
+ public long getJoinJobId() {
+ return joinJobId;
+ }
+
+ public void setJoinJobId(long joinJobId) {
+ this.joinJobId = joinJobId;
+ }
+
+ public int getJoinStatus() {
+ return joinStatus;
+ }
+
+ public void setJoinStatus(int joinStatus) {
+ this.joinStatus = joinStatus;
+ }
+
+ public String getJoinResult() {
+ return joinResult;
+ }
+
+ public void setJoinResult(String joinResult) {
+ this.joinResult = joinResult;
+ }
+
+ public long getJoinMsid() {
+ return joinMsid;
+ }
+
+ public void setJoinMsid(long joinMsid) {
+ this.joinMsid = joinMsid;
+ }
+
+ public Long getCompleteMsid() {
+ return completeMsid;
+ }
+
+ public void setCompleteMsid(Long completeMsid) {
+ this.completeMsid = completeMsid;
+ }
+
+ public Date getCreated() {
+ return created;
+ }
+
+ public void setCreated(Date created) {
+ this.created = created;
+ }
+
+ public Date getLastUpdated() {
+ return lastUpdated;
+ }
+
+ public void setLastUpdated(Date lastUpdated) {
+ this.lastUpdated = lastUpdated;
+ }
+
+ public Long getSyncSourceId() {
+ return syncSourceId;
+ }
+
+ public void setSyncSourceId(Long syncSourceId) {
+ this.syncSourceId = syncSourceId;
+ }
+
+ public String getWakeupHandler() {
+ return wakeupHandler;
+ }
+
+ public void setWakeupHandler(String wakeupHandler) {
+ this.wakeupHandler = wakeupHandler;
+ }
+
+ public String getWakeupDispatcher() {
+ return wakeupDispatcher;
+ }
+
+ public void setWakeupDispatcher(String wakeupDispatcher) {
+ this.wakeupDispatcher = wakeupDispatcher;
+ }
+
+ public long getWakeupInterval() {
+ return wakeupInterval;
+ }
+
+ public void setWakeupInterval(long wakeupInterval) {
+ this.wakeupInterval = wakeupInterval;
+ }
+
+ public Date getNextWakeupTime() {
+ return nextWakeupTime;
+ }
+
+ public void setNextWakeupTime(Date nextWakeupTime) {
+ this.nextWakeupTime = nextWakeupTime;
+ }
+
+ public Date getExpiration() {
+ return expiration;
+ }
+
+ public void setExpiration(Date expiration) {
+ this.expiration = expiration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java
new file mode 100644
index 0000000..b78a7e0
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJournalVO.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJob.JournalType;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job_journal")
+public class AsyncJobJournalVO {
+ @Id
+ @GeneratedValue(strategy=GenerationType.IDENTITY)
+ @Column(name="id")
+ private Long id = null;
+
+ @Column(name="job_id")
+ private long jobId;
+
+ @Column(name="journal_type", updatable=false, nullable=false, length=32)
+ @Enumerated(value=EnumType.STRING)
+ private AsyncJob.JournalType journalType;
+
+ @Column(name="journal_text", length=1024)
+ private String journalText;
+
+ @Column(name="journal_obj", length=1024)
+ private String journalObjJsonString;
+
+ @Column(name=GenericDao.CREATED_COLUMN)
+ protected Date created;
+
+ public AsyncJobJournalVO() {
+ created = DateUtil.currentGMTTime();
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(long jobId) {
+ this.jobId = jobId;
+ }
+
+ public AsyncJob.JournalType getJournalType() {
+ return journalType;
+ }
+
+ public void setJournalType(AsyncJob.JournalType journalType) {
+ this.journalType = journalType;
+ }
+
+ public String getJournalText() {
+ return journalText;
+ }
+
+ public void setJournalText(String journalText) {
+ this.journalText = journalText;
+ }
+
+ public String getJournalObjJsonString() {
+ return journalObjJsonString;
+ }
+
+ public void setJournalObjJsonString(String journalObjJsonString) {
+ this.journalObjJsonString = journalObjJsonString;
+ }
+
+ public Date getCreated() {
+ return created;
+ }
+
+ public void setCreated(Date created) {
+ this.created = created;
+ }
+}