You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2013/02/01 20:38:11 UTC
[39/50] [abbrv] git commit: refs/heads/javelin - Merging events
framework branch into master. This commit will bring following changes
Merging events framework branch into master. This commit will bring
following changes
- introduced notion of event bus with publish, subscribe, unsubscribe
semantics
- a plug-in can implement the EventBus abstraction to provide event
bug to CloudStack
- A rabbitMQ based plug-in that can interact with AMQP servers to
provide message broker based event-bug
- stream lines, action events, usage events, alerts publishing in to
convineance classed which are also used to publish corresponding
event on to event bus
- introduced notion of state change event. On a state change, in the
state machine corrsponding to the resource, a state change event is
published on the event bug
- associated a state machined with Snapshot and Network objects
- Virtual Machine, Volume, Snaphost, Network object state changes wil
result in a state change event
Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/e7a554fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/e7a554fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/e7a554fc
Branch: refs/heads/javelin
Commit: e7a554fc6a23a49949c2d88d6ef680682c6f6bc4
Parents: 6a6d93c
Author: Murali Reddy <mu...@citrix.com>
Authored: Fri Feb 1 01:30:49 2013 +0530
Committer: Murali Reddy <mu...@citrix.com>
Committed: Fri Feb 1 01:37:24 2013 +0530
----------------------------------------------------------------------
api/src/com/cloud/event/EventCategory.java | 55 ++
api/src/com/cloud/event/EventTypes.java | 351 +++++++++-
api/src/com/cloud/network/Network.java | 56 +-
api/src/com/cloud/storage/Snapshot.java | 39 +-
.../cloudstack/api/response/SnapshotResponse.java | 12 +-
client/pom.xml | 5 +
core/src/com/cloud/storage/SnapshotVO.java | 26 +-
framework/events/pom.xml | 47 ++
.../apache/cloudstack/framework/events/Event.java | 94 +++
.../cloudstack/framework/events/EventBus.java | 55 ++
.../framework/events/EventBusException.java | 26 +
.../framework/events/EventSubscriber.java | 30 +
.../cloudstack/framework/events/EventTopic.java | 57 ++
framework/pom.xml | 35 +
plugins/event-bus/rabbitmq/pom.xml | 46 ++
.../cloudstack/mom/rabbitmq/RabbitMQEventBus.java | 555 +++++++++++++++
.../cloud/network/guru/OvsGuestNetworkGuru.java | 4 +-
plugins/pom.xml | 1 +
pom.xml | 1 +
server/pom.xml | 5 +
server/src/com/cloud/alert/AlertManagerImpl.java | 64 ++
server/src/com/cloud/api/ApiDBUtils.java | 207 +-----
server/src/com/cloud/api/ApiResponseHelper.java | 191 +-----
server/src/com/cloud/api/ApiServer.java | 5 +-
.../cloud/baremetal/BareMetalTemplateAdapter.java | 30 +-
.../cloud/baremetal/BareMetalVmManagerImpl.java | 63 +--
.../configuration/DefaultInterceptorLibrary.java | 8 +-
.../src/com/cloud/event/ActionEventCallback.java | 135 ----
server/src/com/cloud/event/ActionEventUtils.java | 288 ++++++++
server/src/com/cloud/event/AlertGenerator.java | 87 +++
server/src/com/cloud/event/EventUtils.java | 102 ---
server/src/com/cloud/event/UsageEventUtils.java | 119 +++
.../src/com/cloud/network/NetworkManagerImpl.java | 204 +++----
.../src/com/cloud/network/NetworkServiceImpl.java | 90 +--
.../com/cloud/network/NetworkStateListener.java | 90 +++
server/src/com/cloud/network/NetworkVO.java | 1 +
server/src/com/cloud/network/dao/NetworkDao.java | 4 +-
.../src/com/cloud/network/dao/NetworkDaoImpl.java | 32 +-
.../network/firewall/FirewallManagerImpl.java | 27 +-
.../network/guru/ExternalGuestNetworkGuru.java | 7 +-
.../com/cloud/network/guru/GuestNetworkGuru.java | 11 +-
.../network/lb/LoadBalancingRulesManagerImpl.java | 11 +-
.../com/cloud/network/rules/RulesManagerImpl.java | 12 +-
.../network/security/SecurityGroupManagerImpl.java | 78 +--
.../network/vpn/RemoteAccessVpnManagerImpl.java | 61 +-
.../src/com/cloud/server/ManagementServerImpl.java | 9 +-
.../src/com/cloud/storage/StorageManagerImpl.java | 187 ++----
server/src/com/cloud/storage/dao/SnapshotDao.java | 13 +-
.../src/com/cloud/storage/dao/SnapshotDaoImpl.java | 49 +-
.../storage/download/DownloadMonitorImpl.java | 76 +--
.../storage/listener/SnapshotStateListener.java | 85 +++
.../storage/listener/VolumeStateListener.java | 85 +++
.../storage/snapshot/SnapshotManagerImpl.java | 195 +++---
.../storage/snapshot/SnapshotSchedulerImpl.java | 6 +-
.../cloud/template/HyervisorTemplateAdapter.java | 29 +-
.../com/cloud/template/TemplateManagerImpl.java | 95 +--
server/src/com/cloud/user/AccountManagerImpl.java | 8 +-
server/src/com/cloud/vm/UserVmManagerImpl.java | 59 +-
server/src/com/cloud/vm/UserVmStateListener.java | 88 ++-
.../test/com/cloud/snapshot/SnapshotDaoTest.java | 9 +-
.../test/com/cloud/vpc/dao/MockNetworkDaoImpl.java | 17 +-
tools/whisker/LICENSE | 500 +++++++++++++-
tools/whisker/descriptor-for-packaging.xml | 18 +
63 files changed, 3400 insertions(+), 1555 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/event/EventCategory.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/event/EventCategory.java b/api/src/com/cloud/event/EventCategory.java
new file mode 100644
index 0000000..cee6529
--- /dev/null
+++ b/api/src/com/cloud/event/EventCategory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.cloud.event;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventCategory {
+ private static List<EventCategory> eventCategories = new ArrayList<EventCategory>();
+ private String eventCategoryName;
+
+ public EventCategory(String categoryName) {
+ this.eventCategoryName = categoryName;
+ eventCategories.add(this);
+ }
+
+ public String getName() {
+ return eventCategoryName;
+ }
+
+ public static List<EventCategory> listAllEventCategories() {
+ return eventCategories;
+ }
+
+ public static EventCategory getEventCategory(String categoryName) {
+ for (EventCategory category : eventCategories) {
+ if (category.getName().equalsIgnoreCase(categoryName)) {
+ return category;
+ }
+ }
+ return null;
+ }
+
+ public static final EventCategory ACTION_EVENT = new EventCategory("ActionEvent");
+ public static final EventCategory ALERT_EVENT = new EventCategory("AlertEvent");
+ public static final EventCategory USAGE_EVENT = new EventCategory("UsageEvent");
+ public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent");
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/event/EventTypes.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/event/EventTypes.java b/api/src/com/cloud/event/EventTypes.java
index d666c1e..0dd97cb 100755
--- a/api/src/com/cloud/event/EventTypes.java
+++ b/api/src/com/cloud/event/EventTypes.java
@@ -16,7 +16,41 @@
// under the License.
package com.cloud.event;
+import com.cloud.configuration.Configuration;
+import com.cloud.dc.DataCenter;
+import com.cloud.dc.Pod;
+import com.cloud.dc.StorageNetworkIpRange;
+import com.cloud.dc.Vlan;
+import com.cloud.domain.Domain;
+import com.cloud.host.Host;
+import com.cloud.network.*;
+import com.cloud.network.as.*;
+import com.cloud.network.router.VirtualRouter;
+import com.cloud.network.rules.LoadBalancer;
+import com.cloud.network.rules.StaticNat;
+import com.cloud.network.security.SecurityGroup;
+import com.cloud.network.vpc.PrivateGateway;
+import com.cloud.network.vpc.StaticRoute;
+import com.cloud.network.vpc.Vpc;
+import com.cloud.offering.DiskOffering;
+import com.cloud.offering.NetworkOffering;
+import com.cloud.offering.ServiceOffering;
+import com.cloud.projects.Project;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.Volume;
+import com.cloud.template.VirtualMachineTemplate;
+import com.cloud.user.Account;
+import com.cloud.user.User;
+import com.cloud.vm.VirtualMachine;
+
+import java.util.HashMap;
+import java.util.Map;
+
public class EventTypes {
+
+ //map of Event and corresponding entity for which Event is applicable
+ private static Map<String, String> entityEventDetails = null;
+
// VM Events
public static final String EVENT_VM_CREATE = "VM.CREATE";
public static final String EVENT_VM_DESTROY = "VM.DESTROY";
@@ -319,10 +353,323 @@ public class EventTypes {
public static final String EVENT_AUTOSCALEVMGROUP_UPDATE = "AUTOSCALEVMGROUP.UPDATE";
public static final String EVENT_AUTOSCALEVMGROUP_ENABLE = "AUTOSCALEVMGROUP.ENABLE";
public static final String EVENT_AUTOSCALEVMGROUP_DISABLE = "AUTOSCALEVMGROUP.DISABLE";
-
+
+
public static final String EVENT_BAREMETAL_DHCP_SERVER_ADD = "PHYSICAL.DHCP.ADD";
public static final String EVENT_BAREMETAL_DHCP_SERVER_DELETE = "PHYSICAL.DHCP.DELETE";
-
public static final String EVENT_BAREMETAL_PXE_SERVER_ADD = "PHYSICAL.PXE.ADD";
public static final String EVENT_BAREMETAL_PXE_SERVER_DELETE = "PHYSICAL.PXE.DELETE";
+
+ static {
+
+ // TODO: need a way to force author adding event types to declare the entity details as well, with out braking
+ // current ActionEvent annotation semantics
+
+ entityEventDetails = new HashMap<String, String>();
+
+ entityEventDetails.put(EVENT_VM_CREATE, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_DESTROY, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_START, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_STOP, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_REBOOT, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_UPDATE, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_UPGRADE, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_RESETPASSWORD, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_MIGRATE, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_MOVE, VirtualMachine.class.getName());
+ entityEventDetails.put(EVENT_VM_RESTORE, VirtualMachine.class.getName());
+
+ entityEventDetails.put(EVENT_ROUTER_CREATE, VirtualRouter.class.getName());
+ entityEventDetails.put(EVENT_ROUTER_DESTROY, VirtualRouter.class.getName());
+ entityEventDetails.put(EVENT_ROUTER_START, VirtualRouter.class.getName());
+ entityEventDetails.put(EVENT_ROUTER_STOP, VirtualRouter.class.getName());
+ entityEventDetails.put(EVENT_ROUTER_REBOOT, VirtualRouter.class.getName());
+ entityEventDetails.put(EVENT_ROUTER_HA, VirtualRouter.class.getName());
+ entityEventDetails.put(EVENT_ROUTER_UPGRADE, VirtualRouter.class.getName());
+
+ entityEventDetails.put(EVENT_PROXY_CREATE, "ConsoleProxy");
+ entityEventDetails.put(EVENT_PROXY_DESTROY, "ConsoleProxy");
+ entityEventDetails.put(EVENT_PROXY_START, "ConsoleProxy");
+ entityEventDetails.put(EVENT_PROXY_STOP, "ConsoleProxy");
+ entityEventDetails.put(EVENT_PROXY_REBOOT, "ConsoleProxy");
+ entityEventDetails.put(EVENT_ROUTER_HA, "ConsoleProxy");
+ entityEventDetails.put(EVENT_PROXY_HA, "ConsoleProxy");
+
+ entityEventDetails.put(EVENT_VNC_CONNECT, "VNC");
+ entityEventDetails.put(EVENT_VNC_DISCONNECT, "VNC");
+
+ // Network Events
+ entityEventDetails.put(EVENT_NETWORK_CREATE, Network.class.getName());
+ entityEventDetails.put(EVENT_NETWORK_DELETE, Network.class.getName());
+ entityEventDetails.put(EVENT_NETWORK_UPDATE, Network.class.getName());
+ entityEventDetails.put(EVENT_NETWORK_RESTART, Network.class.getName());
+ entityEventDetails.put(EVENT_NET_IP_ASSIGN, PublicIpAddress.class.getName());
+ entityEventDetails.put(EVENT_NET_IP_RELEASE, PublicIpAddress.class.getName());
+ entityEventDetails.put(EVENT_NET_RULE_ADD, Network.class.getName());
+ entityEventDetails.put(EVENT_NET_RULE_DELETE, Network.class.getName());
+ entityEventDetails.put(EVENT_NET_RULE_MODIFY, Network.class.getName());
+ entityEventDetails.put(EVENT_FIREWALL_OPEN, Network.class.getName());
+ entityEventDetails.put(EVENT_FIREWALL_CLOSE, Network.class.getName());
+
+ // Load Balancers
+ entityEventDetails.put(EVENT_ASSIGN_TO_LOAD_BALANCER_RULE, LoadBalancer.class.getName());
+ entityEventDetails.put(EVENT_REMOVE_FROM_LOAD_BALANCER_RULE, LoadBalancer.class.getName());
+ entityEventDetails.put(EVENT_LOAD_BALANCER_CREATE, LoadBalancer.class.getName());
+ entityEventDetails.put(EVENT_LOAD_BALANCER_DELETE, LoadBalancer.class.getName());
+ entityEventDetails.put(EVENT_LB_STICKINESSPOLICY_CREATE, LoadBalancer.class.getName());
+ entityEventDetails.put(EVENT_LB_STICKINESSPOLICY_DELETE, LoadBalancer.class.getName());
+ entityEventDetails.put(EVENT_LOAD_BALANCER_UPDATE, LoadBalancer.class.getName());
+
+ // Account events
+ entityEventDetails.put(EVENT_ACCOUNT_DISABLE, Account.class.getName());
+ entityEventDetails.put(EVENT_ACCOUNT_CREATE, Account.class.getName());
+ entityEventDetails.put(EVENT_ACCOUNT_DELETE, Account.class.getName());
+ entityEventDetails.put(EVENT_ACCOUNT_MARK_DEFAULT_ZONE, Account.class.getName());
+
+ // UserVO Events
+ entityEventDetails.put(EVENT_USER_LOGIN, User.class.getName());
+ entityEventDetails.put(EVENT_USER_LOGOUT, User.class.getName());
+ entityEventDetails.put(EVENT_USER_CREATE, User.class.getName());
+ entityEventDetails.put(EVENT_USER_DELETE, User.class.getName());
+ entityEventDetails.put(EVENT_USER_DISABLE, User.class.getName());
+ entityEventDetails.put(EVENT_USER_UPDATE, User.class.getName());
+ entityEventDetails.put(EVENT_USER_ENABLE, User.class.getName());
+ entityEventDetails.put(EVENT_USER_LOCK, User.class.getName());
+
+ // Template Events
+ entityEventDetails.put(EVENT_TEMPLATE_CREATE, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_DELETE, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_UPDATE, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_DOWNLOAD_START, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_DOWNLOAD_SUCCESS, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_DOWNLOAD_FAILED, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_COPY, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_EXTRACT, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_UPLOAD, VirtualMachineTemplate.class.getName());
+ entityEventDetails.put(EVENT_TEMPLATE_CLEANUP, VirtualMachineTemplate.class.getName());
+
+ // Volume Events
+ entityEventDetails.put(EVENT_VOLUME_CREATE, Volume.class.getName());
+ entityEventDetails.put(EVENT_VOLUME_DELETE, Volume.class.getName());
+ entityEventDetails.put(EVENT_VOLUME_ATTACH, Volume.class.getName());
+ entityEventDetails.put(EVENT_VOLUME_DETACH, Volume.class.getName());
+ entityEventDetails.put(EVENT_VOLUME_EXTRACT, Volume.class.getName());
+ entityEventDetails.put(EVENT_VOLUME_UPLOAD, Volume.class.getName());
+ entityEventDetails.put(EVENT_VOLUME_MIGRATE, Volume.class.getName());
+ entityEventDetails.put(EVENT_VOLUME_RESIZE, Volume.class.getName());
+
+ // Domains
+ entityEventDetails.put(EVENT_DOMAIN_CREATE, Domain.class.getName());
+ entityEventDetails.put(EVENT_DOMAIN_DELETE, Domain.class.getName());
+ entityEventDetails.put(EVENT_DOMAIN_UPDATE, Domain.class.getName());
+
+ // Snapshots
+ entityEventDetails.put(EVENT_SNAPSHOT_CREATE, Snapshot.class.getName());
+ entityEventDetails.put(EVENT_SNAPSHOT_DELETE, Snapshot.class.getName());
+ entityEventDetails.put(EVENT_SNAPSHOT_POLICY_CREATE, Snapshot.class.getName());
+ entityEventDetails.put(EVENT_SNAPSHOT_POLICY_UPDATE, Snapshot.class.getName());
+ entityEventDetails.put(EVENT_SNAPSHOT_POLICY_DELETE, Snapshot.class.getName());
+
+ // ISO
+ entityEventDetails.put(EVENT_ISO_CREATE, "Iso");
+ entityEventDetails.put(EVENT_ISO_DELETE, "Iso");
+ entityEventDetails.put(EVENT_ISO_COPY, "Iso");
+ entityEventDetails.put(EVENT_ISO_ATTACH, "Iso");
+ entityEventDetails.put(EVENT_ISO_DETACH, "Iso");
+ entityEventDetails.put(EVENT_ISO_EXTRACT, "Iso");
+ entityEventDetails.put(EVENT_ISO_UPLOAD, "Iso");
+
+ // SSVM
+ entityEventDetails.put(EVENT_SSVM_CREATE, "SecondaryStorageVm");
+ entityEventDetails.put(EVENT_SSVM_DESTROY, "SecondaryStorageVm");
+ entityEventDetails.put(EVENT_SSVM_START, "SecondaryStorageVm");
+ entityEventDetails.put(EVENT_SSVM_STOP, "SecondaryStorageVm");
+ entityEventDetails.put(EVENT_SSVM_REBOOT, "SecondaryStorageVm");
+ entityEventDetails.put(EVENT_SSVM_HA, "SecondaryStorageVm");
+
+ // Service Offerings
+ entityEventDetails.put(EVENT_SERVICE_OFFERING_CREATE, ServiceOffering.class.getName());
+ entityEventDetails.put(EVENT_SERVICE_OFFERING_EDIT, ServiceOffering.class.getName());
+ entityEventDetails.put(EVENT_SERVICE_OFFERING_DELETE, ServiceOffering.class.getName());
+
+ // Disk Offerings
+ entityEventDetails.put(EVENT_DISK_OFFERING_CREATE, DiskOffering.class.getName());
+ entityEventDetails.put(EVENT_DISK_OFFERING_EDIT, DiskOffering.class.getName());
+ entityEventDetails.put(EVENT_DISK_OFFERING_DELETE, DiskOffering.class.getName());
+
+ // Network offerings
+ entityEventDetails.put(EVENT_NETWORK_OFFERING_CREATE, NetworkOffering.class.getName());
+ entityEventDetails.put(EVENT_NETWORK_OFFERING_ASSIGN, NetworkOffering.class.getName());
+ entityEventDetails.put(EVENT_NETWORK_OFFERING_EDIT, NetworkOffering.class.getName());
+ entityEventDetails.put(EVENT_NETWORK_OFFERING_REMOVE, NetworkOffering.class.getName());
+ entityEventDetails.put(EVENT_NETWORK_OFFERING_DELETE, NetworkOffering.class.getName());
+
+ // Pods
+ entityEventDetails.put(EVENT_POD_CREATE, Pod.class.getName());
+ entityEventDetails.put(EVENT_POD_EDIT, Pod.class.getName());
+ entityEventDetails.put(EVENT_POD_DELETE, Pod.class.getName());
+
+ // Zones
+ entityEventDetails.put(EVENT_ZONE_CREATE, DataCenter.class.getName());
+ entityEventDetails.put(EVENT_ZONE_EDIT, DataCenter.class.getName());
+ entityEventDetails.put(EVENT_ZONE_DELETE, DataCenter.class.getName());
+
+ // VLANs/IP ranges
+ entityEventDetails.put(EVENT_VLAN_IP_RANGE_CREATE, Vlan.class.getName());
+ entityEventDetails.put(EVENT_VLAN_IP_RANGE_DELETE,Vlan.class.getName());
+
+ entityEventDetails.put(EVENT_STORAGE_IP_RANGE_CREATE, StorageNetworkIpRange.class.getName());
+ entityEventDetails.put(EVENT_STORAGE_IP_RANGE_DELETE, StorageNetworkIpRange.class.getName());
+ entityEventDetails.put(EVENT_STORAGE_IP_RANGE_UPDATE, StorageNetworkIpRange.class.getName());
+
+ // Configuration Table
+ entityEventDetails.put(EVENT_CONFIGURATION_VALUE_EDIT, Configuration.class.getName());
+
+ // Security Groups
+ entityEventDetails.put(EVENT_SECURITY_GROUP_AUTHORIZE_INGRESS, SecurityGroup.class.getName());
+ entityEventDetails.put(EVENT_SECURITY_GROUP_REVOKE_INGRESS, SecurityGroup.class.getName());
+ entityEventDetails.put(EVENT_SECURITY_GROUP_AUTHORIZE_EGRESS, SecurityGroup.class.getName());
+ entityEventDetails.put(EVENT_SECURITY_GROUP_REVOKE_EGRESS, SecurityGroup.class.getName());
+ entityEventDetails.put(EVENT_SECURITY_GROUP_CREATE, SecurityGroup.class.getName());
+ entityEventDetails.put(EVENT_SECURITY_GROUP_DELETE, SecurityGroup.class.getName());
+ entityEventDetails.put(EVENT_SECURITY_GROUP_ASSIGN, SecurityGroup.class.getName());
+ entityEventDetails.put(EVENT_SECURITY_GROUP_REMOVE, SecurityGroup.class.getName());
+
+ // Host
+ entityEventDetails.put(EVENT_HOST_RECONNECT, Host.class.getName());
+
+ // Maintenance
+ entityEventDetails.put(EVENT_MAINTENANCE_CANCEL, Host.class.getName());
+ entityEventDetails.put(EVENT_MAINTENANCE_CANCEL_PRIMARY_STORAGE, Host.class.getName());
+ entityEventDetails.put(EVENT_MAINTENANCE_PREPARE, Host.class.getName());
+ entityEventDetails.put(EVENT_MAINTENANCE_PREPARE_PRIMARY_STORAGE, Host.class.getName());
+
+ // VPN
+ entityEventDetails.put(EVENT_REMOTE_ACCESS_VPN_CREATE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_REMOTE_ACCESS_VPN_DESTROY, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_VPN_USER_ADD, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_VPN_USER_REMOVE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_GATEWAY_CREATE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_GATEWAY_DELETE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_CUSTOMER_GATEWAY_CREATE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_CUSTOMER_GATEWAY_DELETE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_CUSTOMER_GATEWAY_UPDATE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_CONNECTION_CREATE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_CONNECTION_DELETE, RemoteAccessVpn.class.getName());
+ entityEventDetails.put(EVENT_S2S_VPN_CONNECTION_RESET, RemoteAccessVpn.class.getName());
+
+ // Custom certificates
+ entityEventDetails.put(EVENT_UPLOAD_CUSTOM_CERTIFICATE, "Certificate");
+
+ // OneToOnenat
+ entityEventDetails.put(EVENT_ENABLE_STATIC_NAT, StaticNat.class.getName());
+ entityEventDetails.put(EVENT_DISABLE_STATIC_NAT, StaticNat.class.getName());
+
+ entityEventDetails.put(EVENT_ZONE_VLAN_ASSIGN,Vlan.class.getName());
+ entityEventDetails.put(EVENT_ZONE_VLAN_RELEASE,Vlan.class.getName());
+
+ // Projects
+ entityEventDetails.put(EVENT_PROJECT_CREATE, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_UPDATE, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_DELETE, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_ACTIVATE, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_SUSPEND, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_ACCOUNT_ADD, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_INVITATION_UPDATE, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_INVITATION_REMOVE, Project.class.getName());
+ entityEventDetails.put(EVENT_PROJECT_ACCOUNT_REMOVE, Project.class.getName());
+
+ // Network as a Service
+ entityEventDetails.put(EVENT_NETWORK_ELEMENT_CONFIGURE,Network.class.getName());
+
+ // Physical Network Events
+ entityEventDetails.put(EVENT_PHYSICAL_NETWORK_CREATE, PhysicalNetwork.class.getName());
+ entityEventDetails.put(EVENT_PHYSICAL_NETWORK_DELETE, PhysicalNetwork.class.getName());
+ entityEventDetails.put(EVENT_PHYSICAL_NETWORK_UPDATE, PhysicalNetwork.class.getName());
+
+ // Physical Network Service Provider Events
+ entityEventDetails.put(EVENT_SERVICE_PROVIDER_CREATE, PhysicalNetworkServiceProvider.class.getName());
+ entityEventDetails.put(EVENT_SERVICE_PROVIDER_DELETE, PhysicalNetworkServiceProvider.class.getName());
+ entityEventDetails.put(EVENT_SERVICE_PROVIDER_UPDATE, PhysicalNetworkServiceProvider.class.getName());
+
+ // Physical Network TrafficType Events
+ entityEventDetails.put(EVENT_TRAFFIC_TYPE_CREATE, PhysicalNetworkTrafficType.class.getName());
+ entityEventDetails.put(EVENT_TRAFFIC_TYPE_DELETE, PhysicalNetworkTrafficType.class.getName());
+ entityEventDetails.put(EVENT_TRAFFIC_TYPE_UPDATE, PhysicalNetworkTrafficType.class.getName());
+
+ // external network device events
+ entityEventDetails.put(EVENT_EXTERNAL_LB_DEVICE_ADD, PhysicalNetwork.class.getName());
+ entityEventDetails.put(EVENT_EXTERNAL_LB_DEVICE_DELETE, PhysicalNetwork.class.getName());
+ entityEventDetails.put(EVENT_EXTERNAL_LB_DEVICE_CONFIGURE, PhysicalNetwork.class.getName());
+
+ // external switch management device events (E.g.: Cisco Nexus 1000v Virtual Supervisor Module.
+ entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_ADD, "Nexus1000v");
+ entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_DELETE, "Nexus1000v");
+ entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_CONFIGURE, "Nexus1000v");
+ entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_ENABLE, "Nexus1000v");
+ entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_DISABLE, "Nexus1000v");
+
+
+ entityEventDetails.put(EVENT_EXTERNAL_FIREWALL_DEVICE_ADD, PhysicalNetwork.class.getName());
+ entityEventDetails.put(EVENT_EXTERNAL_FIREWALL_DEVICE_DELETE, PhysicalNetwork.class.getName());
+ entityEventDetails.put(EVENT_EXTERNAL_FIREWALL_DEVICE_CONFIGURE, PhysicalNetwork.class.getName());
+
+ // VPC
+ entityEventDetails.put(EVENT_VPC_CREATE, Vpc.class.getName());
+ entityEventDetails.put(EVENT_VPC_UPDATE, Vpc.class.getName());
+ entityEventDetails.put(EVENT_VPC_DELETE, Vpc.class.getName());
+ entityEventDetails.put(EVENT_VPC_RESTART, Vpc.class.getName());
+
+ // VPC offerings
+ entityEventDetails.put(EVENT_VPC_OFFERING_CREATE, Vpc.class.getName());
+ entityEventDetails.put(EVENT_VPC_OFFERING_UPDATE, Vpc.class.getName());
+ entityEventDetails.put(EVENT_VPC_OFFERING_DELETE, Vpc.class.getName());
+
+ // Private gateway
+ entityEventDetails.put(EVENT_PRIVATE_GATEWAY_CREATE, PrivateGateway.class.getName());
+ entityEventDetails.put(EVENT_PRIVATE_GATEWAY_DELETE, PrivateGateway.class.getName());
+
+ // Static routes
+ entityEventDetails.put(EVENT_STATIC_ROUTE_CREATE, StaticRoute.class.getName());
+ entityEventDetails.put(EVENT_STATIC_ROUTE_DELETE, StaticRoute.class.getName());
+
+ // tag related events
+ entityEventDetails.put(EVENT_TAGS_CREATE, "Tag");
+ entityEventDetails.put(EVENT_TAGS_DELETE, "tag");
+
+ // external network device events
+ entityEventDetails.put(EVENT_EXTERNAL_NVP_CONTROLLER_ADD, "NvpController");
+ entityEventDetails.put(EVENT_EXTERNAL_NVP_CONTROLLER_DELETE, "NvpController");
+ entityEventDetails.put(EVENT_EXTERNAL_NVP_CONTROLLER_CONFIGURE, "NvpController");
+
+ // AutoScale
+ entityEventDetails.put(EVENT_COUNTER_CREATE, AutoScaleCounter.class.getName());
+ entityEventDetails.put(EVENT_COUNTER_DELETE, AutoScaleCounter.class.getName());
+ entityEventDetails.put(EVENT_CONDITION_CREATE, Condition.class.getName());
+ entityEventDetails.put(EVENT_CONDITION_DELETE, Condition.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEPOLICY_CREATE, AutoScalePolicy.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEPOLICY_UPDATE, AutoScalePolicy.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEPOLICY_DELETE, AutoScalePolicy.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMPROFILE_CREATE, AutoScaleVmProfile.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMPROFILE_DELETE, AutoScaleVmProfile.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMPROFILE_UPDATE, AutoScaleVmProfile.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_CREATE, AutoScaleVmGroup.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_DELETE, AutoScaleVmGroup.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_UPDATE, AutoScaleVmGroup.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_ENABLE, AutoScaleVmGroup.class.getName());
+ entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_DISABLE, AutoScaleVmGroup.class.getName());
+ }
+
+ public static String getEntityForEvent (String eventName) {
+ String entityClassName = entityEventDetails.get(eventName);
+ if (entityClassName == null || entityClassName.isEmpty()) {
+ return null;
+ }
+ int index = entityClassName.lastIndexOf(".");
+ String entityName = entityClassName;
+ if (index != -1) {
+ entityName = entityClassName.substring(index+1);
+ }
+ return entityName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/network/Network.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/network/Network.java b/api/src/com/cloud/network/Network.java
index 413b6d9..1dbb327 100644
--- a/api/src/com/cloud/network/Network.java
+++ b/api/src/com/cloud/network/Network.java
@@ -16,26 +16,25 @@
// under the License.
package com.cloud.network;
-import org.apache.cloudstack.acl.ControlledEntity;
import com.cloud.network.Networks.BroadcastDomainType;
import com.cloud.network.Networks.Mode;
import com.cloud.network.Networks.TrafficType;
-import com.cloud.utils.fsm.FiniteState;
-import com.cloud.utils.fsm.StateMachine;
+import com.cloud.utils.fsm.StateMachine2;
+import com.cloud.utils.fsm.StateObject;
+import org.apache.cloudstack.acl.ControlledEntity;
import org.apache.cloudstack.api.Identity;
import org.apache.cloudstack.api.InternalIdentity;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
/**
* owned by an account.
*/
-public interface Network extends ControlledEntity, InternalIdentity, Identity {
+public interface Network extends ControlledEntity, StateObject<Network.State>, InternalIdentity, Identity {
- public enum GuestType {
+ public enum GuestType {
Shared,
Isolated
}
@@ -204,7 +203,8 @@ public interface Network extends ControlledEntity, InternalIdentity, Identity {
OperationFailed;
}
- enum State implements FiniteState<State, Event> {
+ public enum State {
+
Allocated("Indicates the network configuration is in allocated but not setup"),
Setup("Indicates the network configuration is setup"),
Implementing("Indicates the network configuration is being implemented"),
@@ -212,39 +212,8 @@ public interface Network extends ControlledEntity, InternalIdentity, Identity {
Shutdown("Indicates the network configuration is being destroyed"),
Destroy("Indicates that the network is destroyed");
+ protected static final StateMachine2<State, Network.Event, Network> s_fsm = new StateMachine2<State, Network.Event, Network>();
- @Override
- public StateMachine<State, Event> getStateMachine() {
- return s_fsm;
- }
-
- @Override
- public State getNextState(Event event) {
- return s_fsm.getNextState(this, event);
- }
-
- @Override
- public List<State> getFromStates(Event event) {
- return s_fsm.getFromStates(this, event);
- }
-
- @Override
- public Set<Event> getPossibleEvents() {
- return s_fsm.getPossibleEvents(this);
- }
-
- String _description;
-
- @Override
- public String getDescription() {
- return _description;
- }
-
- private State(String description) {
- _description = description;
- }
-
- private static StateMachine<State, Event> s_fsm = new StateMachine<State, Event>();
static {
s_fsm.addTransition(State.Allocated, Event.ImplementNetwork, State.Implementing);
s_fsm.addTransition(State.Implementing, Event.OperationSucceeded, State.Implemented);
@@ -253,6 +222,15 @@ public interface Network extends ControlledEntity, InternalIdentity, Identity {
s_fsm.addTransition(State.Shutdown, Event.OperationSucceeded, State.Allocated);
s_fsm.addTransition(State.Shutdown, Event.OperationFailed, State.Implemented);
}
+
+ public static StateMachine2<State, Network.Event, Network> getStateMachine() {
+ return s_fsm;
+ }
+
+ String _description;
+ private State(String description) {
+ _description = description;
+ }
}
String getName();
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/storage/Snapshot.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/storage/Snapshot.java b/api/src/com/cloud/storage/Snapshot.java
index 99bdee6..2e2965a 100644
--- a/api/src/com/cloud/storage/Snapshot.java
+++ b/api/src/com/cloud/storage/Snapshot.java
@@ -16,14 +16,16 @@
// under the License.
package com.cloud.storage;
-import java.util.Date;
-
-import org.apache.cloudstack.acl.ControlledEntity;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.utils.fsm.StateMachine2;
+import com.cloud.utils.fsm.StateObject;
+import org.apache.cloudstack.acl.ControlledEntity;
import org.apache.cloudstack.api.Identity;
import org.apache.cloudstack.api.InternalIdentity;
-public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
+import java.util.Date;
+
+public interface Snapshot extends ControlledEntity, Identity, InternalIdentity, StateObject<Snapshot.State> {
public enum Type {
MANUAL,
RECURRING,
@@ -51,13 +53,29 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
}
}
- public enum Status {
+ public enum State {
Creating,
CreatedOnPrimary,
BackingUp,
BackedUp,
Error;
+ private final static StateMachine2<State, Event, Snapshot> s_fsm = new StateMachine2<State, Event, Snapshot>();
+
+ public static StateMachine2<State, Event, Snapshot> getStateMachine() {
+ return s_fsm;
+ }
+
+ static {
+ s_fsm.addTransition(null, Event.CreateRequested, Creating);
+ s_fsm.addTransition(Creating, Event.OperationSucceeded, CreatedOnPrimary);
+ s_fsm.addTransition(Creating, Event.OperationNotPerformed, BackedUp);
+ s_fsm.addTransition(Creating, Event.OperationFailed, Error);
+ s_fsm.addTransition(CreatedOnPrimary, Event.BackupToSecondary, BackingUp);
+ s_fsm.addTransition(BackingUp, Event.OperationSucceeded, BackedUp);
+ s_fsm.addTransition(BackingUp, Event.OperationFailed, Error);
+ }
+
public String toString() {
return this.name();
}
@@ -67,6 +85,15 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
}
}
+ enum Event {
+ CreateRequested,
+ OperationNotPerformed,
+ BackupToSecondary,
+ BackedupToSecondary,
+ OperationSucceeded,
+ OperationFailed
+ }
+
public static final long MANUAL_POLICY_ID = 0L;
long getAccountId();
@@ -81,7 +108,7 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
Type getType();
- Status getStatus();
+ State getState();
HypervisorType getHypervisorType();
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java b/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
index 8ea0d7f..58b7cf1 100644
--- a/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
+++ b/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
@@ -16,16 +16,16 @@
// under the License.
package org.apache.cloudstack.api.response;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.cloudstack.api.ApiConstants;
import com.cloud.serializer.Param;
import com.cloud.storage.Snapshot;
import com.google.gson.annotations.SerializedName;
+import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.BaseResponse;
import org.apache.cloudstack.api.EntityReference;
+import java.util.Date;
+import java.util.List;
+
@EntityReference(value=Snapshot.class)
@SuppressWarnings("unused")
public class SnapshotResponse extends BaseResponse implements ControlledEntityResponse {
@@ -81,7 +81,7 @@ public class SnapshotResponse extends BaseResponse implements ControlledEntityRe
@SerializedName(ApiConstants.STATE)
@Param(description = "the state of the snapshot. BackedUp means that snapshot is ready to be used; Creating - the snapshot is being allocated on the primary storage; BackingUp - the snapshot is being backed up on secondary storage")
- private Snapshot.Status state;
+ private Snapshot.State state;
@SerializedName(ApiConstants.TAGS) @Param(description="the list of resource tags associated with snapshot", responseObject = ResourceTagResponse.class)
private List<ResourceTagResponse> tags;
@@ -149,7 +149,7 @@ public class SnapshotResponse extends BaseResponse implements ControlledEntityRe
this.intervalType = intervalType;
}
- public void setState(Snapshot.Status state) {
+ public void setState(Snapshot.State state) {
this.state = state;
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 7ebe50c..63ec2ef 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -117,6 +117,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.cloudstack</groupId>
+ <artifactId>cloud-mom-rabbitmq</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${cs.mysql.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/core/src/com/cloud/storage/SnapshotVO.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/storage/SnapshotVO.java b/core/src/com/cloud/storage/SnapshotVO.java
index e5e3650..c1c5f21 100644
--- a/core/src/com/cloud/storage/SnapshotVO.java
+++ b/core/src/com/cloud/storage/SnapshotVO.java
@@ -16,23 +16,13 @@
// under the License.
package com.cloud.storage;
-import java.util.Date;
-import java.util.UUID;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.EnumType;
-import javax.persistence.Enumerated;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-
-import org.apache.cloudstack.api.Identity;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.utils.db.GenericDao;
import com.google.gson.annotations.Expose;
-import org.apache.cloudstack.api.InternalIdentity;
+
+import javax.persistence.*;
+import java.util.Date;
+import java.util.UUID;
@Entity
@Table(name="snapshots")
@@ -69,7 +59,7 @@ public class SnapshotVO implements Snapshot {
@Expose
@Column(name="status", updatable = true, nullable=false)
@Enumerated(value=EnumType.STRING)
- private Status status;
+ private State status;
@Column(name="snapshot_type")
short snapshotType;
@@ -127,7 +117,7 @@ public class SnapshotVO implements Snapshot {
this.snapshotType = snapshotType;
this.typeDescription = typeDescription;
this.size = size;
- this.status = Status.Creating;
+ this.status = State.Creating;
this.prevSnapshotId = 0;
this.hypervisorType = hypervisorType;
this.version = "2.2";
@@ -252,11 +242,11 @@ public class SnapshotVO implements Snapshot {
}
@Override
- public Status getStatus() {
+ public State getState() {
return status;
}
- public void setStatus(Status status) {
+ public void setStatus(State status) {
this.status = status;
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/pom.xml
----------------------------------------------------------------------
diff --git a/framework/events/pom.xml b/framework/events/pom.xml
new file mode 100644
index 0000000..ef812e5
--- /dev/null
+++ b/framework/events/pom.xml
@@ -0,0 +1,47 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>cloud-framework-events</artifactId>
+ <name>Apache CloudStack Event Notification Framework</name>
+ <parent>
+ <groupId>org.apache.cloudstack</groupId>
+ <artifactId>cloudstack-framework</artifactId>
+ <version>4.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.cloudstack</groupId>
+ <artifactId>cloud-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${cs.gson.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <defaultGoal>install</defaultGoal>
+ <sourceDirectory>src</sourceDirectory>
+ <testSourceDirectory>test</testSourceDirectory>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/Event.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
new file mode 100644
index 0000000..eb6f48d
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.google.gson.Gson;
+
+public class Event {
+
+ String eventCategory;
+ String eventType;
+ String eventSource;
+ String resourceType;
+ String resourceUUID;
+ String description;
+
+ public Event(String eventSource, String eventCategory, String eventType, String resourceType,
+ String resourceUUID) {
+ this.eventCategory = eventCategory;
+ this.eventType = eventType;
+ this.eventSource = eventSource;
+ this.resourceType = resourceType;
+ this.resourceUUID = resourceUUID;
+ }
+
+ public String getEventCategory() {
+ return eventCategory;
+ }
+
+ public void setEventCategory(String category) {
+ eventCategory = category;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public void setEventType(String type) {
+ eventType = type;
+ }
+
+ public String getEventSource() {
+ return eventSource;
+ }
+
+ void setEventSource(String source) {
+ eventSource = source;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription (Object message) {
+ Gson gson = new Gson();
+ this.description = gson.toJson(message).toString();
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getResourceType() {
+ return resourceType;
+ }
+
+ public void setResourceType(String resourceType) {
+ this.resourceType = resourceType;
+ }
+
+ public void setResourceUUID(String uuid) {
+ this.resourceUUID = uuid;
+ }
+
+ public String getResourceUUID () {
+ return resourceUUID;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
new file mode 100644
index 0000000..c16ee6f
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.Adapter;
+
+import java.util.UUID;
+
+/**
+ * Interface to publish and subscribe to CloudStack events
+ *
+ */
+public interface EventBus extends Adapter{
+
+ /**
+ * publish an event on to the event bus
+ *
+ * @param event event that needs to be published on the event bus
+ */
+ void publish(Event event) throws EventBusException;
+
+ /**
+ * subscribe to events that matches specified event topics
+ *
+ * @param topic defines category and type of the events being subscribed to
+ * @param subscriber subscriber that intends to receive event notification
+ * @return UUID returns the subscription ID
+ */
+ UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException;
+
+ /**
+ * unsubscribe to events of a category and a type
+ *
+ * @param subscriber subscriber that intends to unsubscribe from the event notification
+ */
+ void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
new file mode 100644
index 0000000..5654ba0
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public class EventBusException extends Exception{
+ public EventBusException (String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
new file mode 100644
index 0000000..b1c30c2
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public interface EventSubscriber {
+
+ /**
+ * Callback method. EventBus calls this method on occurrence of subscribed event
+ *
+ * @param event details of the event
+ */
+ void onEvent(Event event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
new file mode 100644
index 0000000..19b727d
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public class EventTopic {
+
+ String eventCategory;
+ String eventType;
+ String resourceType;
+ String resourceUUID;
+ String eventSource;
+
+ public EventTopic(String eventCategory, String eventType, String resourceType, String resourceUUID, String eventSource) {
+ this.eventCategory = eventCategory;
+ this.eventType = eventType;
+ this.resourceType = resourceType;
+ this.resourceUUID = resourceUUID;
+ this.eventSource = eventSource;
+ }
+
+ public String getEventCategory() {
+ return eventCategory;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getResourceType() {
+ return resourceType;
+ }
+
+ public String getEventSource() {
+ return eventSource;
+ }
+
+ public String getResourceUUID() {
+ return resourceUUID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/pom.xml
----------------------------------------------------------------------
diff --git a/framework/pom.xml b/framework/pom.xml
new file mode 100644
index 0000000..81e0916
--- /dev/null
+++ b/framework/pom.xml
@@ -0,0 +1,35 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>cloudstack-framework</artifactId>
+ <name>Apache CloudStack framework POM</name>
+ <packaging>pom</packaging>
+ <parent>
+ <groupId>org.apache.cloudstack</groupId>
+ <artifactId>cloudstack</artifactId>
+ <version>4.1.0-SNAPSHOT</version>
+ </parent>
+ <build>
+ <defaultGoal>install</defaultGoal>
+ </build>
+ <modules>
+ <module>events</module>
+ </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/event-bus/rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/plugins/event-bus/rabbitmq/pom.xml b/plugins/event-bus/rabbitmq/pom.xml
new file mode 100644
index 0000000..6a47983
--- /dev/null
+++ b/plugins/event-bus/rabbitmq/pom.xml
@@ -0,0 +1,46 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>cloud-mom-rabbitmq</artifactId>
+ <name>Apache CloudStack Plugin - RabbitMQ Event Bus</name>
+ <parent>
+ <groupId>org.apache.cloudstack</groupId>
+ <artifactId>cloudstack-plugins</artifactId>
+ <version>4.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <dependencies>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>2.8.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cloudstack</groupId>
+ <artifactId>cloud-framework-events</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <defaultGoal>install</defaultGoal>
+ <sourceDirectory>src</sourceDirectory>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
new file mode 100644
index 0000000..3a06c42
--- /dev/null
+++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.mom.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.cloudstack.framework.events.*;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.Ternary;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Local(value=EventBus.class)
+public class RabbitMQEventBus implements EventBus {
+
+ // details of AMQP server
+ private static String _amqpHost;
+ private static Integer _port;
+ private static String _username;
+ private static String _password;
+
+ // AMQP exchange name where all CloudStack events will be published
+ private static String _amqpExchangeName;
+
+ // hashmap to book keep the registered subscribers
+ private static ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>> _subscribers;
+
+ // connection to AMQP server,
+ private static Connection _connection=null;
+
+ // AMQP server should consider messages acknowledged once delivered if _autoAck is true
+ private static boolean _autoAck = true;
+
+ private ExecutorService executorService;
+ private String _name;
+ private static DisconnectHandler disconnectHandler;
+ private static Integer _retryInterval;
+ private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
+
+ @Override
+ public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+
+ _amqpHost = (String) params.get("server");
+ if (_amqpHost == null || _amqpHost.isEmpty()) {
+ throw new ConfigurationException("Unable to get the AMQP server details");
+ }
+
+ _username = (String) params.get("username");
+ if (_username == null || _username.isEmpty()) {
+ throw new ConfigurationException("Unable to get the username details");
+ }
+
+ _password = (String) params.get("password");
+ if (_password == null || _password.isEmpty()) {
+ throw new ConfigurationException("Unable to get the password details");
+ }
+
+ _amqpExchangeName = (String) params.get("exchangename");
+ if (_amqpExchangeName == null || _amqpExchangeName.isEmpty()) {
+ throw new ConfigurationException("Unable to get the _exchange details on the AMQP server");
+ }
+
+ try {
+ String portStr = (String) params.get("port");
+ if (portStr == null || portStr.isEmpty()) {
+ throw new ConfigurationException("Unable to get the port details of AMQP server");
+ }
+ _port = Integer.parseInt(portStr);
+
+ String retryIntervalStr = (String) params.get("retryinterval");
+ if (retryIntervalStr == null || retryIntervalStr.isEmpty()) {
+ // default to 10s to try out reconnect
+ retryIntervalStr = "10000";
+ }
+ _retryInterval = Integer.parseInt(retryIntervalStr);
+ } catch (NumberFormatException e) {
+ throw new ConfigurationException("Invalid port number/retry interval");
+ }
+
+ _subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
+
+ executorService = Executors.newCachedThreadPool();
+ disconnectHandler = new DisconnectHandler();
+ _name = name;
+ return true;
+ }
+
+ /** Call to subscribe to interested set of events
+ *
+ * @param topic defines category and type of the events being subscribed to
+ * @param subscriber subscriber that intends to receive event notification
+ * @return UUID that represents the subscription with event bus
+ * @throws EventBusException
+ */
+ @Override
+ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
+
+ if (subscriber == null || topic == null) {
+ throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
+ }
+
+ // create a UUID, that will be used for managing subscriptions and also used as queue name
+ // for on the queue used for the subscriber on the AMQP broker
+ UUID queueId = UUID.randomUUID();
+ String queueName = queueId.toString();
+
+ try {
+ String bindingKey = createBindingKey(topic);
+
+ // store the subscriber details before creating channel
+ _subscribers.put(queueName, new Ternary(bindingKey, null, subscriber));
+
+ // create a channel dedicated for this subscription
+ Connection connection = getConnection();
+ Channel channel = createChannel(connection);
+
+ // create a queue and bind it to the exchange with binding key formed from event topic
+ createExchange(channel, _amqpExchangeName);
+ channel.queueDeclare(queueName, false, false, false, null);
+ channel.queueBind(queueName, _amqpExchangeName, bindingKey);
+
+ // register a callback handler to receive the events that a subscriber subscribed to
+ channel.basicConsume(queueName, _autoAck, queueName,
+ new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String queueName,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body)
+ throws IOException {
+ Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+ if (queueDetails != null) {
+ EventSubscriber subscriber = queueDetails.third();
+ String routingKey = envelope.getRoutingKey();
+ String eventSource = getEventSourceFromRoutingKey(routingKey);
+ String eventCategory = getEventCategoryFromRoutingKey(routingKey);
+ String eventType = getEventTypeFromRoutingKey(routingKey);
+ String resourceType = getResourceTypeFromRoutingKey(routingKey);
+ String resourceUUID = getResourceUUIDFromRoutingKey(routingKey);
+ Event event = new Event(eventSource, eventCategory, eventType,
+ resourceType, resourceUUID);
+ event.setDescription(new String(body));
+
+ // deliver the event to call back object provided by subscriber
+ subscriber.onEvent(event);
+ }
+ }
+ }
+ );
+
+ // update the channel details for the subscription
+ Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+ queueDetails.second(channel);
+ _subscribers.put(queueName, queueDetails);
+
+ } catch (AlreadyClosedException closedException) {
+ s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+ " will be active after reconnection");
+ } catch (ConnectException connectException) {
+ s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+ " will be active after reconnection");
+ } catch (Exception e) {
+ throw new EventBusException("Failed to subscribe to event due to " + e.getMessage());
+ }
+
+ return queueId;
+ }
+
+ @Override
+ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ try {
+ String classname = subscriber.getClass().getName();
+ String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
+ Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+ Channel channel = queueDetails.second();
+ channel.basicCancel(queueName);
+ _subscribers.remove(queueName, queueDetails);
+ } catch (Exception e) {
+ throw new EventBusException("Failed to unsubscribe from event bus due to " + e.getMessage());
+ }
+ }
+
+ // publish event on to the exchange created on AMQP server
+ @Override
+ public void publish(Event event) throws EventBusException {
+
+ String routingKey = createRoutingKey(event);
+ String eventDescription = event.getDescription();
+
+ try {
+ Connection connection = getConnection();
+ Channel channel = createChannel(connection);
+ createExchange(channel, _amqpExchangeName);
+ publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription);
+ channel.close();
+ } catch (AlreadyClosedException e) {
+ closeConnection();
+ throw new EventBusException("Failed to publish event to message broker as connection to AMQP broker in lost");
+ } catch (Exception e) {
+ throw new EventBusException("Failed to publish event to message broker due to " + e.getMessage());
+ }
+ }
+
+ /** creates a routing key from the event details.
+ * created routing key will be used while publishing the message to exchange on AMQP server
+ */
+ private String createRoutingKey(Event event) {
+
+ StringBuilder routingKey = new StringBuilder();
+
+ String eventSource = replaceNullWithWildcard(event.getEventSource());
+ eventSource = eventSource.replace(".", "-");
+
+ String eventCategory = replaceNullWithWildcard(event.getEventCategory());
+ eventCategory = eventCategory.replace(".", "-");
+
+ String eventType = replaceNullWithWildcard(event.getEventType());
+ eventType = eventType.replace(".", "-");
+
+ String resourceType = replaceNullWithWildcard(event.getResourceType());
+ resourceType = resourceType.replace(".", "-");
+
+ String resourceUuid = replaceNullWithWildcard(event.getResourceUUID());
+ resourceUuid = resourceUuid.replace(".", "-");
+
+ // routing key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
+ routingKey.append(eventSource);
+ routingKey.append(".");
+ routingKey.append(eventCategory);
+ routingKey.append(".");
+ routingKey.append(eventType);
+ routingKey.append(".");
+ routingKey.append(resourceType);
+ routingKey.append(".");
+ routingKey.append(resourceUuid);
+
+ return routingKey.toString();
+ }
+
+ /** creates a binding key from the event topic that subscriber specified
+ * binding key will be used to bind the queue created for subscriber to exchange on AMQP server
+ */
+ private String createBindingKey(EventTopic topic) {
+
+ StringBuilder bindingKey = new StringBuilder();
+
+ String eventSource = replaceNullWithWildcard(topic.getEventSource());
+ eventSource = eventSource.replace(".", "-");
+
+ String eventCategory = replaceNullWithWildcard(topic.getEventCategory());
+ eventCategory = eventCategory.replace(".", "-");
+
+ String eventType = replaceNullWithWildcard(topic.getEventType());
+ eventType = eventType.replace(".", "-");
+
+ String resourceType = replaceNullWithWildcard(topic.getResourceType());
+ resourceType = resourceType.replace(".", "-");
+
+ String resourceUuid = replaceNullWithWildcard(topic.getResourceUUID());
+ resourceUuid = resourceUuid.replace(".", "-");
+
+ // binding key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
+ bindingKey.append(eventSource);
+ bindingKey.append(".");
+ bindingKey.append(eventCategory);
+ bindingKey.append(".");
+ bindingKey.append(eventType);
+ bindingKey.append(".");
+ bindingKey.append(resourceType);
+ bindingKey.append(".");
+ bindingKey.append(resourceUuid);
+
+ return bindingKey.toString();
+ }
+
+ private synchronized Connection getConnection() throws Exception {
+ if (_connection == null) {
+ try {
+ return createConnection();
+ } catch (Exception e) {
+ s_logger.error("Failed to create a connection to AMQP server due to " + e.getMessage());
+ throw e;
+ }
+ } else {
+ return _connection;
+ }
+ }
+
+ private synchronized Connection createConnection() throws Exception {
+ try {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setUsername(_username);
+ factory.setPassword(_password);
+ factory.setVirtualHost("/");
+ factory.setHost(_amqpHost);
+ factory.setPort(_port);
+ Connection connection = factory.newConnection();
+ connection.addShutdownListener(disconnectHandler);
+ _connection = connection;
+ return _connection;
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ private synchronized void closeConnection() {
+ try {
+ if (_connection != null) {
+ _connection.close();
+ }
+ } catch (Exception e) {
+ s_logger.warn("Failed to close connection to AMQP server due to " + e.getMessage());
+ }
+ _connection = null;
+ }
+
+ private synchronized void abortConnection () {
+ if (_connection == null)
+ return;
+
+ try {
+ _connection.abort();
+ } catch (Exception e) {
+ s_logger.warn("Failed to abort connection due to " + e.getMessage());
+ }
+ _connection = null;
+ }
+
+ private String replaceNullWithWildcard(String key) {
+ if (key == null || key.isEmpty()) {
+ return "*";
+ } else {
+ return key;
+ }
+ }
+
+ private Channel createChannel(Connection connection) throws Exception {
+ try {
+ return connection.createChannel();
+ } catch (java.io.IOException exception) {
+ s_logger.warn("Failed to create a channel due to " + exception.getMessage());
+ throw exception;
+ }
+ }
+
+ private void createExchange(Channel channel, String exchangeName) throws Exception {
+ try {
+ channel.exchangeDeclare(exchangeName, "topic", true);
+ } catch (java.io.IOException exception) {
+ s_logger.error("Failed to create exchange" + exchangeName + " on RabbitMQ server");
+ throw exception;
+ }
+ }
+
+ private void publishEventToExchange(Channel channel, String exchangeName,
+ String routingKey, String eventDescription) throws Exception {
+ try {
+ byte[] messageBodyBytes = eventDescription.getBytes();
+ channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
+ } catch (Exception e) {
+ s_logger.error("Failed to publish event " + routingKey + " on exchange " + exchangeName +
+ " of message broker due to " + e.getMessage());
+ throw e;
+ }
+ }
+
+ private String getEventCategoryFromRoutingKey(String routingKey) {
+ String[] keyParts = routingKey.split("\\.");
+ return keyParts[1];
+ }
+
+ private String getEventTypeFromRoutingKey(String routingKey) {
+ String[] keyParts = routingKey.split("\\.");
+ return keyParts[2];
+ }
+
+ private String getEventSourceFromRoutingKey(String routingKey) {
+ String[] keyParts = routingKey.split("\\.");
+ return keyParts[0];
+ }
+
+ private String getResourceTypeFromRoutingKey(String routingKey) {
+ String[] keyParts = routingKey.split("\\.");
+ return keyParts[3];
+ }
+
+ private String getResourceUUIDFromRoutingKey(String routingKey) {
+ String[] keyParts = routingKey.split("\\.");
+ return keyParts[4];
+ }
+
+ @Override
+ public String getName() {
+ return _name;
+ }
+
+ @Override
+ public boolean start() {
+ ReconnectionTask reconnect = new ReconnectionTask(); // initiate connection to AMQP server
+ executorService.submit(reconnect);
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+
+ if (_connection.isOpen()) {
+ for (String subscriberId : _subscribers.keySet()) {
+ Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+ Channel channel = subscriberDetails.second();
+ String queueName = subscriberId;
+ try {
+ channel.queueDelete(queueName);
+ channel.abort();
+ } catch (IOException ioe) {
+ s_logger.warn("Failed to delete queue: " + queueName + " on AMQP server due to " + ioe.getMessage() );
+ }
+ }
+ }
+
+ closeConnection();
+ return true;
+ }
+
+ // logic to deal with loss of connection to AMQP server
+ private class DisconnectHandler implements ShutdownListener {
+
+ @Override
+ public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
+ if (!shutdownSignalException.isInitiatedByApplication()) {
+
+ for (String subscriberId : _subscribers.keySet()) {
+ Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+ subscriberDetails.second(null);
+ _subscribers.put(subscriberId, subscriberDetails);
+ }
+
+ abortConnection(); // disconnected to AMQP server, so abort the connection and channels
+ s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect.");
+
+ // initiate re-connect process
+ ReconnectionTask reconnect = new ReconnectionTask();
+ executorService.submit(reconnect);
+ }
+ }
+ }
+
+ // retry logic to connect back to AMQP server after loss of connection
+ private class ReconnectionTask implements Runnable {
+
+ boolean connected = false;
+ Connection connection = null;
+
+ public void run() {
+
+ while (!connected) {
+ try {
+ Thread.sleep(_retryInterval);
+ } catch (InterruptedException ie) {
+ // ignore timer interrupts
+ }
+
+ try {
+ try {
+ connection = createConnection();
+ connected = true;
+ } catch (IOException ie) {
+ continue; // can't establish connection to AMQP server yet, so continue
+ }
+
+ // prepare consumer on AMQP server for each of subscriber
+ for (String subscriberId : _subscribers.keySet()) {
+ Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+ String bindingKey = subscriberDetails.first();
+ EventSubscriber subscriber = subscriberDetails.third();
+
+ /** create a queue with subscriber ID as queue name and bind it to the exchange
+ * with binding key formed from event topic
+ */
+ Channel channel = createChannel(connection);
+ createExchange(channel, _amqpExchangeName);
+ channel.queueDeclare(subscriberId, false, false, false, null);
+ channel.queueBind(subscriberId, _amqpExchangeName, bindingKey);
+
+ // register a callback handler to receive the events that a subscriber subscribed to
+ channel.basicConsume(subscriberId, _autoAck, subscriberId,
+ new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String queueName,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body)
+ throws IOException {
+
+ Ternary<String, Channel, EventSubscriber> subscriberDetails
+ = _subscribers.get(queueName); // queue name == subscriber ID
+
+ if (subscriberDetails != null) {
+ EventSubscriber subscriber = subscriberDetails.third();
+ String routingKey = envelope.getRoutingKey();
+ String eventSource = getEventSourceFromRoutingKey(routingKey);
+ String eventCategory = getEventCategoryFromRoutingKey(routingKey);
+ String eventType = getEventTypeFromRoutingKey(routingKey);
+ String resourceType = getResourceTypeFromRoutingKey(routingKey);
+ String resourceUUID = getResourceUUIDFromRoutingKey(routingKey);
+
+ // create event object from the message details obtained from AMQP server
+ Event event = new Event(eventSource, eventCategory, eventType,
+ resourceType, resourceUUID);
+ event.setDescription(new String(body));
+
+ // deliver the event to call back object provided by subscriber
+ subscriber.onEvent(event);
+ }
+ }
+ }
+ );
+
+ // update the channel details for the subscription
+ subscriberDetails.second(channel);
+ _subscribers.put(subscriberId, subscriberDetails);
+ }
+ } catch (Exception e) {
+ s_logger.warn("Failed to recreate queues and binding for the subscribers due to " + e.getMessage());
+ }
+ }
+ return;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
----------------------------------------------------------------------
diff --git a/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java b/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
index 30a1129..2104322 100644
--- a/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
+++ b/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
@@ -18,6 +18,7 @@ package com.cloud.network.guru;
import javax.ejb.Local;
+import com.cloud.event.ActionEventUtils;
import org.apache.log4j.Logger;
import com.cloud.dc.DataCenter;
@@ -25,7 +26,6 @@ import com.cloud.dc.DataCenter.NetworkType;
import com.cloud.deploy.DeployDestination;
import com.cloud.deploy.DeploymentPlan;
import com.cloud.event.EventTypes;
-import com.cloud.event.EventUtils;
import com.cloud.event.EventVO;
import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
import com.cloud.network.Network;
@@ -95,7 +95,7 @@ public class OvsGuestNetworkGuru extends GuestNetworkGuru {
throw new InsufficientVirtualNetworkCapcityException("Unable to allocate vnet as a part of network " + network + " implement ", DataCenter.class, dcId);
}
implemented.setBroadcastUri(BroadcastDomainType.Vswitch.toUri(vnet));
- EventUtils.saveEvent(UserContext.current().getCallerUserId(), network.getAccountId(), EventVO.LEVEL_INFO, EventTypes.EVENT_ZONE_VLAN_ASSIGN, "Assigned Zone Vlan: "+vnet+ " Network Id: "+network.getId(), 0);
+ ActionEventUtils.onCompletedActionEvent(UserContext.current().getCallerUserId(), network.getAccountId(), EventVO.LEVEL_INFO, EventTypes.EVENT_ZONE_VLAN_ASSIGN, "Assigned Zone Vlan: " + vnet + " Network Id: " + network.getId(), 0);
} else {
implemented.setBroadcastUri(network.getBroadcastUri());
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/pom.xml
----------------------------------------------------------------------
diff --git a/plugins/pom.xml b/plugins/pom.xml
index c5b6e58..f91c6ee 100644
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -41,6 +41,7 @@
<module>hypervisors/ovm</module>
<module>hypervisors/xen</module>
<module>hypervisors/kvm</module>
+ <module>event-bus/rabbitmq</module>
<module>hypervisors/simulator</module>
<module>hypervisors/baremetal</module>
<module>network-elements/elastic-loadbalancer</module>
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 35d6520..59feef5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -163,6 +163,7 @@
<module>patches</module>
<module>client</module>
<module>test</module>
+ <module>framework</module>
</modules>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 4c3ba6f..ef1b68a 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -80,6 +80,11 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.cloudstack</groupId>
+ <artifactId>cloud-framework-events</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>