You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mu...@apache.org on 2013/01/25 13:55:09 UTC
[47/50] git commit: -Added reconnect logic using shutdown listener in
RabbitMQEventBus -changed EventBus interface method signatures to return/take
UUID for subscriber management
-Added reconnect logic using shutdown listener in RabbitMQEventBus
-changed EventBus interface method signatures to return/take UUID for
subscriber management
Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/2dd40c28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/2dd40c28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/2dd40c28
Branch: refs/heads/events-framework
Commit: 2dd40c2823bb908ef55be7adbfaa8df7638c30c4
Parents: 28738e4
Author: Murali Reddy <mu...@citrix.com>
Authored: Fri Jan 25 14:18:49 2013 +0530
Committer: Murali Reddy <mu...@citrix.com>
Committed: Fri Jan 25 14:18:49 2013 +0530
----------------------------------------------------------------------
api/src/com/cloud/event/EventCategory.java | 7 +-
api/src/com/cloud/network/Network.java | 2 +
client/tomcatconf/components.xml.in | 1 +
.../apache/cloudstack/framework/events/Event.java | 68 ++-
.../cloudstack/framework/events/EventBus.java | 19 +-
.../framework/events/EventBusException.java | 26 +
.../framework/events/EventSubscriber.java | 6 +-
.../cloudstack/framework/events/EventTopic.java | 22 +-
.../cloudstack/framework/events/Subscribe.java | 8 +-
.../cloudstack/mom/rabbitmq/RabbitMQEventBus.java | 521 +++++++++++++--
server/src/com/cloud/alert/AlertManagerImpl.java | 2 +-
.../src/com/cloud/event/ActionEventCallback.java | 24 +-
server/src/com/cloud/event/AlertGenerator.java | 43 +-
.../src/com/cloud/event/UsageEventGenerator.java | 12 +-
.../src/com/cloud/network/NetworkManagerImpl.java | 142 +---
15 files changed, 670 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/api/src/com/cloud/event/EventCategory.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/event/EventCategory.java b/api/src/com/cloud/event/EventCategory.java
index a0042dd..cee6529 100644
--- a/api/src/com/cloud/event/EventCategory.java
+++ b/api/src/com/cloud/event/EventCategory.java
@@ -48,7 +48,8 @@ public class EventCategory {
return null;
}
- public static final EventCategory ACTION_EVENT = new EventCategory("Action Events");
- public static final EventCategory ALERT_EVENT = new EventCategory("Alert Event");
- public static final EventCategory USAGE_EVENT = new EventCategory("Usage Event");
+ public static final EventCategory ACTION_EVENT = new EventCategory("ActionEvent");
+ public static final EventCategory ALERT_EVENT = new EventCategory("AlertEvent");
+ public static final EventCategory USAGE_EVENT = new EventCategory("UsageEvent");
+ public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent");
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/api/src/com/cloud/network/Network.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/network/Network.java b/api/src/com/cloud/network/Network.java
index f70d898..b3c8381 100644
--- a/api/src/com/cloud/network/Network.java
+++ b/api/src/com/cloud/network/Network.java
@@ -202,6 +202,7 @@ public interface Network extends ControlledEntity, StateObject<Network.State>, I
}
public enum State {
+
Allocated("Indicates the network configuration is in allocated but not setup"),
Setup("Indicates the network configuration is setup"),
Implementing("Indicates the network configuration is being implemented"),
@@ -210,6 +211,7 @@ public interface Network extends ControlledEntity, StateObject<Network.State>, I
Destroy("Indicates that the network is destroyed");
protected static final StateMachine2<State, Network.Event, Network> s_fsm = new StateMachine2<State, Network.Event, Network>();
+
static {
s_fsm.addTransition(State.Allocated, Event.ImplementNetwork, State.Implementing);
s_fsm.addTransition(State.Implementing, Event.OperationSucceeded, State.Implemented);
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/client/tomcatconf/components.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/components.xml.in b/client/tomcatconf/components.xml.in
index 8af6d9b..5cd1985 100755
--- a/client/tomcatconf/components.xml.in
+++ b/client/tomcatconf/components.xml.in
@@ -234,6 +234,7 @@ under the License.
<param name="port">5672</param>
<param name="username">guest</param>
<param name="password">guest</param>
+ <param name="exchangename">cloudstack-evetns</param>
</adapter>
</adapters>
<manager name="OvsTunnelManager" key="com.cloud.network.ovs.OvsTunnelManager" class="com.cloud.network.ovs.OvsTunnelManagerImpl"/>
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/Event.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
index f35bbef..eb6f48d 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/Event.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
@@ -23,34 +23,44 @@ import com.google.gson.Gson;
public class Event {
- String category;
- String type;
- String routingKey;
- String description;
- String publisher;
- String date;
+ String eventCategory;
+ String eventType;
+ String eventSource;
String resourceType;
+ String resourceUUID;
+ String description;
+
+ public Event(String eventSource, String eventCategory, String eventType, String resourceType,
+ String resourceUUID) {
+ this.eventCategory = eventCategory;
+ this.eventType = eventType;
+ this.eventSource = eventSource;
+ this.resourceType = resourceType;
+ this.resourceUUID = resourceUUID;
+ }
- public Event(String category, String type, String routingKey) {
- this.category = category;
- this.type = type;
- this.routingKey = routingKey;
+ public String getEventCategory() {
+ return eventCategory;
}
- public String getCategory() {
- return category;
+ public void setEventCategory(String category) {
+ eventCategory = category;
}
- public String getType() {
- return type;
+ public String getEventType() {
+ return eventType;
}
- public String getRoutingKey() {
- return routingKey;
+ public void setEventType(String type) {
+ eventType = type;
}
- public void setRoutingKey(String routingKey) {
- this.routingKey = routingKey;
+ public String getEventSource() {
+ return eventSource;
+ }
+
+ void setEventSource(String source) {
+ eventSource = source;
}
public String getDescription() {
@@ -62,19 +72,23 @@ public class Event {
this.description = gson.toJson(message).toString();
}
- public String getEventPublisher() {
- return publisher;
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getResourceType() {
+ return resourceType;
}
- void setEventPublisher(String source) {
- this.publisher = source;
+ public void setResourceType(String resourceType) {
+ this.resourceType = resourceType;
}
- public String getDate() {
- return date;
+ public void setResourceUUID(String uuid) {
+ this.resourceUUID = uuid;
}
- void setDate(String date) {
- this.date = date;
+ public String getResourceUUID () {
+ return resourceUUID;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
index 3782b32..c16ee6f 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
@@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.events;
import com.cloud.utils.component.Adapter;
+import java.util.UUID;
+
/**
* Interface to publish and subscribe to CloudStack events
*
@@ -28,29 +30,26 @@ import com.cloud.utils.component.Adapter;
public interface EventBus extends Adapter{
/**
- * publish an event
+ * publish an event on to the event bus
*
- * @param event event that needs to be published
- * @return true if the event has been successfully published on event bus
+ * @param event event that needs to be published on the event bus
*/
- boolean publish(Event event);
+ void publish(Event event) throws EventBusException;
/**
- * subscribe to events of a category and a type
+ * subscribe to events that matches specified event topics
*
* @param topic defines category and type of the events being subscribed to
* @param subscriber subscriber that intends to receive event notification
- * @return true if the subscriber has been successfully registered.
+ * @return UUID returns the subscription ID
*/
- boolean subscribe(EventTopic topic, EventSubscriber subscriber);
+ UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException;
/**
* unsubscribe to events of a category and a type
*
- * @param topic defines category and type of the events to unsubscribe
* @param subscriber subscriber that intends to unsubscribe from the event notification
- * @return true if the subscriber has been successfully unsubscribed.
*/
- boolean unsubscribe(EventTopic topic, EventSubscriber subscriber);
+ void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException;
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
new file mode 100644
index 0000000..5654ba0
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public class EventBusException extends Exception{
+ public EventBusException (String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
index d69035a..b1c30c2 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
@@ -24,9 +24,7 @@ public interface EventSubscriber {
/**
* Callback method. EventBus calls this method on occurrence of subscribed event
*
- * @param category category of the event being subscribed (e.g. action, usage, alert etc)
- * @param type type of the event (e.g. vm stop, volume delete etc)
- * @param description description of the event
+ * @param event details of the event
*/
- void recieve(Event event);
+ void onEvent(Event event);
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
index eabcdf6..19b727d 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
@@ -23,12 +23,16 @@ public class EventTopic {
String eventCategory;
String eventType;
- String bindingKey;
+ String resourceType;
+ String resourceUUID;
+ String eventSource;
- public EventTopic(String eventCategory, String eventType, String bindingKey) {
+ public EventTopic(String eventCategory, String eventType, String resourceType, String resourceUUID, String eventSource) {
this.eventCategory = eventCategory;
this.eventType = eventType;
- this.bindingKey = bindingKey;
+ this.resourceType = resourceType;
+ this.resourceUUID = resourceUUID;
+ this.eventSource = eventSource;
}
public String getEventCategory() {
@@ -39,7 +43,15 @@ public class EventTopic {
return eventType;
}
- public String getBindingKey() {
- return bindingKey;
+ public String getResourceType() {
+ return resourceType;
+ }
+
+ public String getEventSource() {
+ return eventSource;
+ }
+
+ public String getResourceUUID() {
+ return resourceUUID;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java b/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
index 00aa5e5..74997f6 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
@@ -19,15 +19,15 @@
package org.apache.cloudstack.framework.events;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
@Target({METHOD })
@Retention(RUNTIME)
-public @interface Subscribe {
+public @interface Subscribe {
String eventCategory();
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
index 3c51cd7..9049fe8 100644
--- a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
+++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -1,126 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.cloudstack.mom.rabbitmq;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.MessageProperties;
-import org.apache.cloudstack.framework.events.Event;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventSubscriber;
-import org.apache.cloudstack.framework.events.EventTopic;
+import com.rabbitmq.client.*;
+import org.apache.cloudstack.framework.events.*;
import org.apache.log4j.Logger;
+import com.cloud.utils.Ternary;
+
import javax.ejb.Local;
import javax.naming.ConfigurationException;
+import java.io.IOException;
+import java.net.ConnectException;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
@Local(value=EventBus.class)
public class RabbitMQEventBus implements EventBus {
+ // details of AMQP server
+ private static String _amqpHost;
+ private static Integer _port;
+ private static String _username;
+ private static String _password;
+
+ // AMQP exchange name where all CloudStack events will be published
+ private static String _amqpExchangeName;
+
+ // hashmap to book keep the registered subscribers
+ private static ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>> _subscribers;
+
+ // connection to AMQP server,
+ private static Connection _connection=null;
- public static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
- public Connection _connection = null;
- public Channel _channel = null;
- private String _rabbitMqHost;
- private Integer _port;
- private String _username;
- private String _password;
+ // AMQP server should consider messages acknowledged once delivered if _autoAck is true
+ private static boolean _autoAck = true;
+
+ private ExecutorService executorService;
+ private String _name;
+ private static DisconnectHandler disconnectHandler;
+ private static Integer _retryInterval;
+ private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
- _rabbitMqHost = (String) params.get("server");
- _port = Integer.parseInt((String) params.get("port"));
+
+ _amqpHost = (String) params.get("server");
+ if (_amqpHost == null || _amqpHost.isEmpty()) {
+ throw new ConfigurationException("Unable to get the AMQP server details");
+ }
+
_username = (String) params.get("username");
+ if (_username == null || _username.isEmpty()) {
+ throw new ConfigurationException("Unable to get the username details");
+ }
+
_password = (String) params.get("password");
+ if (_password == null || _password.isEmpty()) {
+ throw new ConfigurationException("Unable to get the password details");
+ }
+
+ _amqpExchangeName = (String) params.get("exchangename");
+ if (_amqpExchangeName == null || _amqpExchangeName.isEmpty()) {
+ throw new ConfigurationException("Unable to get the _exchange details on the AMQP server");
+ }
+
+ try {
+ String portStr = (String) params.get("port");
+ if (portStr == null || portStr.isEmpty()) {
+ throw new ConfigurationException("Unable to get the port details of AMQP server");
+ }
+ _port = Integer.parseInt(portStr);
+
+ String retryIntervalStr = (String) params.get("retryinterval");
+ if (retryIntervalStr == null || retryIntervalStr.isEmpty()) {
+ // default to 10s to try out reconnect
+ retryIntervalStr = "10000";
+ }
+ _retryInterval = Integer.parseInt(retryIntervalStr);
+ } catch (NumberFormatException e) {
+ throw new ConfigurationException("Invalid port number/retry interval");
+ }
+
+ _subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
+
+ executorService = Executors.newCachedThreadPool();
+ disconnectHandler = new DisconnectHandler();
+ _name = name;
return true;
}
+ /** Call to subscribe to interested set of events
+ *
+ * @param topic defines category and type of the events being subscribed to
+ * @param subscriber subscriber that intends to receive event notification
+ * @return UUID that represents the subscription with event bus
+ * @throws EventBusException
+ */
@Override
- public String getName() {
- return null;
- }
+ public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
- @Override
- public boolean start() {
- return true;
+ if (subscriber == null || topic == null) {
+ throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
+ }
+
+ // create a UUID, that will be used for managing subscriptions and also used as queue name
+ // for on the queue used for the subscriber on the AMQP broker
+ UUID queueId = UUID.randomUUID();
+ String queueName = queueId.toString();
+
+ try {
+ String bindingKey = createBindingKey(topic);
+
+ // store the subscriber details before creating channel
+ _subscribers.put(queueName, new Ternary(bindingKey, null, subscriber));
+
+ // create a channel dedicated for this subscription
+ Connection connection = getConnection();
+ Channel channel = createChannel(connection);
+
+ // create a queue and bind it to the exchange with binding key formed from event topic
+ createExchange(channel, _amqpExchangeName);
+ channel.queueDeclare(queueName, false, false, false, null);
+ channel.queueBind(queueName, _amqpExchangeName, bindingKey);
+
+ // register a callback handler to receive the events that a subscriber subscribed to
+ channel.basicConsume(queueName, _autoAck, queueName,
+ new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String queueName,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body)
+ throws IOException {
+ Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+ if (queueDetails != null) {
+ EventSubscriber subscriber = queueDetails.third();
+ String routingKey = envelope.getRoutingKey();
+ Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey),
+ getEventTypeFromRoutingKey(routingKey), null, null);
+ event.setDescription(body.toString());
+
+ // deliver the event to call back object provided by subscriber
+ subscriber.onEvent(event);
+ }
+ }
+ }
+ );
+
+ // update the channel details for the subscription
+ Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+ queueDetails.second(channel);
+ _subscribers.put(queueName, queueDetails);
+
+ } catch (AlreadyClosedException closedException) {
+ s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+ " will be active after reconnection");
+ } catch (ConnectException connectException) {
+ s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+ " will be active after reconnection");
+ } catch (Exception e) {
+ throw new EventBusException("Failed to subscribe to event due to " + e.getMessage());
+ }
+
+ return queueId;
}
@Override
- public boolean stop() {
- return true;
+ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ try {
+ String classname = subscriber.getClass().getName();
+ String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
+ Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+ Channel channel = queueDetails.second();
+ channel.basicCancel(queueName);
+ _subscribers.remove(queueName, queueDetails);
+ } catch (Exception e) {
+ throw new EventBusException("Failed to unsubscribe from event bus due to " + e.getMessage());
+ }
}
+ // publish event on to the exchange created on AMQP server
@Override
- public boolean publish(Event event) {
- String exchangeName = getExchangeName(event.getCategory());
- String routingKey = getRoutingKey(event.getType());
+ public void publish(Event event) throws EventBusException {
+
+ String routingKey = createRoutingKey(event);
String eventDescription = event.getDescription();
try {
- createConnection();
- createExchange(exchangeName);
- publishEventToExchange(exchangeName, routingKey, eventDescription);
+ Connection connection = getConnection();
+ Channel channel = createChannel(connection);
+ createExchange(channel, _amqpExchangeName);
+ publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription);
+ channel.close();
+ } catch (AlreadyClosedException e) {
+ closeConnection();
+ throw new EventBusException("Failed to publish event to message broker as connection to AMQP broker in lost");
} catch (Exception e) {
- s_logger.error("Failed to publish event to message broker due to " + e.getMessage());
- return false;
+ throw new EventBusException("Failed to publish event to message broker due to " + e.getMessage());
}
- return true;
}
- @Override
- public boolean subscribe(EventTopic topic, EventSubscriber subscriber) {
- return true;
- }
+ /** creates a routing key from the event details.
+ * created routing key will be used while publishing the message to exchange on AMQP server
+ */
+ private String createRoutingKey(Event event) {
- @Override
- public boolean unsubscribe(EventTopic topic, EventSubscriber subscriber) {
- return true;
+ StringBuilder routingKey = new StringBuilder();
+
+ String eventSource = replaceNullWithWildcard(event.getEventSource());
+ eventSource = eventSource.replace(".", "-");
+
+ String eventCategory = replaceNullWithWildcard(event.getEventCategory());
+ eventCategory = eventCategory.replace(".", "-");
+
+ String eventType = replaceNullWithWildcard(event.getEventType());
+ eventType = eventType.replace(".", "-");
+
+ String resourceType = replaceNullWithWildcard(event.getResourceType());
+ resourceType = resourceType.replace(".", "-");
+
+ String resourceUuid = replaceNullWithWildcard(event.getResourceUUID());
+ resourceUuid = resourceUuid.replace(".", "-");
+
+ // routing key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
+ routingKey.append(eventSource);
+ routingKey.append(".");
+ routingKey.append(eventCategory);
+ routingKey.append(".");
+ routingKey.append(eventType);
+ routingKey.append(".");
+ routingKey.append(resourceType);
+ routingKey.append(".");
+ routingKey.append(resourceUuid);
+
+ return routingKey.toString();
}
- private String getExchangeName(String eventCategory) {
- return "CloudStack " + eventCategory;
+ /** creates a binding key from the event topic that subscriber specified
+ * binding key will be used to bind the queue created for subscriber to exchange on AMQP server
+ */
+ private String createBindingKey(EventTopic topic) {
+
+ StringBuilder bindingKey = new StringBuilder();
+
+ String eventSource = replaceNullWithWildcard(topic.getEventSource());
+ eventSource = eventSource.replace(".", "-");
+
+ String eventCategory = replaceNullWithWildcard(topic.getEventCategory());
+ eventCategory = eventCategory.replace(".", "-");
+
+ String eventType = replaceNullWithWildcard(topic.getEventType());
+ eventType = eventType.replace(".", "-");
+
+ String resourceType = replaceNullWithWildcard(topic.getResourceType());
+ resourceType = resourceType.replace(".", "-");
+
+ String resourceUuid = replaceNullWithWildcard(topic.getResourceUUID());
+ resourceUuid = resourceUuid.replace(".", "-");
+
+ // binding key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
+ bindingKey.append(eventSource);
+ bindingKey.append(".");
+ bindingKey.append(eventCategory);
+ bindingKey.append(".");
+ bindingKey.append(eventType);
+ bindingKey.append(".");
+ bindingKey.append(resourceType);
+ bindingKey.append(".");
+ bindingKey.append(resourceUuid);
+
+ return bindingKey.toString();
}
- private String getRoutingKey(String eventType) {
- return eventType;
+ private synchronized Connection getConnection() throws Exception {
+ if (_connection == null) {
+ try {
+ return createConnection();
+ } catch (Exception e) {
+ s_logger.error("Failed to create a connection to AMQP server due to " + e.getMessage());
+ throw e;
+ }
+ } else {
+ return _connection;
+ }
}
- private void createConnection() throws Exception {
+ private synchronized Connection createConnection() throws Exception {
try {
- // obtain a connection to RabbitMQ server
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(_username);
factory.setPassword(_password);
factory.setVirtualHost("/");
- factory.setHost(_rabbitMqHost);
+ factory.setHost(_amqpHost);
factory.setPort(_port);
- _connection = factory.newConnection();
- _channel = _connection.createChannel();
+ Connection connection = factory.newConnection();
+ connection.addShutdownListener(disconnectHandler);
+ _connection = connection;
+ return _connection;
} catch (Exception e) {
- s_logger.error("Failed to create a connection to RabbitMQ server due to " + e.getMessage());
throw e;
}
}
- private void createExchange(String exchangeName) throws Exception {
+ private synchronized void closeConnection() {
try {
- _channel.exchangeDeclare(exchangeName, "topic", true);
+ if (_connection != null) {
+ _connection.close();
+ }
+ } catch (Exception e) {
+ s_logger.warn("Failed to close connection to AMQP server due to " + e.getMessage());
+ }
+ _connection = null;
+ }
+
+ private synchronized void abortConnection () {
+ if (_connection == null)
+ return;
+
+ try {
+ _connection.abort();
+ } catch (Exception e) {
+ s_logger.warn("Failed to abort connection due to " + e.getMessage());
+ }
+ _connection = null;
+ }
+
+ private String replaceNullWithWildcard(String key) {
+ if (key == null || key.isEmpty()) {
+ return "*";
+ } else {
+ return key;
+ }
+ }
+
+ private Channel createChannel(Connection connection) throws Exception {
+ try {
+ return connection.createChannel();
+ } catch (java.io.IOException exception) {
+ s_logger.warn("Failed to create a channel due to " + exception.getMessage());
+ throw exception;
+ }
+ }
+
+ private void createExchange(Channel channel, String exchangeName) throws Exception {
+ try {
+ channel.exchangeDeclare(exchangeName, "topic", true);
} catch (java.io.IOException exception) {
s_logger.error("Failed to create exchange" + exchangeName + " on RabbitMQ server");
throw exception;
}
}
- private void publishEventToExchange(String exchangeName, String routingKey, String eventDescription) throws Exception {
+ private void publishEventToExchange(Channel channel, String exchangeName,
+ String routingKey, String eventDescription) throws Exception {
try {
- _channel.txSelect();
byte[] messageBodyBytes = eventDescription.getBytes();
- _channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
- _channel.txCommit();
+ channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
} catch (Exception e) {
s_logger.error("Failed to publish event " + routingKey + " on exchange " + exchangeName +
" of message broker due to " + e.getMessage());
throw e;
}
}
+
+ private String getEventCategoryFromRoutingKey(String routingKey) {
+ String[] keyParts = routingKey.split("\\.");
+ return keyParts[1];
+ }
+
+ private String getEventTypeFromRoutingKey(String routingKey) {
+ String[] keyParts = routingKey.split("\\.");
+ return keyParts[2];
+ }
+
+ /**
+ * Returns the event source, taken as the first dot-separated part of the routing key.
+ */
+ private String getEventSourceFromRoutingKey(String routingKey) {
+ return routingKey.split("\\.")[0];
+ }
+
+ /**
+ * Returns the value of the _name field.
+ */
+ @Override
+ public String getName() {
+ return _name;
+ }
+
+ /**
+ * Starts the event bus by handing a ReconnectionTask to executorService; that task
+ * initiates the connection to the AMQP server.
+ *
+ * @return always true
+ */
+ @Override
+ public boolean start() {
+ executorService.submit(new ReconnectionTask());
+ return true;
+ }
+
+ /**
+ * Stops the event bus: while the connection is open, deletes the queue of every subscriber
+ * and aborts the subscriber's channel, then closes the connection.
+ *
+ * @return always true
+ */
+ @Override
+ public boolean stop() {
+
+ // _connection is reset to null after a connection has been aborted, so guard against it
+ if (_connection != null && _connection.isOpen()) {
+ for (String subscriberId : _subscribers.keySet()) {
+ Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+ Channel channel = subscriberDetails.second();
+ if (channel == null) {
+ // DisconnectHandler clears the channel on connection loss; nothing to delete or abort
+ continue;
+ }
+ String queueName = subscriberId; // queue name == subscriber ID
+ try {
+ channel.queueDelete(queueName);
+ channel.abort();
+ } catch (IOException ioe) {
+ s_logger.warn("Failed to delete queue: " + queueName + " on AMQP server due to " + ioe.getMessage() );
+ }
+ }
+ }
+
+ // NOTE(review): closeConnection() is still called unconditionally, as before - confirm it tolerates a null _connection
+ closeConnection();
+ return true;
+ }
+
+ // reacts to loss of the connection to the AMQP server
+ private class DisconnectHandler implements ShutdownListener {
+
+ /**
+ * On a shutdown that was not initiated by the application: clears the channel of every
+ * subscription, aborts the connection and submits a ReconnectionTask to executorService.
+ * A shutdown initiated by the application is ignored.
+ */
+ @Override
+ public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
+ if (shutdownSignalException.isInitiatedByApplication()) {
+ return;
+ }
+
+ // forget the channel of each subscription
+ for (String id : _subscribers.keySet()) {
+ Ternary<String, Channel, EventSubscriber> details = _subscribers.get(id);
+ details.second(null);
+ _subscribers.put(id, details);
+ }
+
+ // connection to the AMQP server is gone, so abort the connection and channels
+ abortConnection();
+ s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect.");
+
+ // kick off the re-connect process
+ executorService.submit(new ReconnectionTask());
+ }
+ }
+
+ // retry logic to connect back to AMQP server after loss of connection
+ private class ReconnectionTask implements Runnable {
+
+ boolean connected = false;
+ Connection connection = null;
+
+ /**
+ * Sleeps for _retryInterval and tries to create a connection, repeating until one is created.
+ * Then, for every entry in _subscribers, re-creates the channel, exchange, queue, binding and
+ * consumer, and stores the new channel in the subscription details.
+ */
+ public void run() {
+
+ while (!connected) {
+ try {
+ Thread.sleep(_retryInterval);
+ } catch (InterruptedException ie) {
+ // ignore timer interrupts
+ }
+
+ try {
+ try {
+ connection = createConnection();
+ connected = true;
+ } catch (IOException ie) {
+ continue; // can't establish connection to AMQP server yet, so continue
+ }
+
+ // NOTE(review): connected is already true here, so a failure while restoring the
+ // subscribers below is only logged and the loop ends without another attempt
+
+ // prepare consumer on AMQP server for each of subscriber
+ for (String subscriberId : _subscribers.keySet()) {
+ Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+ String bindingKey = subscriberDetails.first();
+
+ /** create a queue with subscriber ID as queue name and bind it to the exchange
+ * with binding key formed from event topic
+ */
+ Channel channel = createChannel(connection);
+ createExchange(channel, _amqpExchangeName);
+ channel.queueDeclare(subscriberId, false, false, false, null);
+ channel.queueBind(subscriberId, _amqpExchangeName, bindingKey);
+
+ // register a callback handler to receive the events that a subscriber subscribed to;
+ // the subscriber ID is passed as both the queue name and the consumer tag
+ channel.basicConsume(subscriberId, _autoAck, subscriberId,
+ new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String queueName,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body)
+ throws IOException {
+
+ Ternary<String, Channel, EventSubscriber> subscriberDetails
+ = _subscribers.get(queueName); // queue name == subscriber ID
+
+ if (subscriberDetails != null) {
+ EventSubscriber subscriber = subscriberDetails.third();
+ String routingKey = envelope.getRoutingKey();
+ Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey),
+ getEventTypeFromRoutingKey(routingKey), null, null);
+ // decode the payload; byte[].toString() would only yield the array's identity string
+ event.setDescription(new String(body));
+
+ // deliver the event to call back object provided by subscriber
+ subscriber.onEvent(event);
+ }
+ }
+ }
+ );
+
+ // update the channel details for the subscription
+ subscriberDetails.second(channel);
+ _subscribers.put(subscriberId, subscriberDetails);
+ }
+ } catch (Exception e) {
+ // pass the exception itself so the stack trace is not lost
+ s_logger.warn("Failed to recreate queues and binding for the subscribers due to " + e.getMessage(), e);
+ }
+ }
+ return;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/alert/AlertManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/alert/AlertManagerImpl.java b/server/src/com/cloud/alert/AlertManagerImpl.java
index 8eda011..1a93f97 100755
--- a/server/src/com/cloud/alert/AlertManagerImpl.java
+++ b/server/src/com/cloud/alert/AlertManagerImpl.java
@@ -263,7 +263,7 @@ public class AlertManagerImpl implements AlertManager {
public void sendAlert(short alertType, long dataCenterId, Long podId, String subject, String body) {
// publish alert
- AlertGenerator.publishAlert(getAlertType(alertType), dataCenterId, podId, subject, body);
+ AlertGenerator.publishAlertOnEventBus(getAlertType(alertType), dataCenterId, podId, subject, body);
// TODO: queue up these messages and send them as one set of issues once a certain number of issues is reached? If that's the case,
// shouldn't we have a type/severity as part of the API so that severe errors get sent right away?
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/event/ActionEventCallback.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/event/ActionEventCallback.java b/server/src/com/cloud/event/ActionEventCallback.java
index 93c2fdc..eeec2b4 100644
--- a/server/src/com/cloud/event/ActionEventCallback.java
+++ b/server/src/com/cloud/event/ActionEventCallback.java
@@ -25,6 +25,8 @@ import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.EventBus;
+import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.log4j.Logger;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
@@ -36,6 +38,7 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
protected static EventBus _eventBus = null;
protected static boolean _eventBusLoaded = false;
+ private static final Logger s_logger = Logger.getLogger(ActionEventCallback.class);
@Override
public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
@@ -144,21 +147,34 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
return this;
}
+ /**
+ * Publishes an action event on the event bus when getEventBus() returns one.
+ * The event description carries the user, account, state and description values.
+ * The part of eventType before its last '.' is passed as the resource type; it stays
+ * null when eventType contains no '.'. An EventBusException is logged, not propagated.
+ */
- void publishOnEventBus(long userId, long accountId, String type, com.cloud.event.Event.State state, String description) {
+ void publishOnEventBus(long userId, long accountId, String eventType, com.cloud.event.Event.State state, String description) {
if (getEventBus() != null) {
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("user", String.valueOf(userId));
eventDescription.put("account", String.valueOf(accountId));
eventDescription.put("state", state.toString());
eventDescription.put("description", description);
- Event event = new Event(EventCategory.ACTION_EVENT.getName(), type, type);
+
+ // lastIndexOf() matches its argument literally (it is not a regex), so look for a plain '.'
+ int index = eventType.lastIndexOf('.');
+
+ String resourceType = null;
+ if (index != -1 ) {
+ resourceType = eventType.substring(0, index);
+ }
+
+ Event event = new Event(null, EventCategory.ACTION_EVENT.getName(), eventType,
+ resourceType, null);
event.setDescription(eventDescription);
- _eventBus.publish(event);
+
+ try {
+ _eventBus.publish(event);
+ } catch (EventBusException e) {
+ // pass the exception along so the cause is visible in the log
+ s_logger.warn("Failed to publish action event on the the event bus.", e);
+ }
}
}
private EventBus getEventBus() {
- //TODO: check if there is way of getting single adapter
if (_eventBus == null) {
if (!_eventBusLoaded) {
ComponentLocator locator = ComponentLocator.getLocator("management-server");
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/event/AlertGenerator.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/event/AlertGenerator.java b/server/src/com/cloud/event/AlertGenerator.java
index 16ef73e..5421aec 100644
--- a/server/src/com/cloud/event/AlertGenerator.java
+++ b/server/src/com/cloud/event/AlertGenerator.java
@@ -1,9 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
package com.cloud.event;
import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.Event;
+import org.apache.cloudstack.framework.events.*;
+import org.apache.log4j.Logger;
import java.util.Enumeration;
import java.util.HashMap;
@@ -11,13 +28,11 @@ import java.util.Map;
public class AlertGenerator {
+ private static final Logger s_logger = Logger.getLogger(AlertGenerator.class);
protected static EventBus _eventBus = null;
protected static boolean _eventBusLoaded = false;
- public static void publishAlert(String alertType, long dataCenterId, Long podId, String subject, String body) {
- }
-
- void publishOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) {
+ public static void publishAlertOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) {
if (getEventBus() != null) {
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("alertType", alertType);
@@ -25,14 +40,22 @@ public class AlertGenerator {
eventDescription.put("podId", Long.toString(podId));
eventDescription.put("subject", subject);
eventDescription.put("body", body);
- Event event = new Event(EventCategory.ALERT_EVENT.getName(), alertType, alertType);
+ org.apache.cloudstack.framework.events.Event event =
+ new org.apache.cloudstack.framework.events.Event(null,
+ EventCategory.ALERT_EVENT.getName(),
+ alertType,
+ null,
+ null);
event.setDescription(eventDescription);
- _eventBus.publish(event);
+ try {
+ _eventBus.publish(event);
+ } catch (EventBusException e) {
+ s_logger.warn("Failed to publish alert on the the event bus.");
+ }
}
}
- private EventBus getEventBus() {
- //TODO: check if there is way of getting single adapter
+ private static EventBus getEventBus() {
if (_eventBus == null) {
if (!_eventBusLoaded) {
ComponentLocator locator = ComponentLocator.getLocator("management-server");
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/event/UsageEventGenerator.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/event/UsageEventGenerator.java b/server/src/com/cloud/event/UsageEventGenerator.java
index edf6a42..80353e5 100644
--- a/server/src/com/cloud/event/UsageEventGenerator.java
+++ b/server/src/com/cloud/event/UsageEventGenerator.java
@@ -4,6 +4,8 @@ import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.Event;
+import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.log4j.Logger;
import java.util.Enumeration;
import java.util.HashMap;
@@ -11,6 +13,7 @@ import java.util.Map;
public class UsageEventGenerator {
+ private static final Logger s_logger = Logger.getLogger(UsageEventGenerator.class);
protected static EventBus _eventBus = null;
protected static boolean _eventBusLoaded = false;
@@ -54,14 +57,17 @@ public class UsageEventGenerator {
}
eventDescription.put("resourceName", resourceName);
eventDescription.put("resourceType", resourceType);
- Event event = new Event(EventCategory.USAGE_EVENT.getName(), usageType, usageType);
+ Event event = new Event(null, EventCategory.USAGE_EVENT.getName(), usageType, null, null);
event.setDescription(eventDescription);
- _eventBus.publish(event);
+ try {
+ _eventBus.publish(event);
+ } catch (EventBusException e) {
+ s_logger.warn("Failed to publish usage event on the the event bus.");
+ }
}
}
private static EventBus getEventBus() {
- //TODO: check if there is way of getting single adapter
if (_eventBus == null) {
if (!_eventBusLoaded) {
ComponentLocator locator = ComponentLocator.getLocator("management-server");
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/network/NetworkManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/network/NetworkManagerImpl.java b/server/src/com/cloud/network/NetworkManagerImpl.java
index 1775f88..04f4488 100755
--- a/server/src/com/cloud/network/NetworkManagerImpl.java
+++ b/server/src/com/cloud/network/NetworkManagerImpl.java
@@ -16,39 +16,9 @@
// under the License.
package com.cloud.network;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.ejb.Local;
-import javax.naming.ConfigurationException;
-
-import org.apache.cloudstack.acl.ControlledEntity.ACLType;
-import org.apache.cloudstack.acl.SecurityChecker.AccessType;
-import org.apache.log4j.Logger;
-
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
-import com.cloud.agent.api.AgentControlAnswer;
-import com.cloud.agent.api.AgentControlCommand;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.CheckNetworkAnswer;
-import com.cloud.agent.api.CheckNetworkCommand;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.StartupCommand;
-import com.cloud.agent.api.StartupRoutingCommand;
+import com.cloud.agent.api.*;
import com.cloud.agent.api.to.NicTO;
import com.cloud.alert.AlertManager;
import com.cloud.api.ApiDBUtils;
@@ -56,15 +26,9 @@ import com.cloud.configuration.Config;
import com.cloud.configuration.ConfigurationManager;
import com.cloud.configuration.Resource.ResourceType;
import com.cloud.configuration.dao.ConfigurationDao;
-import com.cloud.dc.AccountVlanMapVO;
-import com.cloud.dc.DataCenter;
+import com.cloud.dc.*;
import com.cloud.dc.DataCenter.NetworkType;
-import com.cloud.dc.DataCenterVO;
-import com.cloud.dc.Pod;
-import com.cloud.dc.PodVlanMapVO;
-import com.cloud.dc.Vlan;
import com.cloud.dc.Vlan.VlanType;
-import com.cloud.dc.VlanVO;
import com.cloud.dc.dao.AccountVlanMapDao;
import com.cloud.dc.dao.DataCenterDao;
import com.cloud.dc.dao.PodVlanMapDao;
@@ -77,64 +41,28 @@ import com.cloud.domain.dao.DomainDao;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventGenerator;
import com.cloud.event.dao.UsageEventDao;
-import com.cloud.exception.AccountLimitException;
-import com.cloud.exception.ConcurrentOperationException;
-import com.cloud.exception.ConnectionException;
-import com.cloud.exception.InsufficientAddressCapacityException;
-import com.cloud.exception.InsufficientCapacityException;
-import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
-import com.cloud.exception.InvalidParameterValueException;
-import com.cloud.exception.PermissionDeniedException;
-import com.cloud.exception.ResourceAllocationException;
-import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.exception.UnsupportedServiceException;
+import com.cloud.exception.*;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.IpAddress.State;
-import com.cloud.network.Network.Capability;
-import com.cloud.network.Network.GuestType;
-import com.cloud.network.Network.Provider;
-import com.cloud.network.Network.Service;
+import com.cloud.network.Network.*;
import com.cloud.network.Networks.AddressFormat;
import com.cloud.network.Networks.BroadcastDomainType;
import com.cloud.network.Networks.IsolationType;
import com.cloud.network.Networks.TrafficType;
import com.cloud.network.addr.PublicIp;
-import com.cloud.network.Network.Event;
import com.cloud.network.dao.*;
import com.cloud.network.element.*;
-import com.cloud.network.dao.FirewallRulesDao;
-import com.cloud.network.dao.IPAddressDao;
-import com.cloud.network.dao.LoadBalancerDao;
-import com.cloud.network.dao.NetworkDao;
-import com.cloud.network.dao.NetworkServiceMapDao;
-import com.cloud.network.dao.PhysicalNetworkDao;
-import com.cloud.network.dao.PhysicalNetworkServiceProviderDao;
-import com.cloud.network.dao.PhysicalNetworkTrafficTypeDao;
-import com.cloud.network.dao.PhysicalNetworkTrafficTypeVO;
-import com.cloud.network.element.DhcpServiceProvider;
-import com.cloud.network.element.IpDeployer;
-import com.cloud.network.element.LoadBalancingServiceProvider;
-import com.cloud.network.element.NetworkElement;
-import com.cloud.network.element.StaticNatServiceProvider;
-import com.cloud.network.element.UserDataServiceProvider;
import com.cloud.network.guru.NetworkGuru;
import com.cloud.network.lb.LoadBalancingRule;
import com.cloud.network.lb.LoadBalancingRule.LbDestination;
import com.cloud.network.lb.LoadBalancingRule.LbStickinessPolicy;
import com.cloud.network.lb.LoadBalancingRulesManager;
-import com.cloud.network.rules.FirewallManager;
-import com.cloud.network.rules.FirewallRule;
+import com.cloud.network.rules.*;
import com.cloud.network.rules.FirewallRule.Purpose;
-import com.cloud.network.rules.FirewallRuleVO;
-import com.cloud.network.rules.PortForwardingRuleVO;
-import com.cloud.network.rules.RulesManager;
-import com.cloud.network.rules.StaticNat;
-import com.cloud.network.rules.StaticNatRule;
-import com.cloud.network.rules.StaticNatRuleImpl;
import com.cloud.network.rules.dao.PortForwardingRulesDao;
import com.cloud.network.vpc.NetworkACLManager;
import com.cloud.network.vpc.VpcManager;
@@ -147,11 +75,7 @@ import com.cloud.offerings.NetworkOfferingVO;
import com.cloud.offerings.dao.NetworkOfferingDao;
import com.cloud.offerings.dao.NetworkOfferingServiceMapDao;
import com.cloud.org.Grouping;
-import com.cloud.user.Account;
-import com.cloud.user.AccountManager;
-import com.cloud.user.ResourceLimitService;
-import com.cloud.user.User;
-import com.cloud.user.UserContext;
+import com.cloud.user.*;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
@@ -159,32 +83,30 @@ import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.Inject;
import com.cloud.utils.component.Manager;
import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.*;
import com.cloud.utils.db.JoinBuilder.JoinType;
-import com.cloud.utils.db.SearchBuilder;
-import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.utils.net.Ip;
import com.cloud.utils.net.NetUtils;
-import com.cloud.vm.Nic;
-import com.cloud.vm.NicProfile;
-import com.cloud.vm.NicVO;
-import com.cloud.vm.ReservationContext;
-import com.cloud.vm.ReservationContextImpl;
-import com.cloud.vm.UserVmVO;
-import com.cloud.vm.VMInstanceVO;
-import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.*;
import com.cloud.vm.VirtualMachine.Type;
-import com.cloud.vm.VirtualMachineProfile;
-import com.cloud.vm.VirtualMachineProfileImpl;
import com.cloud.vm.dao.NicDao;
import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.VMInstanceDao;
+import org.apache.cloudstack.acl.ControlledEntity.ACLType;
+import org.apache.cloudstack.acl.SecurityChecker.AccessType;
+import org.apache.log4j.Logger;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* NetworkManagerImpl implements NetworkManager.
@@ -2059,9 +1981,12 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener {
s_logger.debug("Network is not implemented: " + network);
return false;
}
-
- network.setState(Network.State.Shutdown);
- _networksDao.update(network.getId(), network);
+ try {
+ stateTransitTo(network, Event.DestroyNetwork);
+ } catch (NoTransitionException e) {
+ network.setState(Network.State.Shutdown);
+ _networksDao.update(network.getId(), network);
+ }
boolean success = shutdownNetworkElementsAndResources(context, cleanupElements, network);
@@ -2076,15 +2001,22 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener {
guru.shutdown(profile, _networkOfferingDao.findById(network.getNetworkOfferingId()));
applyProfileToNetwork(network, profile);
-
- network.setState(Network.State.Allocated);
- network.setRestartRequired(false);
+ try {
+ stateTransitTo(network, Event.OperationSucceeded);
+ } catch (NoTransitionException e) {
+ network.setState(Network.State.Allocated);
+ network.setRestartRequired(false);
+ }
_networksDao.update(network.getId(), network);
_networksDao.clearCheckForGc(networkId);
result = true;
} else {
- network.setState(Network.State.Implemented);
- _networksDao.update(network.getId(), network);
+ try {
+ stateTransitTo(network, Event.OperationFailed);
+ } catch (NoTransitionException e) {
+ network.setState(Network.State.Implemented);
+ _networksDao.update(network.getId(), network);
+ }
result = false;
}
txn.commit();