You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by da...@apache.org on 2023/11/20 12:18:22 UTC
(cloudstack) branch 6778-if-kafka-is-turned-on-internal-subs-dont-work updated: injection code smell removed
This is an automated email from the ASF dual-hosted git repository.
dahn pushed a commit to branch 6778-if-kafka-is-turned-on-internal-subs-dont-work
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/6778-if-kafka-is-turned-on-internal-subs-dont-work by this push:
new 7c8fa45a115 injection code smell removed
7c8fa45a115 is described below
commit 7c8fa45a115f1018644b6a0c0c817e65e1c09527
Author: Daan Hoogland <da...@onecht.net>
AuthorDate: Mon Nov 20 13:18:11 2023 +0100
injection code smell removed
---
.../com/cloud/network/NetworkStateListener.java | 11 +++++++++--
.../framework/events/EventDistributorImpl.java | 3 +++
.../apache/cloudstack/mom/kafka/KafkaEventBus.java | 4 ++--
.../network/contrail/management/EventUtils.java | 22 ++++++++++++----------
.../storage/listener/SnapshotStateListener.java | 16 ++++++++++++----
.../storage/listener/VolumeStateListener.java | 15 ++++++++++-----
.../java/com/cloud/vm/UserVmStateListener.java | 12 ++++++++++--
.../cloud/utils/component/ComponentContext.java | 3 ++-
8 files changed, 60 insertions(+), 26 deletions(-)
diff --git a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
index c506bd66cc9..d3462501185 100644
--- a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
+++ b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
@@ -32,18 +32,25 @@ import com.cloud.network.Network.Event;
import com.cloud.network.Network.State;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
public class NetworkStateListener implements StateListener<State, Event, Network> {
@Inject
private ConfigurationDao _configDao;
- @Inject
+ @Autowired
+ @Qualifier("eventDistributor")
private EventDistributor eventDistributor;
public NetworkStateListener(ConfigurationDao configDao) {
_configDao = configDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Network vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -70,7 +77,7 @@ public class NetworkStateListener implements StateListener<State, Event, Network
String resourceName = getEntityFromClassName(Network.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
- Map<String, String> eventDescription = new HashMap<String, String>();
+ Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
index cf8e09a8370..e92d36b4541 100644
--- a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
+++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -47,6 +47,9 @@ public class EventDistributorImpl extends ManagerBase implements EventDistributo
public List<EventBusException> publish(Event event) {
LOGGER.info(String.format("publishing %s to %d event busses", (event == null ? "<none>" : event.getDescription()), eventBusses.size()));
List<EventBusException> exceptions = new ArrayList<>();
+ if (event == null) {
+ return exceptions;
+ }
for (EventBus bus : eventBusses) {
try {
bus.publish(event);
diff --git a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
index f680ad29c24..7d48a391025 100644
--- a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
+++ b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
@@ -110,8 +110,8 @@ public class KafkaEventBus extends ManagerBase implements EventBus {
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
}
- ProducerRecord<String, String> record = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription());
- _producer.send(record);
+ ProducerRecord<String, String> newRecord = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription());
+ _producer.send(newRecord);
}
@Override
diff --git a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
index d2f2fd2182a..420ca51a0a0 100644
--- a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
+++ b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
@@ -43,19 +43,21 @@ import com.cloud.server.ManagementService;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ComponentMethodInterceptor;
-import javax.inject.Inject;
@Component
public class EventUtils {
private static final Logger s_logger = Logger.getLogger(EventUtils.class);
- @Inject
- private EventDistributor eventDistributor;
+ private static EventDistributor eventDistributor;
protected static EventBus s_eventBus = null;
public EventUtils() {
}
+ public static void setEventDistributor(EventDistributor eventDistributorImpl) {
+ eventDistributor = eventDistributorImpl;
+ }
+
private static void publishOnMessageBus(String eventCategory, String eventType, String details, Event.State state) {
if (state != com.cloud.event.Event.State.Completed) {
@@ -63,6 +65,7 @@ public class EventUtils {
}
try {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
@@ -71,16 +74,15 @@ public class EventUtils {
org.apache.cloudstack.framework.events.Event event =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, eventCategory, eventType, EventTypes.getEntityForEvent(eventType), null);
- Map<String, String> eventDescription = new HashMap<String, String>();
+ Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("event", eventType);
eventDescription.put("status", state.toString());
eventDescription.put("details", details);
event.setDescription(eventDescription);
- try {
- s_eventBus.publish(event);
- } catch (EventBusException evx) {
- String errMsg = "Failed to publish contrail event.";
- s_logger.warn(errMsg, evx);
+ List<EventBusException> exceptions = eventDistributor.publish(event);
+ for (EventBusException ex : exceptions) {
+ String errMsg = "Failed to publish event.";
+ s_logger.warn(errMsg, ex);
}
}
@@ -123,7 +125,7 @@ public class EventUtils {
}
protected List<ActionEvent> getActionEvents(Method m) {
- List<ActionEvent> result = new ArrayList<ActionEvent>();
+ List<ActionEvent> result = new ArrayList<>();
ActionEvents events = m.getAnnotation(ActionEvents.class);
diff --git a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
index 7a651526002..9034e9ceea2 100644
--- a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
@@ -25,9 +25,12 @@ import java.util.Map;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
+import com.cloud.utils.component.ComponentContext;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.log4j.Logger;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import com.cloud.configuration.Config;
@@ -47,10 +50,8 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh
@Inject
private ConfigurationDao configDao;
- @Inject
- private EventDistributor eventDistributor;
- private static final Logger s_logger = Logger.getLogger(SnapshotStateListener.class);
+ private EventDistributor eventDistributor = null;
public SnapshotStateListener() {
@@ -61,6 +62,10 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh
s_configDao = configDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, SnapshotVO vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -81,12 +86,15 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh
if(!configValue) {
return;
}
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
+ }
String resourceName = getEntityFromClassName(Snapshot.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
vo.getUuid());
- Map<String, String> eventDescription = new HashMap<String, String>();
+ Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
diff --git a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
index 9f063fe8913..26b5d680be3 100644
--- a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
@@ -32,6 +32,7 @@ import com.cloud.server.ManagementService;
import com.cloud.storage.Volume;
import com.cloud.storage.Volume.Event;
import com.cloud.storage.Volume.State;
+import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VMInstanceVO;
@@ -48,16 +49,17 @@ public class VolumeStateListener implements StateListener<State, Event, Volume>
protected ConfigurationDao _configDao;
protected VMInstanceDao _vmInstanceDao;
- @Inject
private EventDistributor eventDistributor;
- private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class);
-
public VolumeStateListener(ConfigurationDao configDao, VMInstanceDao vmInstanceDao) {
this._configDao = configDao;
this._vmInstanceDao = vmInstanceDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Volume vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -93,18 +95,21 @@ public class VolumeStateListener implements StateListener<State, Event, Volume>
return true;
}
- private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) {
+ private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) {
String configKey = Config.PublishResourceStateEvent.key();
String value = _configDao.getValue(configKey);
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
+ }
String resourceName = getEntityFromClassName(Volume.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
- vo.getUuid());
+ vo.getUuid());
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
diff --git a/server/src/main/java/com/cloud/vm/UserVmStateListener.java b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
index 3935f49c4c0..8d397278fdc 100644
--- a/server/src/main/java/com/cloud/vm/UserVmStateListener.java
+++ b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
@@ -33,6 +33,7 @@ import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkVO;
import com.cloud.service.dao.ServiceOfferingDao;
import com.cloud.server.ManagementService;
+import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VirtualMachine.Event;
@@ -52,7 +53,7 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
@Inject protected UserVmDao _userVmDao;
@Inject protected UserVmManager _userVmMgr;
@Inject protected ConfigurationDao _configDao;
- @Inject private EventDistributor eventDistributor;
+ private EventDistributor eventDistributor;
public UserVmStateListener(UsageEventDao usageEventDao, NetworkDao networkDao, NicDao nicDao, ServiceOfferingDao offeringDao, UserVmDao userVmDao, UserVmManager userVmMgr,
ConfigurationDao configDao) {
@@ -65,6 +66,10 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
this._configDao = configDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, VirtualMachine vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -122,12 +127,15 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
+ }
String resourceName = getEntityFromClassName(VirtualMachine.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
vo.getUuid());
- Map<String, String> eventDescription = new HashMap<String, String>();
+ Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
diff --git a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
index decaa34cf95..5c97fb1f6db 100644
--- a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
+++ b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
@@ -29,6 +29,7 @@ import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.naming.ConfigurationException;
+import com.cloud.utils.exception.CloudRuntimeException;
import org.apache.log4j.Logger;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.BeansException;
@@ -100,7 +101,7 @@ public class ComponentContext implements ApplicationContextAware {
s_logger.info("Running SystemIntegrityChecker " + entry.getKey());
try {
entry.getValue().check();
- } catch (Throwable e) {
+ } catch (RuntimeException e) {
s_logger.error("System integrity check failed. Refuse to startup", e);
System.exit(1);
}