You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/18 02:24:47 UTC
[2/9] git commit: updated refs/heads/vmsync to 309f8da
Removed AsyncJobConstants
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/ea6ca5ff
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/ea6ca5ff
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/ea6ca5ff
Branch: refs/heads/vmsync
Commit: ea6ca5ff5c928a61c024c242f07cc1f813e2f616
Parents: 2b96665
Author: Alex Huang <al...@gmail.com>
Authored: Fri Jun 14 19:21:25 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Mon Jun 17 17:04:01 2013 -0700
----------------------------------------------------------------------
.../apache/cloudstack/api/ResponseObject.java | 1 +
api/src/org/apache/cloudstack/jobs/JobInfo.java | 8 +-
.../com/cloud/vm/VirtualMachineManagerImpl.java | 4 +-
.../src/com/cloud/vm/VmWorkJobDispatcher.java | 4 +-
.../cloudstack/vm/jobs/VmWorkJobDaoImpl.java | 2 +-
.../cloudstack/framework/jobs/AsyncJob.java | 2 +-
.../framework/jobs/AsyncJobConstants.java | 34 ------
.../jobs/AsyncJobExecutionContext.java | 25 ++---
.../framework/jobs/AsyncJobManager.java | 10 +-
.../framework/jobs/dao/AsyncJobDaoImpl.java | 12 +--
.../framework/jobs/dao/AsyncJobJoinMapDao.java | 5 +-
.../jobs/dao/AsyncJobJoinMapDaoImpl.java | 50 +++++----
.../framework/jobs/impl/AsyncJobJoinMapVO.java | 11 +-
.../framework/jobs/impl/AsyncJobMBeanImpl.java | 66 +++++++-----
.../framework/jobs/impl/AsyncJobVO.java | 16 ++-
.../com/cloud/api/ApiAsyncJobDispatcher.java | 6 +-
server/src/com/cloud/api/ApiServer.java | 2 +-
.../com/cloud/async/AsyncJobManagerImpl.java | 108 +++++++++----------
server/src/com/cloud/async/AsyncJobResult.java | 14 +--
.../consoleproxy/ConsoleProxyManagerImpl.java | 8 +-
.../com/cloud/server/ManagementServerImpl.java | 4 +-
.../secondary/SecondaryStorageManagerImpl.java | 8 +-
.../storage/snapshot/SnapshotSchedulerImpl.java | 6 +-
.../cloud/storage/upload/UploadListener.java | 10 +-
.../src/com/cloud/vm/SystemVmLoadScanner.java | 4 +-
.../com/cloud/async/TestAsyncJobManager.java | 13 ++-
.../vm/VmWorkMockVirtualMachineManagerImpl.java | 2 +-
.../cloud/vm/VmWorkTestWorkJobDispatcher.java | 2 +-
28 files changed, 215 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/api/src/org/apache/cloudstack/api/ResponseObject.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/ResponseObject.java b/api/src/org/apache/cloudstack/api/ResponseObject.java
index c8bd457..b696159 100644
--- a/api/src/org/apache/cloudstack/api/ResponseObject.java
+++ b/api/src/org/apache/cloudstack/api/ResponseObject.java
@@ -16,6 +16,7 @@
// under the License.
package org.apache.cloudstack.api;
+
public interface ResponseObject {
/**
* Get the name of the API response
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/api/src/org/apache/cloudstack/jobs/JobInfo.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/jobs/JobInfo.java b/api/src/org/apache/cloudstack/jobs/JobInfo.java
index bce9627..6a53123 100644
--- a/api/src/org/apache/cloudstack/jobs/JobInfo.java
+++ b/api/src/org/apache/cloudstack/jobs/JobInfo.java
@@ -22,6 +22,12 @@ import org.apache.cloudstack.api.Identity;
import org.apache.cloudstack.api.InternalIdentity;
public interface JobInfo extends Identity, InternalIdentity {
+ public enum Status {
+ IN_PROGRESS,
+ SUCCEEDED,
+ FAILED,
+ CANCELLED;
+ }
String getType();
@@ -39,7 +45,7 @@ public interface JobInfo extends Identity, InternalIdentity {
String getCmdInfo();
- int getStatus();
+ Status getStatus();
int getProcessStatus();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 4fee49e..0f710d1 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -725,7 +725,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return true;
VmWorkJobVO workJob = _workJobDao.findById(jobId);
- if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
+ if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;
@@ -1152,7 +1152,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return true;
VmWorkJobVO workJob = _workJobDao.findById(jobId);
- if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
+ if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
index 8c7fd9c..7819c1a 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -99,10 +99,10 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
VmWorkStop stop = (VmWorkStop)work;
_vmMgr.orchestrateStop(vm.getUuid(), stop.isForceStop());
}
- _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, null);
+ _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
} catch(Throwable e) {
s_logger.error("Unable to complete " + job, e);
- _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, e.getMessage());
+ _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, e.getMessage());
} finally {
CallContext.unregister();
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
index 4ece4e8..0135d81 100644
--- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
+++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
@@ -111,7 +111,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
public void expungeCompletedWorkJobs(Date cutDate) {
SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
sc.setParameters("lastUpdated",cutDate);
- sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+ sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
expunge(sc);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index dfb67f8..be92846 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -57,7 +57,7 @@ public interface AsyncJob extends JobInfo {
String getCmdInfo();
@Override
- int getStatus();
+ Status getStatus();
@Override
int getProcessStatus();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java
deleted file mode 100644
index 9568eb4..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.cloudstack.framework.jobs;
-
-public interface AsyncJobConstants {
- public static final int STATUS_IN_PROGRESS = 0;
- public static final int STATUS_SUCCEEDED = 1;
- public static final int STATUS_FAILED = 2;
-
- public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
- public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
-
- public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
-
- // Although we may have detailed masks for each individual wakeup event, i.e.
- // periodical timer, matched topic from message bus, it seems that we don't
- // need to distinguish them to such level. Therefore, only one wakeup signal
- // is defined
- public static final int SIGNAL_MASK_WAKEUP = 1;
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
index 3d5c326..ef0a4a6 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
@@ -22,11 +22,11 @@ import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.utils.component.ComponentContext;
public class AsyncJobExecutionContext {
private AsyncJob _job;
@@ -57,10 +57,6 @@ public class AsyncJobExecutionContext {
}
public AsyncJob getJob() {
- if(_job == null) {
- _job = _jobMgr.getPseudoJob();
- }
-
return _job;
}
@@ -68,7 +64,7 @@ public class AsyncJobExecutionContext {
_job = job;
}
- public void completeAsyncJob(int jobStatus, int resultCode, Object resultObject) {
+ public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, Object resultObject) {
assert(_job != null);
_jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject);
}
@@ -114,7 +110,7 @@ public class AsyncJobExecutionContext {
assert(_job != null);
AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
- if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) {
+ if(record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) {
Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
if(exception != null && exception instanceof Exception) {
if(exception instanceof InsufficientCapacityException)
@@ -131,12 +127,12 @@ public class AsyncJobExecutionContext {
_jobMgr.disjoinJob(_job.getId(), joinedJobId);
}
- public void completeJoin(int joinStatus, String joinResult) {
+ public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
assert(_job != null);
_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
}
- public void completeJobAndJoin(int joinStatus, String joinResult) {
+ public void completeJobAndJoin(JobInfo.Status joinStatus, String joinResult) {
assert(_job != null);
_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
_jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null);
@@ -144,21 +140,14 @@ public class AsyncJobExecutionContext {
public static AsyncJobExecutionContext getCurrentExecutionContext() {
AsyncJobExecutionContext context = s_currentExectionContext.get();
- if(context == null) {
- context = new AsyncJobExecutionContext();
- context = ComponentContext.inject(context);
- context.getJob();
- setCurrentExecutionContext(context);
- }
-
return context;
}
- public static AsyncJobExecutionContext registerPseudoExecutionContext() {
+ public static AsyncJobExecutionContext registerPseudoExecutionContext(long accountId, long userId) {
AsyncJobExecutionContext context = s_currentExectionContext.get();
if (context == null) {
context = new AsyncJobExecutionContext();
- context.getJob();
+ context.setJob(_jobMgr.getPseudoJob(accountId, userId));
setCurrentExecutionContext(context);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index e577fb0..3178268 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -20,6 +20,7 @@ import java.util.List;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.Manager;
@@ -31,12 +32,9 @@ public interface AsyncJobManager extends Manager {
List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
long submitAsyncJob(AsyncJob job);
- long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
-// AsyncJobResult queryAsyncJobResult(long jobId);
-
- void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);
+ void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, Object resultObject);
void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
@@ -50,7 +48,7 @@ public interface AsyncJobManager extends Manager {
*
* @return pseudo job for the thread
*/
- AsyncJob getPseudoJob();
+ AsyncJob getPseudoJob(long accountId, long userId);
/**
* Used by upper level job to wait for completion of a down-level job (usually VmWork jobs)
@@ -101,7 +99,7 @@ public interface AsyncJobManager extends Manager {
* for legacy code to work. To help pass exception object easier, we use
* object-stream based serialization instead of GSON
*/
- void completeJoin(long joinJobId, int joinStatus, String joinResult);
+ void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult);
void releaseSyncSource();
void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index c30dbde..96775f7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -23,8 +23,8 @@ import java.util.List;
import org.apache.log4j.Logger;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
@@ -100,7 +100,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
sc.setParameters("instanceType", instanceType);
sc.setParameters("instanceId", instanceId);
- sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+ sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
List<AsyncJobVO> l = listIncludingRemovedBy(sc);
if(l != null && l.size() > 0) {
@@ -121,7 +121,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
if (accountId != null) {
sc.setParameters("accountId", accountId);
}
- sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+ sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
return listBy(sc);
}
@@ -129,8 +129,8 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
@Override
public AsyncJobVO findPseudoJob(long threadId, long msid) {
SearchCriteria<AsyncJobVO> sc = pseudoJobSearch.create();
- sc.setParameters("jobDispatcher", AsyncJobConstants.JOB_DISPATCHER_PSEUDO);
- sc.setParameters("instanceType", AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE);
+ sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
+ sc.setParameters("instanceType", AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
sc.setParameters("instanceId", threadId);
List<AsyncJobVO> result = listBy(sc);
@@ -178,7 +178,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
@Override
@DB
public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
- String sql = "UPDATE async_job SET job_status=" + AsyncJobConstants.STATUS_FAILED + ", job_result_code=" + jobResultCode
+ String sql = "UPDATE async_job SET job_status=" + JobInfo.Status.FAILED + ", job_result_code=" + jobResultCode
+ ", job_result='" + jobResultMessage + "' where job_status=0 AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))";
Transaction txn = Transaction.currentTxn();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
index a9e82a7..9c993f1 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
@@ -19,12 +19,13 @@ package org.apache.cloudstack.framework.jobs.dao;
import java.util.List;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.db.GenericDao;
public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> {
- Long joinJob(long jobId, long joinJobId, long joinMsid,
+ Long joinJob(long jobId, long joinJobId, long joinMsid,
long wakeupIntervalMs, long expirationMs,
Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
void disjoinJob(long jobId, long joinedJobId);
@@ -33,7 +34,7 @@ public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long>
AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId);
List<AsyncJobJoinMapVO> listJoinRecords(long jobId);
- void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid);
+ void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid);
List<Long> wakeupScan();
List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index 4cc2218..60dea03 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -26,23 +26,23 @@ import java.util.TimeZone;
import org.apache.log4j.Logger;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.UpdateBuilder;
-import com.cloud.utils.db.SearchCriteria.Op;
public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Long> implements AsyncJobJoinMapDao {
public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class);
- private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;
- private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;
- private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;
+ private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;
+ private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;
+ private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;
private final SearchBuilder<AsyncJobJoinMapVO> WakeupSearch;
public AsyncJobJoinMapDaoImpl() {
@@ -66,7 +66,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
WakeupSearch.done();
}
- public Long joinJob(long jobId, long joinJobId, long joinMsid,
+ @Override
+ public Long joinJob(long jobId, long joinJobId, long joinMsid,
long wakeupIntervalMs, long expirationMs,
Long syncSourceId, String wakeupHandler, String wakeupDispatcher) {
@@ -74,7 +75,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
record.setJobId(jobId);
record.setJoinJobId(joinJobId);
record.setJoinMsid(joinMsid);
- record.setJoinStatus(AsyncJobConstants.STATUS_IN_PROGRESS);
+ record.setJoinStatus(JobInfo.Status.IN_PROGRESS);
record.setSyncSourceId(syncSourceId);
record.setWakeupInterval(wakeupIntervalMs / 1000); // convert millisecond to second
record.setWakeupHandler(wakeupHandler);
@@ -84,11 +85,12 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
record.setExpiration(new Date(DateUtil.currentGMTTime().getTime() + expirationMs));
}
- this.persist(record);
+ persist(record);
return record.getId();
}
- public void disjoinJob(long jobId, long joinedJobId) {
+ @Override
+ public void disjoinJob(long jobId, long joinedJobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
sc.setParameters("jobId", jobId);
sc.setParameters("joinJobId", joinedJobId);
@@ -96,14 +98,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
this.expunge(sc);
}
- public void disjoinAllJobs(long jobId) {
+ @Override
+ public void disjoinAllJobs(long jobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
sc.setParameters("jobId", jobId);
this.expunge(sc);
}
- public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
+ @Override
+ public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
sc.setParameters("jobId", jobId);
sc.setParameters("joinJobId", joinJobId);
@@ -117,14 +121,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
return null;
}
- public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
+ @Override
+ public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
sc.setParameters("jobId", jobId);
return this.listBy(sc);
}
- public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) {
+ @Override
+ public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid) {
AsyncJobJoinMapVO record = createForUpdate();
record.setJoinStatus(joinStatus);
record.setJoinResult(joinResult);
@@ -138,7 +144,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
update(ub, sc, null);
}
- public List<Long> wakeupScan() {
+ @Override
+ public List<Long> wakeupScan() {
List<Long> standaloneList = new ArrayList<Long>();
Date cutDate = DateUtil.currentGMTTime();
@@ -149,9 +156,9 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
txn.start();
//
- // performance sensitive processing, do it in plain SQL
+ // performance sensitive processing, do it in plain SQL
//
- String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
+ String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
@@ -159,7 +166,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
pstmt.executeUpdate();
pstmt.close();
- sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
+ sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
@@ -194,7 +201,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
return standaloneList;
}
- public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+ @Override
+ public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
List<Long> standaloneList = new ArrayList<Long>();
Transaction txn = Transaction.currentTxn();
@@ -203,16 +211,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
txn.start();
//
- // performance sensitive processing, do it in plain SQL
+ // performance sensitive processing, do it in plain SQL
//
- String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
+ String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setLong(1, joinedJobId);
pstmt.executeUpdate();
pstmt.close();
- sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
+ sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setLong(1, joinedJobId);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
index 0bcdc3b..287121f 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobJoinMapVO.java
@@ -20,6 +20,8 @@ import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@@ -27,6 +29,8 @@ import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
+import org.apache.cloudstack.jobs.JobInfo;
+
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDao;
@@ -45,7 +49,8 @@ public class AsyncJobJoinMapVO {
private long joinJobId;
@Column(name="join_status")
- private int joinStatus;
+ @Enumerated(EnumType.ORDINAL)
+ private JobInfo.Status joinStatus;
@Column(name="join_result", length=1024)
private String joinResult;
@@ -112,11 +117,11 @@ public class AsyncJobJoinMapVO {
this.joinJobId = joinJobId;
}
- public int getJoinStatus() {
+ public JobInfo.Status getJoinStatus() {
return joinStatus;
}
- public void setJoinStatus(int joinStatus) {
+ public void setJoinStatus(JobInfo.Status joinStatus) {
this.joinStatus = joinStatus;
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
index 95f01e5..0a48da3 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
@@ -22,13 +22,12 @@ import java.util.TimeZone;
import javax.management.StandardMBean;
import org.apache.cloudstack.framework.jobs.AsyncJob;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobMBean;
import com.cloud.utils.DateUtil;
public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
- private AsyncJob _job;
+ private final AsyncJob _job;
public AsyncJobMBeanImpl(AsyncJob job) {
super(AsyncJobMBean.class, false);
@@ -36,84 +35,100 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
_job = job;
}
- public long getAccountId() {
+ @Override
+ public long getAccountId() {
return _job.getAccountId();
}
- public long getUserId() {
+ @Override
+ public long getUserId() {
return _job.getUserId();
}
- public String getCmd() {
+ @Override
+ public String getCmd() {
return _job.getCmd();
}
- public String getCmdInfo() {
+ @Override
+ public String getCmdInfo() {
return _job.getCmdInfo();
}
- public String getStatus() {
- int jobStatus = _job.getStatus();
- switch(jobStatus) {
- case AsyncJobConstants.STATUS_SUCCEEDED :
+ @Override
+ public String getStatus() {
+ switch (_job.getStatus()) {
+ case SUCCEEDED:
return "Completed";
- case AsyncJobConstants.STATUS_IN_PROGRESS:
+ case IN_PROGRESS:
return "In preogress";
- case AsyncJobConstants.STATUS_FAILED:
+ case FAILED:
return "failed";
+
+ case CANCELLED:
+ return "cancelled";
}
return "Unknow";
}
- public int getProcessStatus() {
+ @Override
+ public int getProcessStatus() {
return _job.getProcessStatus();
}
- public int getResultCode() {
+ @Override
+ public int getResultCode() {
return _job.getResultCode();
}
- public String getResult() {
+ @Override
+ public String getResult() {
return _job.getResult();
}
- public String getInstanceType() {
+ @Override
+ public String getInstanceType() {
if(_job.getInstanceType() != null)
return _job.getInstanceType().toString();
return "N/A";
}
- public String getInstanceId() {
+ @Override
+ public String getInstanceId() {
if(_job.getInstanceId() != null)
return String.valueOf(_job.getInstanceId());
return "N/A";
}
- public String getInitMsid() {
+ @Override
+ public String getInitMsid() {
if(_job.getInitMsid() != null) {
return String.valueOf(_job.getInitMsid());
}
return "N/A";
}
- public String getCreateTime() {
+ @Override
+ public String getCreateTime() {
Date time = _job.getCreated();
if(time != null)
return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
return "N/A";
}
- public String getLastUpdateTime() {
+ @Override
+ public String getLastUpdateTime() {
Date time = _job.getLastUpdated();
if(time != null)
return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
return "N/A";
}
- public String getLastPollTime() {
+ @Override
+ public String getLastPollTime() {
Date time = _job.getLastPolled();
if(time != null)
@@ -121,7 +136,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
return "N/A";
}
- public String getSyncQueueId() {
+ @Override
+ public String getSyncQueueId() {
SyncQueueItem item = _job.getSyncSource();
if(item != null && item.getQueueId() != null) {
return String.valueOf(item.getQueueId());
@@ -129,7 +145,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
return "N/A";
}
- public String getSyncQueueContentType() {
+ @Override
+ public String getSyncQueueContentType() {
SyncQueueItem item = _job.getSyncSource();
if(item != null) {
return item.getContentType();
@@ -137,7 +154,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
return "N/A";
}
- public String getSyncQueueContentId() {
+ @Override
+ public String getSyncQueueContentId() {
SyncQueueItem item = _job.getSyncSource();
if(item != null && item.getContentId() != null) {
return String.valueOf(item.getContentId());
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
index 0e103b9..cab047b 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -23,6 +23,8 @@ import javax.persistence.Column;
import javax.persistence.DiscriminatorColumn;
import javax.persistence.DiscriminatorType;
import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@@ -44,11 +46,14 @@ import com.cloud.utils.db.GenericDao;
@Inheritance(strategy=InheritanceType.JOINED)
@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
public class AsyncJobVO implements AsyncJob, JobInfo {
+
+ public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
+ public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@Column(name="id")
- private Long id = null;
+ private long id;
@Column(name="job_type", length=32)
protected String type;
@@ -78,7 +83,8 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
private String cmdInfo;
@Column(name="job_status")
- private int status;
+ @Enumerated(value = EnumType.ORDINAL)
+ private Status status;
@Column(name="job_process_status")
private int processStatus;
@@ -145,7 +151,7 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
return id;
}
- public void setId(Long id) {
+ public void setId(long id) {
this.id = id;
}
@@ -236,11 +242,11 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
}
@Override
- public int getStatus() {
+ public Status getStatus() {
return status;
}
- public void setStatus(int status) {
+ public void setStatus(Status status) {
this.status = status;
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
index c442559..e0823dc 100644
--- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -33,9 +33,9 @@ import org.apache.cloudstack.api.ServerApiException;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.user.Account;
import com.cloud.user.dao.AccountDao;
@@ -93,7 +93,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
_dispatcher.dispatch(cmdObj, params, true);
// serialize this to the async job table
- _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject());
+ _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, cmdObj.getResponseObject());
} finally {
CallContext.unregister();
}
@@ -116,7 +116,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
// FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
// and we need to preserve that as much as possible here
- _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
+ _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
}
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index ae5be33..be161c6 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -605,7 +605,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
if (response.getObjectId() != null && objectJobMap.containsKey(response.getObjectId())) {
AsyncJob job = objectJobMap.get(response.getObjectId());
response.setJobId(job.getUuid());
- response.setJobStatus(job.getStatus());
+ response.setJobStatus(job.getStatus().ordinal());
}
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 332587a..2e3a9a0 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -38,9 +38,9 @@ import org.apache.log4j.Logger;
import org.apache.cloudstack.api.ApiErrorCode;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse;
+import org.apache.cloudstack.config.ConfigRepo;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
@@ -60,11 +60,14 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDetector;
import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.jobs.JobInfo.Status;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHost;
+import com.cloud.cluster.ManagementServerNode;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.InvalidParameterValueException;
@@ -76,7 +79,6 @@ import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Predicate;
import com.cloud.utils.PropertiesUtil;
-import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
@@ -87,11 +89,16 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
-import com.cloud.utils.net.MacAddress;
public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener {
- public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
+ private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
+ private static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
+ // Although we may have detailed masks for each individual wakeup event, i.e.
+ // periodical timer, matched topic from message bus, it seems that we don't
+ // need to distinguish them to such level. Therefore, only one wakeup signal
+ // is defined
+ public static final int SIGNAL_MASK_WAKEUP = 1;
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
@@ -107,6 +114,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private List<AsyncJobDispatcher> _jobDispatchers;
@Inject private MessageBus _messageBus;
@Inject private AsyncJobMonitor _jobMonitor;
+ @Inject
+ private ConfigRepo _configRepo;
private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
@@ -128,15 +137,15 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
- public AsyncJob getPseudoJob() {
+ public AsyncJob getPseudoJob(long accountId, long userId) {
AsyncJobVO job = _jobDao.findPseudoJob(Thread.currentThread().getId(), getMsid());
if(job == null) {
job = new AsyncJobVO();
- job.setAccountId(_accountMgr.getSystemAccount().getId());
- job.setUserId(_accountMgr.getSystemUser().getId());
+ job.setAccountId(accountId);
+ job.setUserId(userId);
job.setInitMsid(getMsid());
- job.setDispatcher(AsyncJobConstants.JOB_DISPATCHER_PSEUDO);
- job.setInstanceType(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE);
+ job.setDispatcher(AsyncJobVO.JOB_DISPATCHER_PSEUDO);
+ job.setInstanceType(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
job.setInstanceId(Thread.currentThread().getId());
_jobDao.persist(job);
}
@@ -149,31 +158,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@SuppressWarnings("unchecked")
- @Override @DB
+ @DB
public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) {
- Transaction txt = Transaction.currentTxn();
- try {
- @SuppressWarnings("rawtypes")
- GenericDao dao = GenericDaoBase.getDao(job.getClass());
-
- txt.start();
- job.setInitMsid(getMsid());
- dao.persist(job);
- txt.commit();
-
- // no sync source originally
- job.setSyncSource(null);
- scheduleExecution(job, scheduleJobExecutionInContext);
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
- }
- return job.getId();
- } catch(Exception e) {
- txt.rollback();
- String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
- s_logger.warn(errMsg, e);
- throw new CloudRuntimeException(errMsg);
+ @SuppressWarnings("rawtypes")
+ GenericDao dao = GenericDaoBase.getDao(job.getClass());
+ job.setInitMsid(getMsid());
+ job.setSyncSource(null); // no sync source originally
+ dao.persist(job);
+
+ scheduleExecution(job, scheduleJobExecutionInContext);
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
}
+ return job.getId();
}
@SuppressWarnings("unchecked")
@@ -199,7 +196,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
- public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) {
+ public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, Object resultObject) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
", resultCode: " + resultCode + ", result: " + resultObject);
@@ -219,7 +216,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return;
}
- if(job.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS) {
+ if(job.getStatus() != JobInfo.Status.IN_PROGRESS) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("job-" + jobId + " is already completed.");
}
@@ -251,7 +248,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
for(Long id : wakeupList) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO jobToWakeup = _jobDao.findById(id);
- if(jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0)
+ if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(jobToWakeup, false);
}
@@ -358,7 +355,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
- public void completeJoin(long joinJobId, int joinStatus, String joinResult) {
+ public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult) {
_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
}
@@ -436,8 +433,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
jobResult.setResultCode(job.getResultCode());
jobResult.setUuid(job.getUuid());
- if(job.getStatus() == AsyncJobConstants.STATUS_SUCCEEDED ||
- job.getStatus() == AsyncJobConstants.STATUS_FAILED) {
+ if(job.getStatus() == JobInfo.Status.SUCCEEDED ||
+ job.getStatus() == JobInfo.Status.FAILED) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async job-" + jobId + " completed");
@@ -451,14 +448,14 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?");
}
- jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED);
+ jobResult.setJobStatus(JobInfo.Status.FAILED);
jobResult.setResult("job-" + jobId + " does not exist");
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception while querying async job-" + jobId + " status: ", e);
- jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED);
+ jobResult.setJobStatus(JobInfo.Status.FAILED);
jobResult.setResult("Exception: " + e.toString());
txt.rollback();
}
@@ -475,7 +472,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private void scheduleExecution(final AsyncJob job, boolean executeInContext) {
- Runnable runnable = getExecutorRunnable(this, job);
+ Runnable runnable = getExecutorRunnable(job);
if (executeInContext) {
runnable.run();
} else {
@@ -516,7 +513,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
}
- private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) {
+ private Runnable getExecutorRunnable(final AsyncJob job) {
return new Runnable() {
@Override
public void run() {
@@ -539,20 +536,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
_jobMonitor.registerActiveTask(runNumber, job.getId());
- AsyncJobExecutionContext.setCurrentExecutionContext((AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job))
- );
+ AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job));
// execute the job
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing " + job);
}
- if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) {
+ if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) {
AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
if(jobDispatcher != null) {
jobDispatcher.runJob(job);
} else {
- s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId());
+ s_logger.error("Unable to find a wakeup dispatcher from the joined job: " + job);
}
} else {
AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
@@ -560,7 +556,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
jobDispatcher.runJob(job);
} else {
s_logger.error("Unable to find job dispatcher, job will be cancelled");
- completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+ completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
}
}
@@ -570,7 +566,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
} catch (Throwable e) {
s_logger.error("Unexpected exception", e);
- completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+ completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
} finally {
// guard final clause as well
try {
@@ -598,7 +594,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
//
// clean execution environment
//
- AsyncJobExecutionContext.setCurrentExecutionContext(null);
+ AsyncJobExecutionContext.unregister();
_jobMonitor.unregisterActiveTask(runNumber);
} catch(Throwable e) {
@@ -727,7 +723,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
for(Long jobId : standaloneWakeupJobs) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO job = _jobDao.findById(jobId);
- if(job != null && (job.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0)
+ if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(job, false);
}
} catch(Throwable e) {
@@ -789,7 +785,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
if(blockItems != null && blockItems.size() > 0) {
for(SyncQueueItemVO item : blockItems) {
if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
- completeAsyncJob(item.getContentId(), AsyncJobConstants.STATUS_FAILED, 0,
+ completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0,
getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
}
@@ -819,11 +815,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private long getMsid() {
- if(_clusterMgr != null) {
- return _clusterMgr.getManagementNodeId();
- }
-
- return MacAddress.getMacAddress().toLong();
+ return ManagementServerNode.getManagementServerId();
}
private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
@@ -838,7 +830,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
- completeAsyncJob(jobId, AsyncJobConstants.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
+ completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
}
}
_queueMgr.purgeItem(item.getId());
@@ -864,7 +856,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
int poolSize = (cloudMaxActive * 2) / 3;
s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
- _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX));
+ _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(JOB_POOL_THREAD_PREFIX));
} catch (final Exception e) {
throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/async/AsyncJobResult.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobResult.java b/server/src/com/cloud/async/AsyncJobResult.java
index 783655e..d71e64b 100644
--- a/server/src/com/cloud/async/AsyncJobResult.java
+++ b/server/src/com/cloud/async/AsyncJobResult.java
@@ -16,14 +16,14 @@
// under the License.
package com.cloud.async;
-import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.api.ApiSerializerHelper;
public class AsyncJobResult {
private long jobId;
- private int jobStatus;
+ private JobInfo.Status jobStatus;
private int processStatus;
private int resultCode;
private String result;
@@ -31,7 +31,7 @@ public class AsyncJobResult {
public AsyncJobResult(long jobId) {
this.jobId = jobId;
- jobStatus = AsyncJobConstants.STATUS_IN_PROGRESS;
+ jobStatus = JobInfo.Status.IN_PROGRESS;
processStatus = 0;
resultCode = 0;
result = "";
@@ -46,18 +46,18 @@ public class AsyncJobResult {
}
public String getUuid() {
- return this.uuid;
+ return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
- public int getJobStatus() {
+ public JobInfo.Status getJobStatus() {
return jobStatus;
}
- public void setJobStatus(int jobStatus) {
+ public void setJobStatus(JobInfo.Status jobStatus) {
this.jobStatus = jobStatus;
}
@@ -97,7 +97,7 @@ public class AsyncJobResult {
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("AsyncJobResult {jobId:").append(getJobId());
- sb.append(", jobStatus: ").append(getJobStatus());
+ sb.append(", jobStatus: ").append(getJobStatus().ordinal());
sb.append(", processStatus: ").append(getProcessStatus());
sb.append(", resultCode: ").append(getResultCode());
sb.append(", result: ").append(result);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
index f6bddf0..513a713 100755
--- a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
+++ b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
@@ -1728,11 +1728,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
// _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
// user, account, ((VmWorkStart)work).getPlan());
//
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-start work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
//
@@ -1748,11 +1748,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
// try {
// _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
//
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-stop work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/server/ManagementServerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java
index 6de8840..9fc4645 100755
--- a/server/src/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/com/cloud/server/ManagementServerImpl.java
@@ -3469,7 +3469,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
if (asyncExecutionContext != null) {
job = asyncExecutionContext.getJob();
_asyncMgr.updateAsyncJobAttachment(job.getId(), Upload.Type.VOLUME.toString(), volumeId);
- _asyncMgr.updateAsyncJobStatus(job.getId(), AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+ _asyncMgr.updateAsyncJobStatus(job.getId(), JobInfo.Status.IN_PROGRESS, resultObj);
}
String value = _configs.get(Config.CopyVolumeWait.toString());
int copyvolumewait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
@@ -3490,7 +3490,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
resultObj.setResultString(errorString);
resultObj.setUploadStatus(UploadVO.Status.COPY_ERROR.toString());
if (asyncExecutionContext != null) {
- _asyncMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, resultObj);
+ _asyncMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, resultObj);
}
// Update the DB that volume couldn't be copied
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
index 562db28..1324ac7 100755
--- a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
+++ b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
@@ -1462,11 +1462,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
// _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
// user, account, ((VmWorkStart)work).getPlan());
//
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-start work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
//
@@ -1482,11 +1482,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
// try {
// _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
//
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-stop work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
-// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
+// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
index c6dd772..607e39b 100644
--- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
+++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
@@ -143,14 +143,14 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
Long asyncJobId = snapshotSchedule.getAsyncJobId();
AsyncJobVO asyncJob = _asyncJobDao.findById(asyncJobId);
switch (asyncJob.getStatus()) {
- case AsyncJobConstants.STATUS_SUCCEEDED:
+ case JobInfo.Status.SUCCEEDED:
// The snapshot has been successfully backed up.
// The snapshot state has also been cleaned up.
// We can schedule the next job for this snapshot.
// Remove the existing entry in the snapshot_schedule table.
scheduleNextSnapshotJob(snapshotSchedule);
break;
- case AsyncJobConstants.STATUS_FAILED:
+ case JobInfo.Status.FAILED:
// Check the snapshot status.
Long snapshotId = snapshotSchedule.getSnapshotId();
if (snapshotId == null) {
@@ -188,7 +188,7 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
}
break;
- case AsyncJobConstants.STATUS_IN_PROGRESS:
+ case JobInfo.Status.IN_PROGRESS:
// There is no way of knowing from here whether
// 1) Another management server is processing this snapshot job
// 2) The management server has crashed and this snapshot is lying
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/storage/upload/UploadListener.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/upload/UploadListener.java b/server/src/com/cloud/storage/upload/UploadListener.java
index b66624d..b6a9288 100755
--- a/server/src/com/cloud/storage/upload/UploadListener.java
+++ b/server/src/com/cloud/storage/upload/UploadListener.java
@@ -365,7 +365,7 @@ public class UploadListener implements Listener {
resultObj.setResultString(uploadErrorString);
resultObj.setState(state.toString());
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
- asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+ asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
UploadVO vo = uploadDao.createForUpdate();
vo.setUploadState(state);
@@ -378,7 +378,7 @@ public class UploadListener implements Listener {
resultObj.setResultString(uploadErrorString);
resultObj.setState(state.toString());
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
- asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+ asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
UploadVO vo = uploadDao.createForUpdate();
@@ -407,12 +407,12 @@ public class UploadListener implements Listener {
if (answer.getUploadStatus() == Status.UPLOAD_IN_PROGRESS){
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
- asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
+ asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
}else if(answer.getUploadStatus() == Status.UPLOADED){
resultObj.setResultString("Success");
- asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_SUCCEEDED, 1, resultObj);
+ asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.SUCCEEDED, 1, resultObj);
}else{
- asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_FAILED, 2, resultObj);
+ asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.FAILED, 2, resultObj);
}
UploadVO updateBuilder = uploadDao.createForUpdate();
updateBuilder.setUploadPercent(answer.getUploadPct());
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/src/com/cloud/vm/SystemVmLoadScanner.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/SystemVmLoadScanner.java b/server/src/com/cloud/vm/SystemVmLoadScanner.java
index 704129d..4d378ce 100644
--- a/server/src/com/cloud/vm/SystemVmLoadScanner.java
+++ b/server/src/com/cloud/vm/SystemVmLoadScanner.java
@@ -71,8 +71,8 @@ public class SystemVmLoadScanner<T> {
@Override
public void run() {
try {
- CallContext.registerSystemCallContextOnceOnly();
- AsyncJobExecutionContext.registerPseudoExecutionContext();
+ CallContext cc = CallContext.registerSystemCallContextOnceOnly();
+ AsyncJobExecutionContext.registerPseudoExecutionContext(cc.getCallingAccountId(), cc.getCallingUserId());
} catch (Exception e) {
s_logger.fatal("Unable to start the capacity scan task", e);
System.exit(1);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/test/com/cloud/async/TestAsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java
index 320c68f..419f5fa 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -48,10 +48,13 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.cluster.ClusterManager;
+import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.AccountVO;
+import com.cloud.user.User;
import com.cloud.user.UserVO;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.ComponentContext;
@@ -142,21 +145,21 @@ public class TestAsyncJobManager extends TestCase {
AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1);
Assert.assertTrue(record != null);
Assert.assertTrue(record.getJoinMsid() == 100);
- Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_IN_PROGRESS);
+ Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.IN_PROGRESS);
- joinMapDao.completeJoin(1, AsyncJobConstants.STATUS_SUCCEEDED, "Done", 101);
+ joinMapDao.completeJoin(1, JobInfo.Status.SUCCEEDED, "Done", 101);
record = joinMapDao.getJoinRecord(2, 1);
Assert.assertTrue(record != null);
Assert.assertTrue(record.getJoinMsid() == 100);
- Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED);
+ Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
Assert.assertTrue(record.getJoinResult().equals("Done"));
Assert.assertTrue(record.getCompleteMsid() == 101);
record = joinMapDao.getJoinRecord(3, 1);
Assert.assertTrue(record != null);
Assert.assertTrue(record.getJoinMsid() == 100);
- Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED);
+ Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
Assert.assertTrue(record.getJoinResult().equals("Done"));
Assert.assertTrue(record.getCompleteMsid() == 101);
@@ -198,7 +201,7 @@ public class TestAsyncJobManager extends TestCase {
@Test
public void testPseudoJob() {
- AsyncJob job = asyncMgr.getPseudoJob();
+ AsyncJob job = asyncMgr.getPseudoJob(Account.ACCOUNT_ID_SYSTEM, User.UID_SYSTEM);
Assert.assertTrue(job.getInstanceType().equals(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE));
Assert.assertTrue(job.getInstanceId().longValue() == Thread.currentThread().getId());
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
index 2c1249e..49b0e31 100644
--- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
+++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
@@ -316,7 +316,7 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage
if(wakeupCount++ < 3) {
AsyncJobExecutionContext.getCurrentExecutionContext().resetSyncSource();
} else {
- AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobConstants.STATUS_SUCCEEDED, 0, null);
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(JobInfo.Status.SUCCEEDED, 0, null);
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea6ca5ff/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
index 72210b4..eb5e81f 100644
--- a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
+++ b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
@@ -20,7 +20,7 @@ public class VmWorkTestWorkJobDispatcher extends AdapterBase implements AsyncJob
} catch (InterruptedException e) {
}
- AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
s_logger.info("End work job execution. job-" + job.getId());
}
}