You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mc...@apache.org on 2014/01/17 23:40:21 UTC

[19/50] [abbrv] git commit: updated refs/heads/rbac to 929fbab

CLOUDSTACK-5696: Fix sync issue with out-of-band changes


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/7164fc6e
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/7164fc6e
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/7164fc6e

Branch: refs/heads/rbac
Commit: 7164fc6e738137b452e89f8889a9cd3f3bdb3c29
Parents: 9aaea28
Author: Kelven Yang <ke...@gmail.com>
Authored: Tue Jan 14 17:32:52 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Jan 15 13:11:02 2014 -0800

----------------------------------------------------------------------
 .../com/cloud/vm/VirtualMachineManagerImpl.java |  16 ++-
 .../cloud/vm/VirtualMachinePowerStateSync.java  |   2 +
 .../vm/VirtualMachinePowerStateSyncImpl.java    |  21 ++--
 .../src/com/cloud/vm/dao/VMInstanceDao.java     |   2 +
 .../src/com/cloud/vm/dao/VMInstanceDaoImpl.java | 117 +++++++++++--------
 .../framework/messagebus/MessageDispatcher.java |  62 ++++++++--
 6 files changed, 150 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 628528a..9894d31 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -61,6 +61,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
 import org.apache.cloudstack.framework.messagebus.MessageHandler;
 import org.apache.cloudstack.jobs.JobInfo;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@@ -578,6 +579,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
         _agentMgr.registerForHostEvents(this, true, true, true);
 
+        if (VmJobEnabled.value()) {
+            _messageBus.subscribe(VirtualMachineManager.Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this));
+        }
+
         return true;
     }
 
@@ -3816,7 +3821,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     //
 
     @MessageHandler(topic = Topics.VM_POWER_STATE)
-    private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) {
+    private void HandlePowerStateReport(String subject, String senderAddress, Object args) {
         assert (args != null);
         Long vmId = (Long)args;
 
@@ -3836,7 +3841,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                     break;
 
                 // PowerUnknown shouldn't be reported, it is a derived
-                // VM power state from host state (host un-reachable
+                // VM power state from host state (host un-reachable)
                 case PowerUnknown:
                 default:
                     assert (false);
@@ -3846,8 +3851,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                 s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
             }
         } else {
-            // TODO, do job wake-up signalling, since currently async job wake-up is not in use
-            // we will skip it for nows
+            // reset VM power state tracking so that we won't lose the signal when VM has
+            // been translated to
+            _vmDao.resetVmPowerStateTracking(vmId);
         }
     }
 
@@ -3924,6 +3930,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         switch (vm.getState()) {
         case Starting:
         case Stopping:
+        case Running:
         case Stopped:
         case Migrating:
             try {
@@ -3937,7 +3944,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             // TODO: we need to forcely release all resource allocation
             break;
 
-        case Running:
         case Destroyed:
         case Expunging:
             break;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
index f84c7b7..152d0d8 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
@@ -28,4 +28,6 @@ public interface VirtualMachinePowerStateSync {
 
     // to adapt legacy ping report
     void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report);
+
+    Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
index cd4c3c0..453890c 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
@@ -32,12 +32,9 @@ import com.cloud.vm.dao.VMInstanceDao;
 public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync {
     private static final Logger s_logger = Logger.getLogger(VirtualMachinePowerStateSyncImpl.class);
 
-    @Inject
-    MessageBus _messageBus;
-    @Inject
-    VMInstanceDao _instanceDao;
-    @Inject
-    VirtualMachineManager _vmMgr;
+    @Inject MessageBus _messageBus;
+    @Inject VMInstanceDao _instanceDao;
+    @Inject VirtualMachineManager _vmMgr;
 
     public VirtualMachinePowerStateSyncImpl() {
     }
@@ -53,7 +50,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
         if (s_logger.isDebugEnabled())
             s_logger.debug("Process host VM state report from ping process. host: " + hostId);
 
-        Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+        Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
         processReport(hostId, translatedInfo);
     }
 
@@ -62,7 +59,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
         if (s_logger.isDebugEnabled())
             s_logger.debug("Process host VM state report from ping process. host: " + hostId);
 
-        Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+        Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
         processReport(hostId, translatedInfo);
     }
 
@@ -74,16 +71,19 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
                 s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
 
             if (_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) {
-
                 if (s_logger.isDebugEnabled())
                     s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
 
                 _messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey());
+            } else {
+                if (s_logger.isDebugEnabled())
+                    s_logger.debug("VM power state does not change, skip DB writing. vm id: " + entry.getKey());
             }
         }
     }
 
