You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2013/11/21 03:33:48 UTC
[2/6] git commit: updated refs/heads/master to 278ef81
side-by-side VM sync management at manager level
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/278ef81a
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/278ef81a
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/278ef81a
Branch: refs/heads/master
Commit: 278ef81a8339e8dfc6ecd9dd04701d6713b9c062
Parents: 2d42b2d
Author: Kelven Yang <ke...@gmail.com>
Authored: Wed Nov 20 17:46:45 2013 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Nov 20 17:59:38 2013 -0800
----------------------------------------------------------------------
api/src/com/cloud/vm/VirtualMachine.java | 7 +-
.../apache/cloudstack/api/InternalIdentity.java | 4 +-
.../apache/cloudstack/context/CallContext.java | 12 +
.../src/com/cloud/vm/VirtualMachineManager.java | 30 +-
.../src/com/cloud/alert/AlertManager.java | 2 +
.../components-api/src/com/cloud/vm/VmWork.java | 45 +
...spring-engine-orchestration-core-context.xml | 1 +
.../com/cloud/vm/VirtualMachineManagerImpl.java | 1348 +++++++++++++++++-
.../cloud/vm/VirtualMachinePowerStateSync.java | 3 +-
.../vm/VirtualMachinePowerStateSyncImpl.java | 24 +-
.../src/com/cloud/vm/VmWorkAddVmToNetwork.java | 42 +
.../src/com/cloud/vm/VmWorkJobDispatcher.java | 152 ++
.../src/com/cloud/vm/VmWorkMigrate.java | 86 ++
.../src/com/cloud/vm/VmWorkMigrateForScale.java | 48 +
.../com/cloud/vm/VmWorkMigrateWithStorage.java | 52 +
.../src/com/cloud/vm/VmWorkReboot.java | 68 +
.../src/com/cloud/vm/VmWorkReconfigure.java | 43 +
.../src/com/cloud/vm/VmWorkRemoveNicFromVm.java | 33 +
.../com/cloud/vm/VmWorkRemoveVmFromNetwork.java | 43 +
.../src/com/cloud/vm/VmWorkSerializer.java | 75 +
.../src/com/cloud/vm/VmWorkStart.java | 125 ++
.../src/com/cloud/vm/VmWorkStop.java | 32 +
.../com/cloud/vm/VmWorkStorageMigration.java | 35 +
.../dao/ManagementServerHostDaoImpl.java | 7 +-
.../core/spring-framework-jobs-core-context.xml | 4 +-
.../jobs/AsyncJobExecutionContext.java | 10 +-
26 files changed, 2274 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/api/src/com/cloud/vm/VirtualMachine.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java
index 9a8d883..3400898 100755
--- a/api/src/com/cloud/vm/VirtualMachine.java
+++ b/api/src/com/cloud/vm/VirtualMachine.java
@@ -213,7 +213,12 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I
* UserBareMetal is only used for selecting VirtualMachineGuru, there is no
* VM with this type. UserBareMetal should treat exactly as User.
*/
- UserBareMetal(false);
+ UserBareMetal(false),
+
+ /*
+ * General VM type for queuing VM orchestration work
+ */
+ Instance(false);
boolean _isUsedBySystem;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/api/src/org/apache/cloudstack/api/InternalIdentity.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/InternalIdentity.java b/api/src/org/apache/cloudstack/api/InternalIdentity.java
index 1dfeb8c..4149dd1 100644
--- a/api/src/org/apache/cloudstack/api/InternalIdentity.java
+++ b/api/src/org/apache/cloudstack/api/InternalIdentity.java
@@ -16,11 +16,13 @@
// under the License.
package org.apache.cloudstack.api;
+import java.io.Serializable;
+
// This interface is a contract that getId() will give the internal
// ID of an entity which extends this interface
// Any class having an internal ID in db table/schema should extend this
// For example, all ControlledEntity, OwnedBy would have an internal ID
-public interface InternalIdentity {
+public interface InternalIdentity extends Serializable {
long getId();
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/api/src/org/apache/cloudstack/context/CallContext.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/context/CallContext.java b/api/src/org/apache/cloudstack/context/CallContext.java
index 5439aee..3cdccc5 100644
--- a/api/src/org/apache/cloudstack/context/CallContext.java
+++ b/api/src/org/apache/cloudstack/context/CallContext.java
@@ -197,6 +197,18 @@ public class CallContext {
}
return register(user, account);
}
+
+ public static CallContext register(long callingUserId, long callingAccountId, String contextId) throws CloudAuthenticationException {
+ Account account = s_entityMgr.findById(Account.class, callingAccountId);
+ if (account == null) {
+ throw new CloudAuthenticationException("The account is no longer current.").add(Account.class, Long.toString(callingAccountId));
+ }
+ User user = s_entityMgr.findById(User.class, callingUserId);
+ if (user == null) {
+ throw new CloudAuthenticationException("The user is no longer current.").add(User.class, Long.toString(callingUserId));
+ }
+ return register(user, account, contextId);
+ }
public static void unregisterAll() {
while ( unregister() != null ) {
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/api/src/com/cloud/vm/VirtualMachineManager.java
----------------------------------------------------------------------
diff --git a/engine/api/src/com/cloud/vm/VirtualMachineManager.java b/engine/api/src/com/cloud/vm/VirtualMachineManager.java
index 35804af..9d19cf5 100644
--- a/engine/api/src/com/cloud/vm/VirtualMachineManager.java
+++ b/engine/api/src/com/cloud/vm/VirtualMachineManager.java
@@ -108,8 +108,13 @@ public interface VirtualMachineManager extends Manager {
void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException;
+ void orchestrateStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
+ ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException;
+
void advanceStop(String vmUuid, boolean cleanupEvenIfUnableToStop) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException;
+ void orchestrateStop(String vmUuid, boolean cleanupEvenIfUnableToStop) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException;
+
void advanceExpunge(String vmUuid) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException;
void destroy(String vmUuid) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException;
@@ -117,11 +122,17 @@ public interface VirtualMachineManager extends Manager {
void migrateAway(String vmUuid, long hostId) throws InsufficientServerCapacityException;
void migrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException;
+
+ void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException;
void migrateWithStorage(String vmUuid, long srcId, long destId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException;
-
+
+ void orchestrateMigrateWithStorage(String vmUuid, long srcId, long destId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException;
+
void reboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ResourceUnavailableException;
+ void orchestrateReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ResourceUnavailableException;
+
void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ResourceUnavailableException,
ConcurrentOperationException, OperationTimedoutException;
@@ -137,6 +148,8 @@ public interface VirtualMachineManager extends Manager {
VirtualMachine findById(long vmId);
void storageMigration(String vmUuid, StoragePool storagePoolId);
+
+ void orchestrateStorageMigration(String vmUuid, StoragePool storagePoolId);
/**
* @param vmInstance
@@ -161,7 +174,11 @@ public interface VirtualMachineManager extends Manager {
* @throws InsufficientCapacityException
*/
NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException,
- ResourceUnavailableException, InsufficientCapacityException;
+ ResourceUnavailableException, InsufficientCapacityException;
+
+ NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException,
+ ResourceUnavailableException, InsufficientCapacityException;
+
/**
* @param vm
@@ -172,6 +189,8 @@ public interface VirtualMachineManager extends Manager {
*/
boolean removeNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException;
+ boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException;
+
/**
* @param vm
* @param network
@@ -181,6 +200,8 @@ public interface VirtualMachineManager extends Manager {
* @throws ConcurrentOperationException
*/
boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException;
+
+ boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException;
/**
* @param nic
@@ -196,12 +217,15 @@ public interface VirtualMachineManager extends Manager {
*/
VirtualMachineTO toVmTO(VirtualMachineProfile profile);
-
VirtualMachine reConfigureVm(String vmUuid, ServiceOffering newServiceOffering, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException;
+
+ VirtualMachine orchestrateReConfigureVm(String vmUuid, ServiceOffering newServiceOffering, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException;
void findHostAndMigrate(String vmUuid, Long newSvcOfferingId, DeploymentPlanner.ExcludeList excludeHostList) throws InsufficientCapacityException,
ConcurrentOperationException, ResourceUnavailableException;
void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long newSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException;
+
+ void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long newSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException;
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/components-api/src/com/cloud/alert/AlertManager.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/alert/AlertManager.java b/engine/components-api/src/com/cloud/alert/AlertManager.java
index 1ae6b1b..eb5ac0c 100755
--- a/engine/components-api/src/com/cloud/alert/AlertManager.java
+++ b/engine/components-api/src/com/cloud/alert/AlertManager.java
@@ -50,6 +50,8 @@ public interface AlertManager extends Manager {
public static final short ALERT_TYPE_DIRECT_ATTACHED_PUBLIC_IP = 24;
public static final short ALERT_TYPE_LOCAL_STORAGE = 25;
public static final short ALERT_TYPE_RESOURCE_LIMIT_EXCEEDED = 26; // Generated when the resource limit exceeds the limit. Currently used for recurring snapshots only
+
+ public static final short ALERT_TYPE_SYNC = 27;
static final ConfigKey<Double> StorageCapacityThreshold = new ConfigKey<Double>(Double.class, "cluster.storage.capacity.notificationthreshold", "Alert", "0.75",
"Percentage (as a value between 0 and 1) of storage utilization above which alerts will be sent about low storage available.", true, ConfigKey.Scope.Cluster, null);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/components-api/src/com/cloud/vm/VmWork.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWork.java b/engine/components-api/src/com/cloud/vm/VmWork.java
new file mode 100644
index 0000000..3f9e71d
--- /dev/null
+++ b/engine/components-api/src/com/cloud/vm/VmWork.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+import java.io.Serializable;
+
+public class VmWork implements Serializable {
+ private static final long serialVersionUID = -6946320465729853589L;
+
+ long userId;
+ long accountId;
+ long vmId;
+
+ public VmWork(long userId, long accountId, long vmId) {
+ this.userId = userId;
+ this.accountId = accountId;
+ this.vmId = vmId;
+ }
+
+ public long getUserId() {
+ return userId;
+ }
+
+ public long getAccountId() {
+ return accountId;
+ }
+
+ public long getVmId() {
+ return vmId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
----------------------------------------------------------------------
diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
index b5c4254..880002c 100644
--- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
+++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
@@ -67,5 +67,6 @@
<bean id="virtualMachineEntityImpl" class="org.apache.cloudstack.engine.cloud.entity.api.VirtualMachineEntityImpl" />
+ <bean id="virtualMachinePowerStateSyncImpl" class="com.cloud.vm.VirtualMachinePowerStateSyncImpl" />
</beans>
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 75a95c5..189c2ba 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -18,6 +18,10 @@
package com.cloud.vm;
import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -26,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -47,6 +52,18 @@ import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
+import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
+import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
@@ -153,8 +170,10 @@ import com.cloud.storage.dao.VolumeDao;
import com.cloud.template.VirtualMachineTemplate;
import com.cloud.user.Account;
import com.cloud.user.User;
+import com.cloud.utils.DateUtil;
import com.cloud.utils.Journal;
import com.cloud.utils.Pair;
+import com.cloud.utils.Predicate;
import com.cloud.utils.StringUtils;
import com.cloud.utils.Ternary;
import com.cloud.utils.component.ManagerBase;
@@ -163,8 +182,10 @@ import com.cloud.utils.db.DB;
import com.cloud.utils.db.EntityManager;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionCallbackWithException;
import com.cloud.utils.db.TransactionCallbackWithExceptionNoReturn;
+import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExecutionException;
@@ -172,6 +193,7 @@ import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.ItWorkVO.Step;
import com.cloud.vm.VirtualMachine.Event;
+import com.cloud.vm.VirtualMachine.PowerState;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.dao.NicDao;
import com.cloud.vm.dao.UserVmDao;
@@ -186,6 +208,8 @@ import com.cloud.vm.snapshot.dao.VMSnapshotDao;
public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMachineManager, Listener, Configurable {
private static final Logger s_logger = Logger.getLogger(VirtualMachineManagerImpl.class);
+ private static final String VM_SYNC_ALERT_SUBJECT = "VM state sync alert";
+
@Inject
DataStoreManager dataStoreMgr;
@Inject
@@ -279,6 +303,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Inject
DeploymentPlanningManager _dpMgr;
+ @Inject protected MessageBus _messageBus;
+ @Inject protected VirtualMachinePowerStateSync _syncMgr;
+ @Inject protected VmWorkJobDao _workJobDao;
+ @Inject protected AsyncJobManager _jobMgr;
+
Map<VirtualMachine.Type, VirtualMachineGuru> _vmGurus = new HashMap<VirtualMachine.Type, VirtualMachineGuru>();
protected StateMachine2<State, VirtualMachine.Event, VirtualMachine> _stateMachine;
@@ -298,6 +327,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
"On destroy, force-stop takes this value ", true);
static final ConfigKey<Integer> ClusterDeltaSyncInterval = new ConfigKey<Integer>("Advanced", Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds",
false);
+
+ static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced",
+ Boolean.class, "vm.job.enabled", "false",
+ "True to enable new VM sync model. false to use the old way", false);
+ static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced",
+ Long.class, "vm.job.check.interval", "3000",
+ "Interval in milliseconds to check if the job is complete", false);
+ static final ConfigKey<Long> VmJobTimeout = new ConfigKey<Long>("Advanced",
+ Long.class, "vm.job.timeout", "600000",
+ "Time in milliseconds to wait before attempting to cancel a job", false);
+ static final ConfigKey<Integer> VmJobStateReportInterval = new ConfigKey<Integer>("Advanced",
+ Integer.class, "vm.job.report.interval", "60",
+ "Interval to send application level pings to make sure the connection is still working", false);
@@ -651,15 +693,46 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ConcurrentOperationException,
- ResourceUnavailableException {
+ public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
+ throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
+
advanceStart(vmUuid, params, null);
}
@Override
public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
- ConcurrentOperationException, ResourceUnavailableException {
- CallContext cctxt = CallContext.current();
+ ConcurrentOperationException, ResourceUnavailableException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ orchestrateStart(vmUuid, params, planToDeploy);
+ } else {
+ Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
+
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ else if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ }
+ }
+ }
+
+ @Override
+ public void orchestrateStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
+ ConcurrentOperationException, ResourceUnavailableException {
+
+ CallContext cctxt = CallContext.current();
Account account = cctxt.getCallingAccount();
User caller = cctxt.getCallingUser();
@@ -1145,7 +1218,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
+ public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop)
+ throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+ } else {
+ Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
+
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof AgentUnavailableException)
+ throw (AgentUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ else if(jobException instanceof OperationTimedoutException)
+ throw (OperationTimedoutException)jobException;
+ }
+ }
+ }
+
+ @Override
+ public void orchestrateStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
advanceStop(vm, cleanUpEvenIfUnableToStop);
@@ -1415,9 +1519,33 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return true;
}
+
+ public void storageMigration(String vmUuid, StoragePool destPool) {
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ orchestrateStorageMigration(vmUuid, destPool);
+ } else {
+ Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
+
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof RuntimeException)
+ throw (RuntimeException)jobException;
+ }
+ }
+ }
@Override
- public void storageMigration(String vmUuid, StoragePool destPool) {
+ public void orchestrateStorageMigration(String vmUuid, StoragePool destPool) {
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
try {
@@ -1473,7 +1601,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public void migrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException {
+ public void migrate(String vmUuid, long srcHostId, DeployDestination dest)
+ throws ResourceUnavailableException, ConcurrentOperationException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ orchestrateMigrate(vmUuid, srcHostId, dest);
+ } else {
+ Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
+
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ else if(jobException instanceof RuntimeException)
+ throw (RuntimeException)jobException;
+ }
+ }
+ }
+
+ @Override
+ public void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException {
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
if (vm == null) {
if (s_logger.isDebugEnabled()) {
@@ -1713,9 +1872,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
+ public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool)
+ throws ResourceUnavailableException, ConcurrentOperationException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+ } else {
+ Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
+
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ }
+ }
+ }
+
@Override
- public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException,
- ConcurrentOperationException {
+ public void orchestrateMigrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException,
+ ConcurrentOperationException {
+
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
HostVO srcHost = _hostDao.findById(srcHostId);
@@ -1954,9 +2142,40 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new CloudRuntimeException("Unable to reboot a VM due to concurrent operation", e);
}
}
-
+
@Override
- public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ConcurrentOperationException,
+ public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
+ throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ orchestrateReboot(vmUuid, params);
+ } else {
+ Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
+
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ else if(jobException instanceof InsufficientCapacityException)
+ throw (InsufficientCapacityException)jobException;
+ }
+ }
+ }
+
+ @Override
+ public void orchestrateReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ConcurrentOperationException,
ResourceUnavailableException {
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
@@ -2081,8 +2300,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return commands;
}
-
-
public void deltaSync(Map<String, Ternary<String, State, String>> newStates) {
Map<Long, AgentVmInfo> states = convertToInfos(newStates);
@@ -2615,6 +2832,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
}
+
+ if(VmJobEnabled.value()) {
+ if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) {
+ _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport());
+ }
+ }
+
+ // take the chance to scan VMs that are stuck in transitional states
+ // and are missing from the report
+ scanStalledVMInTransitionStateOnUpHost(agentId);
processed = true;
}
}
@@ -2636,7 +2863,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (!(cmd instanceof StartupRoutingCommand)) {
return;
}
+
+ if(s_logger.isDebugEnabled())
+ s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
+ if(VmJobEnabled.value()) {
+ _syncMgr.resetHostSyncState(agent.getId());
+ }
+
if (forRebalance) {
s_logger.debug("Not processing listener " + this + " as connect happens on rebalance process");
return;
@@ -2842,9 +3076,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
vmForUpdate.setServiceOfferingId(newSvcOff.getId());
return _vmDao.update(vmId, vmForUpdate);
}
-
+
+ @Override
+ public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested)
+ throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ return orchestrateAddVmToNetwork(vm, network,requested);
+ } else {
+ Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested);
+
+ try {
+ outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId());
+ if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
+
+ NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
+ return nic;
+ } else {
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ else if(jobException instanceof InsufficientCapacityException)
+ throw (InsufficientCapacityException)jobException;
+ else if(jobException instanceof RuntimeException)
+ throw (RuntimeException)jobException;
+ }
+ throw new RuntimeException("Job failed with unhandled exception");
+ }
+ }
+ }
+
@Override
- public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException,
+ public NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException,
InsufficientCapacityException {
CallContext cctx = CallContext.current();
@@ -2909,9 +3184,48 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
NicTO nicTO = hvGuru.toNicTO(nic);
return nicTO;
}
+
+ public boolean removeNicFromVm(VirtualMachine vm, Nic nic)
+ throws ConcurrentOperationException, ResourceUnavailableException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ return orchestrateRemoveNicFromVm(vm, nic);
+ } else {
+ Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic);
+
+ try {
+ outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId());
+
+ if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
+ Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
+ return result;
+ } else {
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ else if(jobException instanceof RuntimeException)
+ throw (RuntimeException)jobException;
+ }
+
+ throw new RuntimeException("Job failed with un-handled exception");
+ }
+ }
+ }
@Override
- public boolean removeNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException {
+ public boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException {
CallContext cctx = CallContext.current();
VMInstanceVO vmVO = _vmDao.findById(vm.getId());
NetworkVO network = _networkDao.findById(nic.getNetworkId());
@@ -2971,6 +3285,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
@DB
public boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException {
+ // TODO will serialize on the VM object later to resolve operation conflicts
+ return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri);
+ }
+
+ @Override
+ @DB
+ public boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException {
CallContext cctx = CallContext.current();
VMInstanceVO vmVO = _vmDao.findById(vm.getId());
ReservationContext context = new ReservationContextImpl(null, null, cctx.getCallingUser(), cctx.getCallingAccount());
@@ -3106,10 +3427,40 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw e;
}
}
+
+ @Override
+ public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
+ throws ResourceUnavailableException, ConcurrentOperationException {
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+ } else {
+ Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
+
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ }
+ }
+ }
@Override
- public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException {
- VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+ public void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
+ throws ResourceUnavailableException, ConcurrentOperationException {
+
+ VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
s_logger.info("Migrating " + vm + " to " + dest);
vm.getServiceOfferingId();
@@ -3293,7 +3644,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException,
- ResourceUnavailableException {
+ ResourceUnavailableException {
boolean result = true;
VMInstanceVO router = _vmDao.findById(vm.getId());
@@ -3324,9 +3675,46 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return result;
}
-
+
+ public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering,
+ boolean reconfiguringOnExistingHost)
+ throws ResourceUnavailableException, ConcurrentOperationException {
+
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+ return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+ } else {
+ Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+
+ VirtualMachine vm = null;
+ try {
+ vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId());
+ if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
+ return _entityMgr.findById(VMInstanceVO.class, vm.getId());
+ } else {
+ Throwable jobException = retriveExecutionException(outcome.getJob());
+ if(jobException != null) {
+ if(jobException instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobException;
+ else if(jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
+ }
+
+ throw new RuntimeException("Failed with un-handled exception");
+ }
+ }
+ }
+
@Override
- public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException,
+ public VMInstanceVO orchestrateReConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException,
ConcurrentOperationException {
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
@@ -3388,7 +3776,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] {ClusterDeltaSyncInterval, StartRetry, VmDestroyForcestop, VmOpCancelInterval, VmOpCleanupInterval, VmOpCleanupWait, VmOpLockStateRetry,
- VmOpWaitInterval, ExecuteInSequence};
+ VmOpWaitInterval, ExecuteInSequence, VmJobCheckInterval, VmJobTimeout, VmJobStateReportInterval};
}
public List<StoragePoolAllocator> getStoragePoolAllocators() {
@@ -3400,4 +3788,920 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_storagePoolAllocators = storagePoolAllocators;
}
+
+ //
+ // PowerState report handling for out-of-band changes and handling of left-over transitional VM states
+ //
+
+ @MessageHandler(topic = Topics.VM_POWER_STATE)
+ private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) {
+ assert(args != null);
+ Long vmId = (Long)args;
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vmId);
+ if(pendingWorkJobs.size() == 0) {
+ // there is no pending operation job
+ VMInstanceVO vm = _vmDao.findById(vmId);
+ if(vm != null) {
+ switch(vm.getPowerState()) {
+ case PowerOn :
+ handlePowerOnReportWithNoPendingJobsOnVM(vm);
+ break;
+
+ case PowerOff :
+ handlePowerOffReportWithNoPendingJobsOnVM(vm);
+ break;
+
+ // PowerUnknown shouldn't be reported, it is a derived
+ // VM power state from host state (host un-reachable
+ case PowerUnknown :
+ default :
+ assert(false);
+ break;
+ }
+ } else {
+ s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
+ }
+ } else {
+ // TODO, do job wake-up signalling, since currently async job wake-up is not in use
+ // we will skip it for nows
+ }
+ }
+
+ private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
+ //
+ // 1) handle left-over transitional VM states
+ // 2) handle out of band VM live migration
+ // 3) handle out of sync stationary states, marking VM from Stopped to Running with
+ // alert messages
+ //
+ switch(vm.getState()) {
+ case Starting :
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch(NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+
+ // we need to alert admin or user about this risky state transition
+ _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
+ break;
+
+ case Running :
+ try {
+ if(vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue())
+ s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId());
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch(NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ break;
+
+ case Stopping :
+ case Stopped :
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch(NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset");
+ break;
+
+ case Destroyed :
+ case Expunging :
+ s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: "
+ + vm.getId() + ", state: " + vm.getState());
+ break;
+
+ case Migrating :
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch(NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ break;
+
+ case Error :
+ default :
+ s_logger.info("Receive power on report when VM is in error or unexpected state. vm: "
+ + vm.getId() + ", state: " + vm.getState());
+ break;
+ }
+ }
+
+ private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
+
+ // 1) handle left-over transitional VM states
+ // 2) handle out of sync stationary states, schedule force-stop to release resources
+ //
+ switch(vm.getState()) {
+ case Starting :
+ case Stopping :
+ case Stopped :
+ case Migrating :
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
+ } catch(NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition.");
+ // TODO: we need to forcely release all resource allocation
+ break;
+
+ case Running :
+ case Destroyed :
+ case Expunging :
+ break;
+
+ case Error :
+ default :
+ break;
+ }
+ }
+
+ private void scanStalledVMInTransitionStateOnUpHost(long hostId) {
+ //
+ // Check VM that is stuck in Starting, Stopping, Migrating states, we won't check
+ // VMs in expunging state (this need to be handled specially)
+ //
+ // checking condition
+ // 1) no pending VmWork job
+ // 2) on hostId host and host is UP
+ //
+ // When host is UP, soon or later we will get a report from the host about the VM,
+ // however, if VM is missing from the host report (it may happen in out of band changes
+ // or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
+ //
+ // Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
+ // and a VM stalls for status update, we will consider them to be powered off
+ // (which is relatively safe to do so)
+
+ long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1);
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs);
+ List<Long> mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime);
+ for(Long vmId : mostlikelyStoppedVMs) {
+ VMInstanceVO vm = _vmDao.findById(vmId);
+ assert(vm != null);
+ handlePowerOffReportWithNoPendingJobsOnVM(vm);
+ }
+
+ List<Long> vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime);
+ for(Long vmId : vmsWithRecentReport) {
+ VMInstanceVO vm = _vmDao.findById(vmId);
+ assert(vm != null);
+ if(vm.getPowerState() == PowerState.PowerOn)
+ handlePowerOnReportWithNoPendingJobsOnVM(vm);
+ else
+ handlePowerOffReportWithNoPendingJobsOnVM(vm);
+ }
+ }
+
+ private void scanStalledVMInTransitionStateOnDisconnectedHosts() {
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value()*1000);
+ List<Long> stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime);
+ for(Long vmId : stuckAndUncontrollableVMs) {
+ VMInstanceVO vm = _vmDao.findById(vmId);
+
+ // We now only alert administrator about this situation
+ _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + " state and its host is unreachable for too long");
+ }
+ }
+
+
+ // VMs that in transitional state without recent power state report
+ private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
+ String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
+ "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " +
+ "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+ "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+
+ List<Long> l = new ArrayList<Long>();
+ TransactionLegacy txn = null;
+ try {
+ txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(sql);
+
+ pstmt.setLong(1, hostId);
+ pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+ pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+ ResultSet rs = pstmt.executeQuery();
+ while(rs.next()) {
+ l.add(rs.getLong(1));
+ }
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+
+ } finally {
+ if(txn != null)
+ txn.close();
+ }
+ return l;
+ }
+
+ // VMs that in transitional state and recently have power state update
+ private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) {
+ String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
+ "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " +
+ "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+ "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+
+ List<Long> l = new ArrayList<Long>();
+ TransactionLegacy txn = null;
+ try {
+ txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(sql);
+
+ pstmt.setLong(1, hostId);
+ pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+ pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+ ResultSet rs = pstmt.executeQuery();
+ while(rs.next()) {
+ l.add(rs.getLong(1));
+ }
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+ return l;
+ } finally {
+ if(txn != null)
+ txn.close();
+ }
+ }
+
+ private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) {
+ String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
+ "AND i.power_state_update_time < ? AND i.host_id = h.id " +
+ "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+ "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+
+ List<Long> l = new ArrayList<Long>();
+ TransactionLegacy txn = null;
+ try {
+ txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(sql);
+
+ pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+ pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
+ ResultSet rs = pstmt.executeQuery();
+ while(rs.next()) {
+ l.add(rs.getLong(1));
+ }
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+ return l;
+ } finally {
+ if(txn != null)
+ txn.close();
+ }
+ }
+
+ //
+ // VM operation based on new sync model
+ //
+
+ public class VmStateSyncOutcome extends OutcomeImpl<VirtualMachine> {
+ private long _vmId;
+
+ public VmStateSyncOutcome(final AsyncJob job, final PowerState desiredPowerState, final long vmId, final Long srcHostIdForMigration) {
+ super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
+ @Override
+ public boolean checkCondition() {
+ VMInstanceVO instance = _vmDao.findById(vmId);
+ if (instance.getPowerState() == desiredPowerState && (srcHostIdForMigration != null && instance.getPowerHostId() != srcHostIdForMigration))
+ return true;
+ return false;
+ }
+ }, Topics.VM_POWER_STATE, AsyncJob.Topics.JOB_STATE);
+ _vmId = vmId;
+ }
+
+ @Override
+ protected VirtualMachine retrieve() {
+ return _vmDao.findById(_vmId);
+ }
+ }
+
+ public class VmJobSyncOutcome extends OutcomeImpl<VirtualMachine> {
+ private long _vmId;
+
+ public VmJobSyncOutcome(final AsyncJob job, final long vmId) {
+ super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
+ @Override
+ public boolean checkCondition() {
+ AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
+ assert(jobVo != null);
+ if(jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
+ return true;
+
+ return false;
+ }
+ }, AsyncJob.Topics.JOB_STATE);
+ _vmId = vmId;
+ }
+
+ @Override
+ protected VirtualMachine retrieve() {
+ return _vmDao.findById(_vmId);
+ }
+ }
+
+ public Throwable retriveExecutionException(AsyncJob job) {
+ assert(job != null);
+ assert(job.getDispatcher().equals(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER));
+
+ AsyncJobVO jobVo = this._entityMgr.findById(AsyncJobVO.class, job.getId());
+ if(jobVo != null && jobVo.getResult() != null) {
+ Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
+
+ if(obj != null && obj instanceof Throwable)
+ return (Throwable)obj;
+ }
+ return null;
+ }
+
+ public Outcome<VirtualMachine> startVmThroughJobQueue(final String vmUuid,
+ final Map<VirtualMachineProfile.Param, Object> params,
+ final DeploymentPlan planToDeploy) {
+
+ final CallContext context = CallContext.current();
+ final User callingUser = context.getCallingUser();
+ final Account callingAccount = context.getCallingAccount();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ VmWorkJobVO workJob = null;
+
+ _vmDao.lockRow(vm.getId(), true);
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance,
+ vm.getId(), VmWorkStart.class.getName());
+
+ if (pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkStart.class.getName());
+
+ workJob.setAccountId(callingAccount.getId());
+ workJob.setUserId(callingUser.getId());
+ workJob.setStep(VmWorkJobVO.Step.Starting);
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId());
+ workInfo.setPlan(planToDeploy);
+ workInfo.setParams(params);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+
+ // Transaction syntax sugar has a cost here
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ VirtualMachine.PowerState.PowerOn, vm.getId(), null);
+ }
+
+ public Outcome<VirtualMachine> stopVmThroughJobQueue(final String vmUuid, final boolean cleanup) {
+ final CallContext context = CallContext.current();
+ final Account account = context.getCallingAccount();
+ final User user = context.getCallingUser();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkStop.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkStop.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setStep(VmWorkJobVO.Step.Prepare);
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), cleanup);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ VirtualMachine.PowerState.PowerOff, vm.getId(), null);
+ }
+
+ public Outcome<VirtualMachine> rebootVmThroughJobQueue(final String vmUuid,
+ final Map<VirtualMachineProfile.Param, Object> params) {
+
+ final CallContext context = CallContext.current();
+ final Account account = context.getCallingAccount();
+ final User user = context.getCallingUser();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkReboot.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkReboot.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setStep(VmWorkJobVO.Step.Prepare);
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), params);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ vm.getId());
+ }
+
+ public Outcome<VirtualMachine> migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) {
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkMigrate.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkMigrate.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), srcHostId, dest);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId());
+ }
+
+ public Outcome<VirtualMachine> migrateVmWithStorageThroughJobQueue(
+ final String vmUuid, final long srcHostId, final long destHostId,
+ final Map<Volume, StoragePool> volumeToPool) {
+
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkMigrateWithStorage.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkMigrate.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(),
+ srcHostId, destHostId, volumeToPool);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+ VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId);
+ }
+
+ //
+ // TODO build a common pattern to reduce code duplication in following methods
+ // no time for this at current iteration
+ //
+ public Outcome<VirtualMachine> migrateVmForScaleThroughJobQueue(
+ final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) {
+
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkMigrateForScale.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkMigrate.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(),
+ srcHostId, dest, newSvcOfferingId);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+ }
+
+ public Outcome<VirtualMachine> migrateVmStorageThroughJobQueue(
+ final String vmUuid, final StoragePool destPool) {
+
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkStorageMigration.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkStorageMigration.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
+ destPool);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+ }
+
+ public Outcome<VirtualMachine> addVmToNetworkThroughJobQueue(
+ final VirtualMachine vm, final Network network, final NicProfile requested) {
+
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkAddVmToNetwork.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
+ network, requested);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+ }
+
+ public Outcome<VirtualMachine> removeNicFromVmThroughJobQueue(
+ final VirtualMachine vm, final Nic nic) {
+
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkRemoveNicFromVm.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkRemoveNicFromVm.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(),
+ nic);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+ }
+
+ public Outcome<VirtualMachine> removeVmFromNetworkThroughJobQueue(
+ final VirtualMachine vm, final Network network, final URI broadcastUri) {
+
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkRemoveVmFromNetwork.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(),
+ network, broadcastUri);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+ }
+
+ public Outcome<VirtualMachine> reconfigureVmThroughJobQueue(
+ final String vmUuid, final ServiceOffering oldServiceOffering, final boolean reconfiguringOnExistingHost) {
+
+ final CallContext context = CallContext.current();
+ final User user = context.getCallingUser();
+ final Account account = context.getCallingAccount();
+
+ final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+ Transaction.execute(new TransactionCallbackNoReturn () {
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkReconfigure.class.getName());
+
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+
+ workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkReconfigure.class.getName());
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(),
+ oldServiceOffering, reconfiguringOnExistingHost);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+ }
+ context.putContextParameter("workJob", workJob);
+ context.putContextParameter("jobId", new Long(vm.getId()));
+ }
+ });
+
+ final long jobId = (Long)context.getContextParameter("jobId");
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+ return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
index 7a23ddd..dacc8d0 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
@@ -19,7 +19,6 @@ package com.cloud.vm;
import java.util.Map;
import com.cloud.agent.api.HostVmStateReportEntry;
-import com.cloud.vm.VirtualMachine.PowerState;
public interface VirtualMachinePowerStateSync {
@@ -28,5 +27,5 @@ public interface VirtualMachinePowerStateSync {
void processHostVmStateReport(long hostId, Map<String, HostVmStateReportEntry> report);
// to adapt legacy ping report
- void processHostVmStatePingReport(long hostId, Map<String, PowerState> report);
+ void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report);
}