You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2013/12/12 01:20:45 UTC
git commit: updated refs/heads/4.3 to 619f538
Updated Branches:
refs/heads/4.3 900bedbe1 -> 619f5381d
CLOUDSTACK-669: put user vm work under new vmsync model
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/619f5381
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/619f5381
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/619f5381
Branch: refs/heads/4.3
Commit: 619f5381d5fd6db7b924b3b1f962a7261c0e3c18
Parents: 900bedb
Author: Kelven Yang <ke...@gmail.com>
Authored: Wed Dec 11 16:19:18 2013 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Dec 11 16:19:57 2013 -0800
----------------------------------------------------------------------
...spring-engine-orchestration-core-context.xml | 5 +
.../com/cloud/vm/VirtualMachineManagerImpl.java | 134 ++++++++++--------
.../com/cloud/vm/VmWorkJobWakeupDispatcher.java | 141 +++++++++++++++++++
.../jobs/impl/AsyncJobManagerImpl.java | 5 +-
4 files changed, 224 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/619f5381/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
----------------------------------------------------------------------
diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
index 0c76f00..fc3bae3 100644
--- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
+++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
@@ -78,5 +78,10 @@
<util:constant static-field="com.cloud.vm.VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER"/>
</property>
</bean>
+ <bean id= "vmWorkJobWakeupDispatcher" class="com.cloud.vm.VmWorkJobWakeupDispatcher">
+ <property name="name">
+ <util:constant static-field="com.cloud.vm.VmWorkJobDispatcher.VM_WORK_JOB_WAKEUP_DISPATCHER"/>
+ </property>
+ </bean>
</beans>
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/619f5381/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 841721d..849ebcf 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -40,6 +40,8 @@ import javax.ejb.Local;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
+import org.apache.log4j.Logger;
+
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
@@ -67,7 +69,6 @@ import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
-import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
@@ -328,13 +329,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
false);
static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced",
- Boolean.class, "vm.job.enabled", "false",
+ Boolean.class, "vm.job.enabled", "false",
"True to enable new VM sync model. false to use the old way", false);
static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced",
- Long.class, "vm.job.check.interval", "3000",
+ Long.class, "vm.job.check.interval", "3000",
"Interval in milliseconds to check if the job is complete", false);
static final ConfigKey<Long> VmJobTimeout = new ConfigKey<Long>("Advanced",
- Long.class, "vm.job.timeout", "600000",
+ Long.class, "vm.job.timeout", "600000",
"Time in milliseconds to wait before attempting to cancel a job", false);
static final ConfigKey<Integer> VmJobStateReportInterval = new ConfigKey<Integer>("Advanced",
Integer.class, "vm.job.report.interval", "60",
@@ -370,7 +371,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
assert (plan.getClusterId() == null && plan.getPoolId() == null) : "We currently don't support cluster and pool preset yet";
final VMInstanceVO vmFinal = _vmDao.persist(vm);
- final LinkedHashMap<? extends DiskOffering, Long> dataDiskOfferingsFinal = dataDiskOfferings == null ?
+ final LinkedHashMap<? extends DiskOffering, Long> dataDiskOfferingsFinal = dataDiskOfferings == null ?
new LinkedHashMap<DiskOffering, Long>() : dataDiskOfferings;
@@ -602,7 +603,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
while (retry-- != 0) {
try {
final ItWorkVO workFinal = work;
- Ternary<VMInstanceVO, ReservationContext, ItWorkVO> result =
+ Ternary<VMInstanceVO, ReservationContext, ItWorkVO> result =
Transaction.execute(new TransactionCallbackWithException<Ternary<VMInstanceVO, ReservationContext, ItWorkVO>, NoTransitionException>() {
@Override
public Ternary<VMInstanceVO, ReservationContext, ItWorkVO> doInTransaction(TransactionStatus status) throws NoTransitionException {
@@ -690,7 +691,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
+ public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
advanceStart(vmUuid, params, null);
@@ -1219,7 +1220,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop)
+ public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop)
throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@@ -1511,6 +1512,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return true;
}
+ @Override
public void storageMigration(String vmUuid, StoragePool destPool) {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
@@ -1592,7 +1594,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void migrate(String vmUuid, long srcHostId, DeployDestination dest)
+ public void migrate(String vmUuid, long srcHostId, DeployDestination dest)
throws ResourceUnavailableException, ConcurrentOperationException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@@ -1863,7 +1865,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
- public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool)
+ @Override
+ public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool)
throws ResourceUnavailableException, ConcurrentOperationException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@@ -2143,7 +2146,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
+ public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@@ -2824,12 +2827,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
if(VmJobEnabled.value()) {
- if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) {
+ if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) {
_syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport());
}
}
- // take the chance to scan VMs that are stuck in transitional states
+ // take the chance to scan VMs that are stuck in transitional states
// and are missing from the report
scanStalledVMInTransitionStateOnUpHost(agentId);
processed = true;
@@ -2965,7 +2968,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
this.name = name;
this.state = state;
this.vm = vm;
- this.hostUuid = host;
+ hostUuid = host;
this.platform = platform;
}
@@ -3067,7 +3070,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested)
+ public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested)
throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@@ -3174,7 +3177,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return nicTO;
}
- public boolean removeNicFromVm(VirtualMachine vm, Nic nic)
+ @Override
+ public boolean removeNicFromVm(VirtualMachine vm, Nic nic)
throws ConcurrentOperationException, ResourceUnavailableException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@@ -3418,7 +3422,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
+ public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
throws ResourceUnavailableException, ConcurrentOperationException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
@@ -3446,7 +3450,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
+ public void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
throws ResourceUnavailableException, ConcurrentOperationException {
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
@@ -3665,8 +3669,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return result;
}
- public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering,
- boolean reconfiguringOnExistingHost)
+ @Override
+ public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering,
+ boolean reconfiguringOnExistingHost)
throws ResourceUnavailableException, ConcurrentOperationException {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@@ -3774,7 +3779,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Inject
public void setStoragePoolAllocators(List<StoragePoolAllocator> storagePoolAllocators) {
- this._storagePoolAllocators = storagePoolAllocators;
+ _storagePoolAllocators = storagePoolAllocators;
}
@@ -4112,7 +4117,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
assert(job != null);
assert(job.getDispatcher().equals(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER));
- AsyncJobVO jobVo = this._entityMgr.findById(AsyncJobVO.class, job.getId());
+ AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
if(jobVo != null && jobVo.getResult() != null) {
Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
@@ -4126,8 +4131,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// TODO build a common pattern to reduce code duplication in following methods
// no time for this at current iteration
//
- public Outcome<VirtualMachine> startVmThroughJobQueue(final String vmUuid,
- final Map<VirtualMachineProfile.Param, Object> params,
+ public Outcome<VirtualMachine> startVmThroughJobQueue(final String vmUuid,
+ final Map<VirtualMachineProfile.Param, Object> params,
final DeploymentPlan planToDeploy) {
final CallContext context = CallContext.current();
@@ -4137,11 +4142,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance,
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance,
vm.getId(), VmWorkStart.class.getName());
if (pendingWorkJobs.size() > 0) {
@@ -4177,7 +4183,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
- return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOn, vm.getId(), null);
}
@@ -4189,11 +4195,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkStop.class.getName());
VmWorkJobVO workJob = null;
@@ -4227,11 +4234,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
- return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOff, vm.getId(), null);
}
- public Outcome<VirtualMachine> rebootVmThroughJobQueue(final String vmUuid,
+ public Outcome<VirtualMachine> rebootVmThroughJobQueue(final String vmUuid,
final Map<VirtualMachineProfile.Param, Object> params) {
final CallContext context = CallContext.current();
@@ -4241,11 +4248,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkReboot.class.getName());
VmWorkJobVO workJob = null;
@@ -4279,7 +4287,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
- return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vm.getId());
}
@@ -4291,12 +4299,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkMigrate.class.getName());
VmWorkJobVO workJob = null;
@@ -4329,7 +4338,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
- return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId());
}
@@ -4344,12 +4353,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkMigrateWithStorage.class.getName());
VmWorkJobVO workJob = null;
@@ -4369,7 +4379,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
- VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(),
+ VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(),
srcHostId, destHostId, volumeToPool);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@@ -4383,7 +4393,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
- return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId);
}
@@ -4397,12 +4407,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkMigrateForScale.class.getName());
VmWorkJobVO workJob = null;
@@ -4422,7 +4433,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
- VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(),
+ VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(),
srcHostId, dest, newSvcOfferingId);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@@ -4449,12 +4460,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkStorageMigration.class.getName());
VmWorkJobVO workJob = null;
@@ -4474,7 +4486,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
- VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
+ VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
destPool);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@@ -4499,12 +4511,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final Account account = context.getCallingAccount();
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkAddVmToNetwork.class.getName());
VmWorkJobVO workJob = null;
@@ -4524,7 +4537,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
- VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
+ VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
network, requested);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@@ -4549,12 +4562,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final Account account = context.getCallingAccount();
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkRemoveNicFromVm.class.getName());
VmWorkJobVO workJob = null;
@@ -4574,7 +4588,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
- VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(),
+ VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(),
nic);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@@ -4599,12 +4613,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final Account account = context.getCallingAccount();
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkRemoveVmFromNetwork.class.getName());
VmWorkJobVO workJob = null;
@@ -4624,7 +4639,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
- VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(),
+ VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(),
network, broadcastUri);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@@ -4651,12 +4666,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
- public void doInTransactionWithoutResult(TransactionStatus status) {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
+ VirtualMachine.Type.Instance, vm.getId(),
VmWorkReconfigure.class.getName());
VmWorkJobVO workJob = null;
@@ -4676,7 +4692,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
- VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(),
+ VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(),
oldServiceOffering, reconfiguringOnExistingHost);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/619f5381/engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
new file mode 100644
index 0000000..5704f97
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.vm;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.context.CallContext;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
+
+import com.cloud.user.AccountVO;
+import com.cloud.user.dao.AccountDao;
+import com.cloud.utils.component.AdapterBase;
+import com.cloud.vm.dao.VMInstanceDao;
+
+public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDispatcher {
+ private static final Logger s_logger = Logger.getLogger(VmWorkJobWakeupDispatcher.class);
+
+ @Inject
+ private VmWorkJobDao _workjobDao;
+ @Inject
+ private AsyncJobJoinMapDao _joinMapDao;
+ @Inject
+ private AccountDao _accountDao;
+ @Inject
+ private VMInstanceDao _instanceDao;
+ @Inject
+ private VirtualMachineManager _vmMgr;
+ @Inject
+ private AsyncJobManager _asyncJobMgr;
+
+ private final Map<String, Method> _handlerMap = new HashMap<String, Method>();
+
+ @Override
+ public void runJob(AsyncJob job) {
+ try {
+ List<AsyncJobJoinMapVO> joinRecords = _joinMapDao.listJoinRecords(job.getId());
+ if (joinRecords.size() != 1) {
+ s_logger.warn("AsyncJob-" + job.getId()
+ + " received wakeup call with un-supported joining job number: " + joinRecords.size());
+
+ // if we fail wakeup-execution for any reason, avoid release sync-source if there is any
+ job.setSyncSource(null);
+ return;
+ }
+
+ AsyncJobJoinMapVO joinRecord = joinRecords.get(0);
+ VmWorkJobVO joinedJob = _workjobDao.findById(joinRecord.getJoinJobId());
+
+ Class<?> workClz = null;
+ try {
+ workClz = Class.forName(job.getCmd());
+ } catch (ClassNotFoundException e) {
+ s_logger.error("VM work class " + job.getCmd() + " is not found", e);
+ return;
+ }
+
+ // get original work context information from joined job
+ VmWork work = VmWorkSerializer.deserialize(workClz, joinedJob.getCmdInfo());
+ assert (work != null);
+
+ AccountVO account = _accountDao.findById(work.getAccountId());
+ assert (account != null);
+
+ VMInstanceVO vm = _instanceDao.findById(work.getVmId());
+ assert (vm != null);
+
+ CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
+ try {
+ Method handler = getHandler(joinRecord.getWakeupHandler());
+ if (handler != null) {
+ handler.invoke(_vmMgr);
+ } else {
+ assert (false);
+ s_logger.error("Unable to find wakeup handler " + joinRecord.getWakeupHandler() +
+ " when waking up job-" + job.getId());
+ }
+ } finally {
+ CallContext.unregister();
+ }
+ } catch (Throwable e) {
+ s_logger.warn("Unexpected exception in waking up job-" + job.getId());
+
+ // if we fail wakeup-execution for any reason, avoid release sync-source if there is any
+ job.setSyncSource(null);
+ }
+ }
+
+ private Method getHandler(String wakeupHandler) {
+
+ synchronized (_handlerMap) {
+ Class<?> clz = _vmMgr.getClass();
+ Method method = _handlerMap.get(wakeupHandler);
+ if (method != null)
+ return method;
+
+ try {
+ method = clz.getMethod(wakeupHandler);
+ method.setAccessible(true);
+ } catch (SecurityException e) {
+ assert (false);
+ s_logger.error("Unexpected exception", e);
+ return null;
+ } catch (NoSuchMethodException e) {
+ assert (false);
+ s_logger.error("Unexpected exception", e);
+ return null;
+ }
+
+ _handlerMap.put(wakeupHandler, method);
+ return method;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/619f5381/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index d795752..19e5182 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
+import org.apache.log4j.Logger;
+
import org.apache.cloudstack.api.ApiErrorCode;
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
@@ -52,7 +54,6 @@ import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.jobs.JobInfo.Status;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
-import org.apache.log4j.Logger;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHost;
@@ -633,7 +634,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
msgDetector.open(_messageBus, topics);
try {
long startTick = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTick < timeoutInMiliseconds) {
+ while (timeoutInMiliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMiliseconds) {
msgDetector.waitAny(checkIntervalInMilliSeconds);
job = _jobDao.findById(job.getId());
if (job.getStatus().done()) {