-    private Map<Long, VirtualMachine.PowerState> convertToInfos(Map<String, HostVmStateReportEntry> states) {
+    @Override
+    public Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states) {
         final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<Long, VirtualMachine.PowerState>();
         if (states == null) {
             return map;
@@ -93,7 +93,6 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
             VMInstanceVO vm = findVM(entry.getKey());
             if (vm != null) {
                 map.put(vm.getId(), entry.getValue().getState());
-                break;
             } else {
                 s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey());
             }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
index e6ea4a5..453d222 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
@@ -69,6 +69,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
 
     List<VMInstanceVO> findVMInTransition(Date time, State... states);
 
+    List<VMInstanceVO> listByHostAndState(long hostId, State... states);
+
     List<VMInstanceVO> listByTypes(VirtualMachine.Type... types);
 
     VMInstanceVO findByIdTypes(long id, VirtualMachine.Type... types);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
index 605ece3..2f25f57 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
@@ -48,7 +48,11 @@ import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Func;
 import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
 import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.db.UpdateBuilder;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.NicVO;
@@ -76,6 +80,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
     protected SearchBuilder<VMInstanceVO> TypesSearch;
     protected SearchBuilder<VMInstanceVO> IdTypesSearch;
     protected SearchBuilder<VMInstanceVO> HostIdTypesSearch;
+    protected SearchBuilder<VMInstanceVO> HostIdStatesSearch;
     protected SearchBuilder<VMInstanceVO> HostIdUpTypesSearch;
     protected SearchBuilder<VMInstanceVO> HostUpSearch;
     protected SearchBuilder<VMInstanceVO> InstanceNameSearch;
@@ -182,6 +187,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
         HostIdTypesSearch.and("types", HostIdTypesSearch.entity().getType(), Op.IN);
         HostIdTypesSearch.done();
 
+        HostIdStatesSearch = createSearchBuilder();
+        HostIdStatesSearch.and("hostId", HostIdStatesSearch.entity().getHostId(), Op.EQ);
+        HostIdStatesSearch.and("states", HostIdStatesSearch.entity().getState(), Op.IN);
+        HostIdStatesSearch.done();
+
         HostIdUpTypesSearch = createSearchBuilder();
         HostIdUpTypesSearch.and("hostid", HostIdUpTypesSearch.entity().getHostId(), Op.EQ);
         HostIdUpTypesSearch.and("types", HostIdUpTypesSearch.entity().getType(), Op.IN);
@@ -335,6 +345,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
     }
 
     @Override
+    public List<VMInstanceVO> listByHostAndState(long hostId, State... states) {
+        SearchCriteria<VMInstanceVO> sc = HostIdStatesSearch.create();
+        sc.setParameters("hostId", hostId);
+        sc.setParameters("states", (Object[])states);
+
+        return listBy(sc);
+    }
+
+    @Override
     public List<VMInstanceVO> listUpByHostIdTypes(long hostid, Type... types) {
         SearchCriteria<VMInstanceVO> sc = HostIdUpTypesSearch.create();
         sc.setParameters("hostid", hostid);
@@ -702,60 +721,66 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
     }
 
     @Override
-    public boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState) {
-        boolean needToUpdate = false;
-        TransactionLegacy txn = TransactionLegacy.currentTxn();
-        txn.start();
-
-        VMInstanceVO instance = findById(instanceId);
-        if (instance != null) {
-            Long savedPowerHostId = instance.getPowerHostId();
-            if (instance.getPowerState() != powerState || savedPowerHostId == null || savedPowerHostId.longValue() != powerHostId) {
-                instance.setPowerState(powerState);
-                instance.setPowerHostId(powerHostId);
-                instance.setPowerStateUpdateCount(1);
-                instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-                needToUpdate = true;
-                update(instanceId, instance);
-            } else {
-                // to reduce DB updates, consecutive same state update for more than 3 times
-                if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
-                    instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
-                    instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-                    needToUpdate = true;
-                    update(instanceId, instance);
+    public boolean updatePowerState(final long instanceId, final long powerHostId, final VirtualMachine.PowerState powerState) {
+        return Transaction.execute(new TransactionCallback<Boolean>() {
+            @Override
+            public Boolean doInTransaction(TransactionStatus status) {
+                boolean needToUpdate = false;
+                VMInstanceVO instance = findById(instanceId);
+                if (instance != null) {
+                    Long savedPowerHostId = instance.getPowerHostId();
+                    if (instance.getPowerState() != powerState || savedPowerHostId == null
+                            || savedPowerHostId.longValue() != powerHostId) {
+                        instance.setPowerState(powerState);
+                        instance.setPowerHostId(powerHostId);
+                        instance.setPowerStateUpdateCount(1);
+                        instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+                        needToUpdate = true;
+                        update(instanceId, instance);
+                    } else {
+                        // to reduce DB updates, stop updating once the same state has been reported more than 3 consecutive times
+                        if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
+                            instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
+                            instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+                            needToUpdate = true;
+                            update(instanceId, instance);
+                        }
+                    }
                 }
+                return needToUpdate;
             }
-        }
-
-        txn.commit();
-        return needToUpdate;
+        });
     }
 
     @Override
-    public void resetVmPowerStateTracking(long instanceId) {
-        TransactionLegacy txn = TransactionLegacy.currentTxn();
-        txn.start();
-        VMInstanceVO instance = findById(instanceId);
-        if (instance != null) {
-            instance.setPowerStateUpdateCount(0);
-            instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-            update(instanceId, instance);
-        }
-
-        txn.commit();
+    public void resetVmPowerStateTracking(final long instanceId) {
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                VMInstanceVO instance = findById(instanceId);
+                if (instance != null) {
+                    instance.setPowerStateUpdateCount(0);
+                    instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+                    update(instanceId, instance);
+                }
+            }
+        });
     }
 
