You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/01/15 02:33:56 UTC
git commit: updated refs/heads/4.3 to 7a8c0e3
Updated Branches:
refs/heads/4.3 45065c712 -> 7a8c0e3ae
CLOUDSTACK-5696: Fix sync issue with out-of-band changes
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/7a8c0e3a
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/7a8c0e3a
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/7a8c0e3a
Branch: refs/heads/4.3
Commit: 7a8c0e3ae05fb22894c1ea3b61e77a1b8d8ec32b
Parents: 45065c7
Author: Kelven Yang <ke...@gmail.com>
Authored: Tue Jan 14 17:32:52 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Tue Jan 14 17:33:15 2014 -0800
----------------------------------------------------------------------
.../com/cloud/vm/VirtualMachineManagerImpl.java | 16 ++-
.../cloud/vm/VirtualMachinePowerStateSync.java | 10 +-
.../vm/VirtualMachinePowerStateSyncImpl.java | 36 ++---
.../src/com/cloud/vm/dao/VMInstanceDao.java | 6 +-
.../src/com/cloud/vm/dao/VMInstanceDaoImpl.java | 140 +++++++++++--------
.../framework/messagebus/MessageDispatcher.java | 97 +++++++++----
6 files changed, 193 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 17f0936..af65ac6 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -62,6 +62,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@@ -575,6 +576,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_agentMgr.registerForHostEvents(this, true, true, true);
+ if (VmJobEnabled.value()) {
+ _messageBus.subscribe(VirtualMachineManager.Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this));
+ }
+
return true;
}
@@ -3815,7 +3820,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
//
@MessageHandler(topic = Topics.VM_POWER_STATE)
- private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) {
+ private void HandlePowerStateReport(String subject, String senderAddress, Object args) {
assert(args != null);
Long vmId = (Long)args;
@@ -3835,7 +3840,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
break;
// PowerUnknown shouldn't be reported, it is a derived
- // VM power state from host state (host un-reachable
+ // VM power state from host state (host un-reachable)
case PowerUnknown :
default :
assert(false);
@@ -3845,8 +3850,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
}
} else {
- // TODO, do job wake-up signalling, since currently async job wake-up is not in use
- // we will skip it for nows
+ // reset VM power state tracking so that we won't lose the signal once the VM has
+ // been transitioned to its next state (TODO confirm: original sentence was left unfinished)
+ _vmDao.resetVmPowerStateTracking(vmId);
}
}
@@ -3921,6 +3927,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
switch(vm.getState()) {
case Starting :
case Stopping :
+ case Running:
case Stopped :
case Migrating :
try {
@@ -3933,7 +3940,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// TODO: we need to forcely release all resource allocation
break;
- case Running :
case Destroyed :
case Expunging :
break;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
index dacc8d0..d7aef72 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
@@ -5,7 +5,7 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
@@ -21,11 +21,13 @@ import java.util.Map;
import com.cloud.agent.api.HostVmStateReportEntry;
public interface VirtualMachinePowerStateSync {
-
+
void resetHostSyncState(long hostId);
-
+
void processHostVmStateReport(long hostId, Map<String, HostVmStateReportEntry> report);
-
+
// to adapt legacy ping report
void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report);
+
+ Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
index 9aa9501..9a0119c 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
@@ -35,22 +35,22 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
@Inject MessageBus _messageBus;
@Inject VMInstanceDao _instanceDao;
@Inject VirtualMachineManager _vmMgr;
-
+
public VirtualMachinePowerStateSyncImpl() {
}
-
+
@Override
public void resetHostSyncState(long hostId) {
s_logger.info("Reset VM power state sync for host: " + hostId);
_instanceDao.resetHostPowerStateTracking(hostId);
}
-
+
@Override
public void processHostVmStateReport(long hostId, Map<String, HostVmStateReportEntry> report) {
if(s_logger.isDebugEnabled())
s_logger.debug("Process host VM state report from ping process. host: " + hostId);
-
- Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+
+ Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
processReport(hostId, translatedInfo);
}
@@ -58,39 +58,41 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
public void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report) {
if(s_logger.isDebugEnabled())
s_logger.debug("Process host VM state report from ping process. host: " + hostId);
-
- Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+
+ Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
processReport(hostId, translatedInfo);
}
-
+
private void processReport(long hostId, Map<Long, VirtualMachine.PowerState> translatedInfo) {
-
+
for(Map.Entry<Long, VirtualMachine.PowerState> entry : translatedInfo.entrySet()) {
-
+
if(s_logger.isDebugEnabled())
s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
if(_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) {
-
if(s_logger.isDebugEnabled())
s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
-
+
_messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey());
+ } else {
+ if (s_logger.isDebugEnabled())
+ s_logger.debug("VM power state does not change, skip DB writing. vm id: " + entry.getKey());
}
}
}
-
- private Map<Long, VirtualMachine.PowerState> convertToInfos(Map<String, HostVmStateReportEntry> states) {
+
+ @Override
+ public Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states) {
final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<Long, VirtualMachine.PowerState>();
if (states == null) {
return map;
}
-
+
for (Map.Entry<String, HostVmStateReportEntry> entry : states.entrySet()) {
VMInstanceVO vm = findVM(entry.getKey());
if(vm != null) {
map.put(vm.getId(), entry.getValue().getState());
- break;
} else {
s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey());
}
@@ -98,7 +100,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
return map;
}
-
+
private VMInstanceVO findVM(String vmName) {
return _instanceDao.findVMByInstanceName(vmName);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
index 830dea8..78c6e8c 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
@@ -70,6 +70,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
List<VMInstanceVO> findVMInTransition(Date time, State... states);
+ List<VMInstanceVO> listByHostAndState(long hostId, State... states);
+
List<VMInstanceVO> listByTypes(VirtualMachine.Type... types);
VMInstanceVO findByIdTypes(long id, VirtualMachine.Type... types);
@@ -123,8 +125,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
List<VMInstanceVO> listStartingWithNoHostId();
boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState);
-
+
void resetVmPowerStateTracking(long instanceId);
-
+
void resetHostPowerStateTracking(long hostId);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
index e7f907e..cc747bc 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
@@ -49,7 +49,11 @@ import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Func;
import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.NicVO;
@@ -65,7 +69,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
public static final Logger s_logger = Logger.getLogger(VMInstanceDaoImpl.class);
private static final int MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT = 3;
-
+
protected SearchBuilder<VMInstanceVO> VMClusterSearch;
protected SearchBuilder<VMInstanceVO> LHVMClusterSearch;
protected SearchBuilder<VMInstanceVO> IdStatesSearch;
@@ -77,6 +81,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
protected SearchBuilder<VMInstanceVO> TypesSearch;
protected SearchBuilder<VMInstanceVO> IdTypesSearch;
protected SearchBuilder<VMInstanceVO> HostIdTypesSearch;
+ protected SearchBuilder<VMInstanceVO> HostIdStatesSearch;
protected SearchBuilder<VMInstanceVO> HostIdUpTypesSearch;
protected SearchBuilder<VMInstanceVO> HostUpSearch;
protected SearchBuilder<VMInstanceVO> InstanceNameSearch;
@@ -180,6 +185,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
HostIdTypesSearch.and("types", HostIdTypesSearch.entity().getType(), Op.IN);
HostIdTypesSearch.done();
+ HostIdStatesSearch = createSearchBuilder();
+ HostIdStatesSearch.and("hostId", HostIdStatesSearch.entity().getHostId(), Op.EQ);
+ HostIdStatesSearch.and("states", HostIdStatesSearch.entity().getState(), Op.IN);
+ HostIdStatesSearch.done();
+
HostIdUpTypesSearch = createSearchBuilder();
HostIdUpTypesSearch.and("hostid", HostIdUpTypesSearch.entity().getHostId(), Op.EQ);
HostIdUpTypesSearch.and("types", HostIdUpTypesSearch.entity().getType(), Op.IN);
@@ -230,7 +240,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
_updateTimeAttr = _allAttributes.get("updateTime");
assert _updateTimeAttr != null : "Couldn't get this updateTime attribute";
-
+
SearchBuilder<NicVO> nicSearch = _nicDao.createSearchBuilder();
nicSearch.and("networkId", nicSearch.entity().getNetworkId(), SearchCriteria.Op.EQ);
@@ -242,7 +252,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
DistinctHostNameSearch.join("nicSearch", nicSearch, DistinctHostNameSearch.entity().getId(),
nicSearch.entity().getInstanceId(), JoinBuilder.JoinType.INNER);
DistinctHostNameSearch.done();
-
+
}
@Override
@@ -335,6 +345,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
}
@Override
+ public List<VMInstanceVO> listByHostAndState(long hostId, State... states) {
+ SearchCriteria<VMInstanceVO> sc = HostIdStatesSearch.create();
+ sc.setParameters("hostId", hostId);
+ sc.setParameters("states", (Object[])states);
+
+ return listBy(sc);
+ }
+
+ @Override
public List<VMInstanceVO> listUpByHostIdTypes(long hostid, Type... types) {
SearchCriteria<VMInstanceVO> sc = HostIdUpTypesSearch.create();
sc.setParameters("hostid", hostid);
@@ -679,63 +698,68 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
sc.setParameters("state", State.Starting);
return listBy(sc);
}
-
- @Override
- public boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState) {
- boolean needToUpdate = false;
- TransactionLegacy txn = TransactionLegacy.currentTxn();
- txn.start();
-
- VMInstanceVO instance = findById(instanceId);
- if(instance != null) {
- Long savedPowerHostId = instance.getPowerHostId();
- if(instance.getPowerState() != powerState || savedPowerHostId == null
- || savedPowerHostId.longValue() != powerHostId) {
- instance.setPowerState(powerState);
- instance.setPowerHostId(powerHostId);
- instance.setPowerStateUpdateCount(1);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
- needToUpdate = true;
- update(instanceId, instance);
- } else {
- // to reduce DB updates, consecutive same state update for more than 3 times
- if(instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
- instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
- needToUpdate = true;
- update(instanceId, instance);
- }
- }
- }
-
- txn.commit();
- return needToUpdate;
- }
-
+
@Override
- public void resetVmPowerStateTracking(long instanceId) {
- TransactionLegacy txn = TransactionLegacy.currentTxn();
- txn.start();
- VMInstanceVO instance = findById(instanceId);
- if(instance != null) {
- instance.setPowerStateUpdateCount(0);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
- update(instanceId, instance);
- }
-
- txn.commit();
+ public boolean updatePowerState(final long instanceId, final long powerHostId, final VirtualMachine.PowerState powerState) {
+ return Transaction.execute(new TransactionCallback<Boolean>() {
+ @Override
+ public Boolean doInTransaction(TransactionStatus status) {
+ boolean needToUpdate = false;
+ VMInstanceVO instance = findById(instanceId);
+ if(instance != null) {
+ Long savedPowerHostId = instance.getPowerHostId();
+ if(instance.getPowerState() != powerState || savedPowerHostId == null
+ || savedPowerHostId.longValue() != powerHostId) {
+ instance.setPowerState(powerState);
+ instance.setPowerHostId(powerHostId);
+ instance.setPowerStateUpdateCount(1);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+ needToUpdate = true;
+ update(instanceId, instance);
+ } else {
+ // to reduce DB updates, stop writing once the same state has been recorded MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT (3) consecutive times
+ if(instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
+ instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+ needToUpdate = true;
+ update(instanceId, instance);
+ }
+ }
+ }
+ return needToUpdate;
+ }
+ });
+ }
+
+ @Override
+ public void resetVmPowerStateTracking(final long instanceId) {
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ VMInstanceVO instance = findById(instanceId);
+ if (instance != null) {
+ instance.setPowerStateUpdateCount(0);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+ update(instanceId, instance);
+ }
+ }
+ });
}
-
-
+
@Override @DB
- public void resetHostPowerStateTracking(long hostId) {
- SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
- sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
-
- VMInstanceVO instance = this.createForUpdate();
- instance.setPowerStateUpdateCount(0);
- instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-
- this.update(instance, sc);
+ public void resetHostPowerStateTracking(final long hostId) {
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
+ sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
+
+ VMInstanceVO instance = createForUpdate();
+ instance.setPowerStateUpdateCount(0);
+ instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+
+ update(instance, sc);
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
index ac75afb..e83c5ee 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
@@ -20,25 +20,31 @@ package org.apache.cloudstack.framework.messagebus;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
public class MessageDispatcher implements MessageSubscriber {
- private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
-
+ private static final Logger s_logger = Logger.getLogger(MessageDispatcher.class);
+
+ private static Map<Class<?>, List<Method>> s_handlerCache = new HashMap<Class<?>, List<Method>>();
+
private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>();
private Object _targetObject;
-
+
public MessageDispatcher(Object targetObject) {
_targetObject = targetObject;
+ buildHandlerMethodCache(targetObject.getClass());
}
-
+
@Override
public void onPublishMessage(String senderAddress, String subject, Object args) {
dispatch(_targetObject, subject, senderAddress, args);
}
-
+
public static MessageDispatcher getDispatcher(Object targetObject) {
MessageDispatcher dispatcher;
synchronized(s_targetMap) {
@@ -50,55 +56,94 @@ public class MessageDispatcher implements MessageSubscriber {
}
return dispatcher;
}
-
+
public static void removeDispatcher(Object targetObject) {
synchronized(s_targetMap) {
s_targetMap.remove(targetObject);
}
}
-
+
public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
assert(subject != null);
assert(target != null);
-
+
Method handler = resolveHandler(target.getClass(), subject);
if(handler == null)
return false;
-
+
try {
handler.invoke(target, subject, senderAddress, args);
} catch (IllegalArgumentException e) {
+ s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject);
} catch (IllegalAccessException e) {
+ s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject);
} catch (InvocationTargetException e) {
+ s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject);
}
-
+
return true;
}
-
+
public static Method resolveHandler(Class<?> handlerClz, String subject) {
synchronized(s_handlerCache) {
- Method handler = s_handlerCache.get(handlerClz);
- if(handler != null)
- return handler;
-
- for(Method method : handlerClz.getMethods()) {
- MessageHandler annotation = method.getAnnotation(MessageHandler.class);
- if(annotation != null) {
- if(match(annotation.topic(), subject)) {
- s_handlerCache.put(handlerClz, method);
- return method;
- }
- }
- }
+ List<Method> handlerList = s_handlerCache.get(handlerClz);
+ if (handlerList != null) {
+ for (Method method : handlerList) {
+ MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+ assert (annotation != null);
+
+ if (match(annotation.topic(), subject)) {
+ return method;
+ }
+ }
+ } else {
+ s_logger.error("Handler class " + handlerClz.getName() + " is not registered");
+ }
}
-
+
return null;
}
-
+
private static boolean match(String expression, String param) {
return param.matches(expression);
}
+
+ private void buildHandlerMethodCache(Class<?> handlerClz) {
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Build message handler cache for " + handlerClz.getName());
+
+ synchronized (s_handlerCache) {
+ List<Method> handlerList = s_handlerCache.get(handlerClz);
+ if (handlerList == null) {
+ handlerList = new ArrayList<Method>();
+ s_handlerCache.put(handlerClz, handlerList);
+
+ Class<?> clz = handlerClz;
+ while (clz != null && clz != Object.class) {
+ for (Method method : clz.getDeclaredMethods()) {
+ MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+ if (annotation != null) {
+ // allow private member access via reflection
+ method.setAccessible(true);
+ handlerList.add(method);
+
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Add message handler " + handlerClz.getName() + "." + method.getName() + " to cache");
+ }
+ }
+
+ clz = clz.getSuperclass();
+ }
+ } else {
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Message handler for class " + handlerClz.getName() + " is already in cache");
+ }
+ }
+
+ if (s_logger.isInfoEnabled())
+ s_logger.info("Done building message handler cache for " + handlerClz.getName());
+ }
}