You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/01/15 22:11:28 UTC
git commit: updated refs/heads/master to 7164fc6
Updated Branches:
refs/heads/master 9aaea28d0 -> 7164fc6e7
CLOUDSTACK-5696: Fix sync issue with out-of-band changes
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/7164fc6e
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/7164fc6e
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/7164fc6e
Branch: refs/heads/master
Commit: 7164fc6e738137b452e89f8889a9cd3f3bdb3c29
Parents: 9aaea28
Author: Kelven Yang <ke...@gmail.com>
Authored: Tue Jan 14 17:32:52 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Jan 15 13:11:02 2014 -0800
----------------------------------------------------------------------
.../com/cloud/vm/VirtualMachineManagerImpl.java | 16 ++-
.../cloud/vm/VirtualMachinePowerStateSync.java | 2 +
.../vm/VirtualMachinePowerStateSyncImpl.java | 21 ++--
.../src/com/cloud/vm/dao/VMInstanceDao.java | 2 +
.../src/com/cloud/vm/dao/VMInstanceDaoImpl.java | 117 +++++++++++--------
.../framework/messagebus/MessageDispatcher.java | 62 ++++++++--
6 files changed, 150 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 628528a..9894d31 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -61,6 +61,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@@ -578,6 +579,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_agentMgr.registerForHostEvents(this, true, true, true);
+ if (VmJobEnabled.value()) {
+ _messageBus.subscribe(VirtualMachineManager.Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this));
+ }
+
return true;
}
@@ -3816,7 +3821,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
//
@MessageHandler(topic = Topics.VM_POWER_STATE)
- private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) {
+ private void HandlePowerStateReport(String subject, String senderAddress, Object args) {
assert (args != null);
Long vmId = (Long)args;
@@ -3836,7 +3841,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
break;
// PowerUnknown shouldn't be reported, it is a derived
- // VM power state from host state (host un-reachable
+ // VM power state from host state (host un-reachable)
case PowerUnknown:
default:
assert (false);
@@ -3846,8 +3851,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
}
} else {
- // TODO, do job wake-up signalling, since currently async job wake-up is not in use
- // we will skip it for nows
+ // reset VM power state tracking so that we won't lost signal when VM has
+ // been translated to
+ _vmDao.resetVmPowerStateTracking(vmId);
}
}
@@ -3924,6 +3930,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
switch (vm.getState()) {
case Starting:
case Stopping:
+ case Running:
case Stopped:
case Migrating:
try {
@@ -3937,7 +3944,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// TODO: we need to forcely release all resource allocation
break;
- case Running:
case Destroyed:
case Expunging:
break;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
index f84c7b7..152d0d8 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
@@ -28,4 +28,6 @@ public interface VirtualMachinePowerStateSync {
// to adapt legacy ping report
void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report);
+
+ Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
index cd4c3c0..453890c 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
@@ -32,12 +32,9 @@ import com.cloud.vm.dao.VMInstanceDao;
public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync {
private static final Logger s_logger = Logger.getLogger(VirtualMachinePowerStateSyncImpl.class);
- @Inject
- MessageBus _messageBus;
- @Inject
- VMInstanceDao _instanceDao;
- @Inject
- VirtualMachineManager _vmMgr;
+ @Inject MessageBus _messageBus;
+ @Inject VMInstanceDao _instanceDao;
+ @Inject VirtualMachineManager _vmMgr;
public VirtualMachinePowerStateSyncImpl() {
}
@@ -53,7 +50,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
if (s_logger.isDebugEnabled())
s_logger.debug("Process host VM state report from ping process. host: " + hostId);
- Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+ Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
processReport(hostId, translatedInfo);
}
@@ -62,7 +59,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
if (s_logger.isDebugEnabled())
s_logger.debug("Process host VM state report from ping process. host: " + hostId);
- Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+ Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
processReport(hostId, translatedInfo);
}
@@ -74,16 +71,19 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
if (_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) {
-
if (s_logger.isDebugEnabled())
s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
_messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey());
+ } else {
+ if (s_logger.isDebugEnabled())
+ s_logger.debug("VM power state does not change, skip DB writing. vm id: " + entry.getKey());
}
}
}
- private Map<Long, VirtualMachine.PowerState> convertToInfos(Map<String, HostVmStateReportEntry> states) {
+ @Override
+ public Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states) {
final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<Long, VirtualMachine.PowerState>();
if (states == null) {
return map;
@@ -93,7 +93,6 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
VMInstanceVO vm = findVM(entry.getKey());
if (vm != null) {
map.put(vm.getId(), entry.getValue().getState());
- break;
} else {
s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey());
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
index e6ea4a5..453d222 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
@@ -69,6 +69,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
List<VMInstanceVO> findVMInTransition(Date time, State... states);
+ List<VMInstanceVO> listByHostAndState(long hostId, State... states);
+
List<VMInstanceVO> listByTypes(VirtualMachine.Type... types);
VMInstanceVO findByIdTypes(long id, VirtualMachine.Type... types);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
index 605ece3..2f25f57 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
@@ -48,7 +48,11 @@ import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Func;
import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.NicVO;
@@ -76,6 +80,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
protected SearchBuilder<VMInstanceVO> TypesSearch;
protected SearchBuilder<VMInstanceVO> IdTypesSearch;
protected SearchBuilder<VMInstanceVO> HostIdTypesSearch;
+ protected SearchBuilder<VMInstanceVO> HostIdStatesSearch;
protected SearchBuilder<VMInstanceVO> HostIdUpTypesSearch;
protected SearchBuilder<VMInstanceVO> HostUpSearch;
protected SearchBuilder<VMInstanceVO> InstanceNameSearch;
@@ -182,6 +187,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
HostIdTypesSearch.and("types", HostIdTypesSearch.entity().getType(), Op.IN);
HostIdTypesSearch.done();
+ HostIdStatesSearch = createSearchBuilder();
+ HostIdStatesSearch.and("hostId", HostIdStatesSearch.entity().getHostId(), Op.EQ);
+ HostIdStatesSearch.and("states", HostIdStatesSearch.entity().getState(), Op.IN);
+ HostIdStatesSearch.done();
+
HostIdUpTypesSearch = createSearchBuilder();
HostIdUpTypesSearch.and("hostid", HostIdUpTypesSearch.entity().getHostId(), Op.EQ);
HostIdUpTypesSearch.and("types", HostIdUpTypesSearch.entity().getType(), Op.IN);
@@ -335,6 +345,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
}
@Override
+ public List<VMInstanceVO> listByHostAndState(long hostId, State... states) {
+ SearchCriteria<VMInstanceVO> sc = HostIdStatesSearch.create();
+ sc.setParameters("hostId", hostId);
+ sc.setParameters("states", (Object[])states);
+
+ return listBy(sc);
+ }
+
+ @Override
public List<VMInstanceVO> listUpByHostIdTypes(long hostid, Type... types) {
SearchCriteria<VMInstanceVO> sc = HostIdUpTypesSearch.create();
sc.setParameters("hostid", hostid);
@@ -702,60 +721,66 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
}
@Override
- public boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState) {
- boolean needToUpdate = false;
- TransactionLegacy txn = TransactionLegacy.currentTxn();
- txn.start();
-
- VMInstanceVO instance = findById(instanceId);
- if (instance != null) {
- Long savedPowerHostId = instance.getPowerHostId();
- if (instance.getPowerState() != powerState || savedPowerHostId == null || savedPowerHostId.longValue() != powerHostId) {
- instance.setPowerState(powerState);
- instance.setPowerHostId(powerHostId);
- instance.setPowerStateUpdateCount(1);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
- needToUpdate = true;
- update(instanceId, instance);
- } else {
- // to reduce DB updates, consecutive same state update for more than 3 times
- if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
- instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
- needToUpdate = true;
- update(instanceId, instance);
+ public boolean updatePowerState(final long instanceId, final long powerHostId, final VirtualMachine.PowerState powerState) {
+ return Transaction.execute(new TransactionCallback<Boolean>() {
+ @Override
+ public Boolean doInTransaction(TransactionStatus status) {
+ boolean needToUpdate = false;
+ VMInstanceVO instance = findById(instanceId);
+ if (instance != null) {
+ Long savedPowerHostId = instance.getPowerHostId();
+ if (instance.getPowerState() != powerState || savedPowerHostId == null
+ || savedPowerHostId.longValue() != powerHostId) {
+ instance.setPowerState(powerState);
+ instance.setPowerHostId(powerHostId);
+ instance.setPowerStateUpdateCount(1);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+ needToUpdate = true;
+ update(instanceId, instance);
+ } else {
+ // to reduce DB updates, consecutive same state update for more than 3 times
+ if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
+ instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+ needToUpdate = true;
+ update(instanceId, instance);
+ }
+ }
}
+ return needToUpdate;
}
- }
-
- txn.commit();
- return needToUpdate;
+ });
}
@Override
- public void resetVmPowerStateTracking(long instanceId) {
- TransactionLegacy txn = TransactionLegacy.currentTxn();
- txn.start();
- VMInstanceVO instance = findById(instanceId);
- if (instance != null) {
- instance.setPowerStateUpdateCount(0);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
- update(instanceId, instance);
- }
-
- txn.commit();
+ public void resetVmPowerStateTracking(final long instanceId) {
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ VMInstanceVO instance = findById(instanceId);
+ if (instance != null) {
+ instance.setPowerStateUpdateCount(0);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+ update(instanceId, instance);
+ }
+ }
+ });
}
- @Override
- @DB
- public void resetHostPowerStateTracking(long hostId) {
- SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
- sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
+ @Override @DB
+ public void resetHostPowerStateTracking(final long hostId) {
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
+ sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
- VMInstanceVO instance = this.createForUpdate();
- instance.setPowerStateUpdateCount(0);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+ VMInstanceVO instance = createForUpdate();
+ instance.setPowerStateUpdateCount(0);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
- this.update(instance, sc);
+ update(instance, sc);
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
index a2d9a7b..e93bbc2 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
@@ -20,17 +20,24 @@ package org.apache.cloudstack.framework.messagebus;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
public class MessageDispatcher implements MessageSubscriber {
- private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
+ private static final Logger s_logger = Logger.getLogger(MessageDispatcher.class);
+
+ private static Map<Class<?>, List<Method>> s_handlerCache = new HashMap<Class<?>, List<Method>>();
private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>();
private Object _targetObject;
public MessageDispatcher(Object targetObject) {
_targetObject = targetObject;
+ buildHandlerMethodCache(targetObject.getClass());
}
@Override
@@ -67,10 +74,13 @@ public class MessageDispatcher implements MessageSubscriber {
try {
handler.invoke(target, subject, senderAddress, args);
} catch (IllegalArgumentException e) {
+ s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject);
} catch (IllegalAccessException e) {
+ s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject);
} catch (InvocationTargetException e) {
+ s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject);
}
@@ -79,18 +89,18 @@ public class MessageDispatcher implements MessageSubscriber {
public static Method resolveHandler(Class<?> handlerClz, String subject) {
synchronized (s_handlerCache) {
- Method handler = s_handlerCache.get(handlerClz);
- if (handler != null)
- return handler;
+ List<Method> handlerList = s_handlerCache.get(handlerClz);
+ if (handlerList != null) {
+ for (Method method : handlerList) {
+ MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+ assert (annotation != null);
- for (Method method : handlerClz.getMethods()) {
- MessageHandler annotation = method.getAnnotation(MessageHandler.class);
- if (annotation != null) {
if (match(annotation.topic(), subject)) {
- s_handlerCache.put(handlerClz, method);
return method;
}
}
+ } else {
+ s_logger.error("Handler class " + handlerClz.getName() + " is not registered");
}
}
@@ -100,4 +110,40 @@ public class MessageDispatcher implements MessageSubscriber {
private static boolean match(String expression, String param) {
return param.matches(expression);
}
+
+ private void buildHandlerMethodCache(Class<?> handlerClz) {
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Build message handler cache for " + handlerClz.getName());
+
+ synchronized (s_handlerCache) {
+ List<Method> handlerList = s_handlerCache.get(handlerClz);
+ if (handlerList == null) {
+ handlerList = new ArrayList<Method>();
+ s_handlerCache.put(handlerClz, handlerList);
+
+ Class<?> clz = handlerClz;
+ while (clz != null && clz != Object.class) {
+ for (Method method : clz.getDeclaredMethods()) {
+ MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+ if (annotation != null) {
+ // allow private member access via reflection
+ method.setAccessible(true);
+ handlerList.add(method);
+
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Add message handler " + handlerClz.getName() + "." + method.getName() + " to cache");
+ }
+ }
+
+ clz = clz.getSuperclass();
+ }
+ } else {
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Message handler for class " + handlerClz.getName() + " is already in cache");
+ }
+ }
+
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Done building message handler cache for " + handlerClz.getName());
+ }
}