-    @Override
-    @DB
-    public void resetHostPowerStateTracking(long hostId) {
-        SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
-        sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
+    @Override @DB
+    public void resetHostPowerStateTracking(final long hostId) {
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
+                sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
 
-        VMInstanceVO instance = this.createForUpdate();
-        instance.setPowerStateUpdateCount(0);
-        instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+                VMInstanceVO instance = createForUpdate();
+                instance.setPowerStateUpdateCount(0);
+                instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
 
-        this.update(instance, sc);
+                update(instance, sc);
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
index a2d9a7b..e93bbc2 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
@@ -20,17 +20,24 @@ package org.apache.cloudstack.framework.messagebus;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
+
 public class MessageDispatcher implements MessageSubscriber {
-    private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
+    private static final Logger s_logger = Logger.getLogger(MessageDispatcher.class);
+
+    private static Map<Class<?>, List<Method>> s_handlerCache = new HashMap<Class<?>, List<Method>>();
 
     private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>();
     private Object _targetObject;
 
     public MessageDispatcher(Object targetObject) {
         _targetObject = targetObject;
+        buildHandlerMethodCache(targetObject.getClass());
     }
 
     @Override
@@ -67,10 +74,13 @@ public class MessageDispatcher implements MessageSubscriber {
         try {
             handler.invoke(target, subject, senderAddress, args);
         } catch (IllegalArgumentException e) {
+            s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
             throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject);
         } catch (IllegalAccessException e) {
+            s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
             throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject);
         } catch (InvocationTargetException e) {
+            s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
             throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject);
         }
 
@@ -79,18 +89,18 @@ public class MessageDispatcher implements MessageSubscriber {
 
     public static Method resolveHandler(Class<?> handlerClz, String subject) {
         synchronized (s_handlerCache) {
-            Method handler = s_handlerCache.get(handlerClz);
-            if (handler != null)
-                return handler;
+            List<Method> handlerList = s_handlerCache.get(handlerClz);
+            if (handlerList != null) {
+                for (Method method : handlerList) {
+                    MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+                    assert (annotation != null);
 
-            for (Method method : handlerClz.getMethods()) {
-                MessageHandler annotation = method.getAnnotation(MessageHandler.class);
-                if (annotation != null) {
                     if (match(annotation.topic(), subject)) {
-                        s_handlerCache.put(handlerClz, method);
                         return method;
                     }
                 }
+            } else {
+                s_logger.error("Handler class " + handlerClz.getName() + " is not registered");
             }
         }
 
@@ -100,4 +110,40 @@ public class MessageDispatcher implements MessageSubscriber {
     private static boolean match(String expression, String param) {
         return param.matches(expression);
     }
+
+    private void buildHandlerMethodCache(Class<?> handlerClz) {
+        if (s_logger.isInfoEnabled())
+            s_logger.info("Build message handler cache for " + handlerClz.getName());
+
+        synchronized (s_handlerCache) {
+            List<Method> handlerList = s_handlerCache.get(handlerClz);
+            if (handlerList == null) {
+                handlerList = new ArrayList<Method>();
+                s_handlerCache.put(handlerClz, handlerList);
+
+                Class<?> clz = handlerClz;
+                while (clz != null && clz != Object.class) {
+                    for (Method method : clz.getDeclaredMethods()) {
+                        MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+                        if (annotation != null) {
+                            // allow private member access via reflection
+                            method.setAccessible(true);
+                            handlerList.add(method);
+
+                            if (s_logger.isInfoEnabled())
+                                s_logger.info("Add message handler " + handlerClz.getName() + "." + method.getName() + " to cache");
+                        }
+                    }
+
+                    clz = clz.getSuperclass();
+                }
+            } else {
+                if (s_logger.isInfoEnabled())
+                    s_logger.info("Message handler for class " + handlerClz.getName() + " is already in cache");
+            }
+        }
+
+        if (s_logger.isInfoEnabled())
+            s_logger.info("Done building message handler cache for " + handlerClz.getName());
+    }
 }