You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mu...@apache.org on 2013/01/25 13:55:09 UTC

[47/50] git commit: -Added reconnect logic using shutdown listener in RabbitMQEventBus -changed EventBus interface method signatures to return/take UUID for subscriber management

-Added reconnect logic using shutdown listener in RabbitMQEventBus
-changed EventBus interface method signatures to return/take UUID for
subscriber management


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/2dd40c28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/2dd40c28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/2dd40c28

Branch: refs/heads/events-framework
Commit: 2dd40c2823bb908ef55be7adbfaa8df7638c30c4
Parents: 28738e4
Author: Murali Reddy <mu...@citrix.com>
Authored: Fri Jan 25 14:18:49 2013 +0530
Committer: Murali Reddy <mu...@citrix.com>
Committed: Fri Jan 25 14:18:49 2013 +0530

----------------------------------------------------------------------
 api/src/com/cloud/event/EventCategory.java         |    7 +-
 api/src/com/cloud/network/Network.java             |    2 +
 client/tomcatconf/components.xml.in                |    1 +
 .../apache/cloudstack/framework/events/Event.java  |   68 ++-
 .../cloudstack/framework/events/EventBus.java      |   19 +-
 .../framework/events/EventBusException.java        |   26 +
 .../framework/events/EventSubscriber.java          |    6 +-
 .../cloudstack/framework/events/EventTopic.java    |   22 +-
 .../cloudstack/framework/events/Subscribe.java     |    8 +-
 .../cloudstack/mom/rabbitmq/RabbitMQEventBus.java  |  521 +++++++++++++--
 server/src/com/cloud/alert/AlertManagerImpl.java   |    2 +-
 .../src/com/cloud/event/ActionEventCallback.java   |   24 +-
 server/src/com/cloud/event/AlertGenerator.java     |   43 +-
 .../src/com/cloud/event/UsageEventGenerator.java   |   12 +-
 .../src/com/cloud/network/NetworkManagerImpl.java  |  142 +---
 15 files changed, 670 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/api/src/com/cloud/event/EventCategory.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/event/EventCategory.java b/api/src/com/cloud/event/EventCategory.java
index a0042dd..cee6529 100644
--- a/api/src/com/cloud/event/EventCategory.java
+++ b/api/src/com/cloud/event/EventCategory.java
@@ -48,7 +48,8 @@ public class EventCategory {
         return null;
     }
 
-    public static final EventCategory ACTION_EVENT = new EventCategory("Action Events");
-    public static final EventCategory ALERT_EVENT  = new EventCategory("Alert Event");
-    public static final EventCategory USAGE_EVENT  = new EventCategory("Usage Event");
+    public static final EventCategory ACTION_EVENT = new EventCategory("ActionEvent");
+    public static final EventCategory ALERT_EVENT  = new EventCategory("AlertEvent");
+    public static final EventCategory USAGE_EVENT  = new EventCategory("UsageEvent");
+    public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent");
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/api/src/com/cloud/network/Network.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/network/Network.java b/api/src/com/cloud/network/Network.java
index f70d898..b3c8381 100644
--- a/api/src/com/cloud/network/Network.java
+++ b/api/src/com/cloud/network/Network.java
@@ -202,6 +202,7 @@ public interface Network extends ControlledEntity, StateObject<Network.State>, I
     }
 
     public enum State {
+
         Allocated("Indicates the network configuration is in allocated but not setup"),
         Setup("Indicates the network configuration is setup"),
         Implementing("Indicates the network configuration is being implemented"),
@@ -210,6 +211,7 @@ public interface Network extends ControlledEntity, StateObject<Network.State>, I
         Destroy("Indicates that the network is destroyed");
 
         protected static final StateMachine2<State, Network.Event, Network> s_fsm = new StateMachine2<State, Network.Event, Network>();
+
         static {
             s_fsm.addTransition(State.Allocated, Event.ImplementNetwork, State.Implementing);
             s_fsm.addTransition(State.Implementing, Event.OperationSucceeded, State.Implemented);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/client/tomcatconf/components.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/components.xml.in b/client/tomcatconf/components.xml.in
index 8af6d9b..5cd1985 100755
--- a/client/tomcatconf/components.xml.in
+++ b/client/tomcatconf/components.xml.in
@@ -234,6 +234,7 @@ under the License.
             <param name="port">5672</param>
             <param name="username">guest</param>
             <param name="password">guest</param>
+            <param name="exchangename">cloudstack-evetns</param>
             </adapter>
         </adapters>
         <manager name="OvsTunnelManager" key="com.cloud.network.ovs.OvsTunnelManager" class="com.cloud.network.ovs.OvsTunnelManagerImpl"/>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/Event.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
index f35bbef..eb6f48d 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/Event.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
@@ -23,34 +23,44 @@ import com.google.gson.Gson;
 
 public class Event {
 
-    String category;
-    String type;
-    String routingKey;
-    String description;
-    String publisher;
-    String date;
+    String eventCategory;
+    String eventType;
+    String eventSource;
     String resourceType;
+    String resourceUUID;
+    String description;
+
+    public Event(String eventSource, String eventCategory, String eventType, String resourceType,
+                 String resourceUUID) {
+        this.eventCategory = eventCategory;
+        this.eventType = eventType;
+        this.eventSource = eventSource;
+        this.resourceType = resourceType;
+        this.resourceUUID = resourceUUID;
+    }
 
-    public Event(String category, String type, String routingKey) {
-        this.category = category;
-        this.type = type;
-        this.routingKey = routingKey;
+    public String getEventCategory() {
+        return eventCategory;
     }
 
-    public String getCategory() {
-        return category;
+    public void setEventCategory(String category) {
+        eventCategory = category;
     }
 
-    public String getType() {
-        return type;
+    public String getEventType() {
+        return eventType;
     }
 
-    public String getRoutingKey() {
-        return routingKey;
+    public void setEventType(String type) {
+        eventType = type;
     }
 
-    public void setRoutingKey(String routingKey) {
-        this.routingKey = routingKey;
+    public String getEventSource() {
+        return eventSource;
+    }
+
+    void setEventSource(String source) {
+        eventSource = source;
     }
 
     public String getDescription() {
@@ -62,19 +72,23 @@ public class Event {
         this.description = gson.toJson(message).toString();
     }
 
-    public String getEventPublisher() {
-        return publisher;
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getResourceType() {
+        return resourceType;
     }
 
-    void setEventPublisher(String source) {
-        this.publisher = source;
+    public void setResourceType(String resourceType) {
+        this.resourceType = resourceType;
     }
 
-    public String getDate() {
-        return date;
+    public void setResourceUUID(String uuid) {
+        this.resourceUUID = uuid;
     }
 
-    void setDate(String date) {
-        this.date = date;
+    public String getResourceUUID () {
+        return resourceUUID;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
index 3782b32..c16ee6f 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
@@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.events;
 
 import com.cloud.utils.component.Adapter;
 
+import java.util.UUID;
+
 /**
  * Interface to publish and subscribe to CloudStack events
  *
@@ -28,29 +30,26 @@ import com.cloud.utils.component.Adapter;
 public interface EventBus extends Adapter{
 
     /**
-     * publish an event
+     * publish an event on to the event bus
      *
-     * @param event event that needs to be published
-     * @return true if the event has been successfully published on event bus
+     * @param event event that needs to be published on the event bus
      */
-    boolean publish(Event event);
+    void publish(Event event) throws EventBusException;
 
     /**
-     * subscribe to events of a category and a type
+     * subscribe to events that matches specified event topics
      *
      * @param topic defines category and type of the events being subscribed to
      * @param subscriber subscriber that intends to receive event notification
-     * @return true if the subscriber has been successfully registered.
+     * @return UUID returns the subscription ID
      */
-    boolean subscribe(EventTopic topic, EventSubscriber subscriber);
+     UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException;
 
     /**
      * unsubscribe to events of a category and a type
      *
-     * @param topic defines category and type of the events to unsubscribe
      * @param subscriber subscriber that intends to unsubscribe from the event notification
-     * @return true if the subscriber has been successfully unsubscribed.
      */
-    boolean unsubscribe(EventTopic topic, EventSubscriber subscriber);
+    void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
new file mode 100644
index 0000000..5654ba0
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public class EventBusException extends Exception{
+    public EventBusException (String msg) {
+      super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
index d69035a..b1c30c2 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
@@ -24,9 +24,7 @@ public interface EventSubscriber {
     /**
      * Callback method. EventBus calls this method on occurrence of subscribed event
      *
-     * @param category category of the event being subscribed (e.g. action, usage, alert etc)
-     * @param type type of the event (e.g. vm stop, volume delete etc)
-     * @param description description of the event
+     * @param event details of the event
      */
-    void recieve(Event event);
+    void onEvent(Event event);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
index eabcdf6..19b727d 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
@@ -23,12 +23,16 @@ public class EventTopic {
 
     String eventCategory;
     String eventType;
-    String bindingKey;
+    String resourceType;
+    String resourceUUID;
+    String eventSource;
 
-    public EventTopic(String eventCategory, String eventType, String bindingKey) {
+    public EventTopic(String eventCategory, String eventType, String resourceType, String resourceUUID, String eventSource) {
         this.eventCategory = eventCategory;
         this.eventType = eventType;
-        this.bindingKey = bindingKey;
+        this.resourceType = resourceType;
+        this.resourceUUID = resourceUUID;
+        this.eventSource = eventSource;
     }
 
     public String getEventCategory() {
@@ -39,7 +43,15 @@ public class EventTopic {
         return eventType;
     }
 
-    public String getBindingKey() {
-        return bindingKey;
+    public String getResourceType() {
+        return resourceType;
+    }
+
+    public String getEventSource() {
+        return eventSource;
+    }
+
+    public String getResourceUUID() {
+        return resourceUUID;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java b/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
index 00aa5e5..74997f6 100644
--- a/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
+++ b/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java
@@ -19,15 +19,15 @@
 
 package org.apache.cloudstack.framework.events;
 
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
 
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
 @Target({METHOD })
 @Retention(RUNTIME)
-public @interface Subscribe {
+public @interface        Subscribe {
 	
     String eventCategory();
 

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
index 3c51cd7..9049fe8 100644
--- a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
+++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -1,126 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.cloudstack.mom.rabbitmq;
 
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.MessageProperties;
-import org.apache.cloudstack.framework.events.Event;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventSubscriber;
-import org.apache.cloudstack.framework.events.EventTopic;
+import com.rabbitmq.client.*;
+import org.apache.cloudstack.framework.events.*;
 import org.apache.log4j.Logger;
 
+import com.cloud.utils.Ternary;
+
 import javax.ejb.Local;
 import javax.naming.ConfigurationException;
+import java.io.IOException;
+import java.net.ConnectException;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 @Local(value=EventBus.class)
 public class RabbitMQEventBus implements EventBus {
 
+    // details of AMQP server
+    private static String _amqpHost;
+    private static Integer _port;
+    private static String _username;
+    private static String _password;
+
+    // AMQP exchange name where all CloudStack events will be published
+    private static String _amqpExchangeName;
+
+    // hashmap to book keep the registered subscribers
+    private static ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>> _subscribers;
+
+    // connection to AMQP server,
+    private static Connection _connection=null;
 
-    public static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
-    public Connection _connection = null;
-    public Channel _channel = null;
-    private String _rabbitMqHost;
-    private Integer _port;
-    private String _username;
-    private String _password;
+    // AMQP server should consider messages acknowledged once delivered if _autoAck is true
+    private static boolean _autoAck = true;
+
+    private ExecutorService executorService;
+    private String _name;
+    private static DisconnectHandler disconnectHandler;
+    private static Integer _retryInterval;
+    private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
 
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
-        _rabbitMqHost = (String) params.get("server");
-        _port = Integer.parseInt((String) params.get("port"));
+
+        _amqpHost = (String) params.get("server");
+        if (_amqpHost == null || _amqpHost.isEmpty()) {
+            throw new ConfigurationException("Unable to get the AMQP server details");
+        }
+
         _username = (String) params.get("username");
+        if (_username == null || _username.isEmpty()) {
+            throw new ConfigurationException("Unable to get the username details");
+        }
+
         _password = (String) params.get("password");
+        if (_password == null || _password.isEmpty()) {
+            throw new ConfigurationException("Unable to get the password details");
+        }
+
+        _amqpExchangeName = (String) params.get("exchangename");
+        if (_amqpExchangeName == null || _amqpExchangeName.isEmpty()) {
+            throw new ConfigurationException("Unable to get the _exchange details on the AMQP server");
+        }
+
+        try {
+            String portStr =  (String) params.get("port");
+            if (portStr == null || portStr.isEmpty()) {
+                throw new ConfigurationException("Unable to get the port details of AMQP server");
+            }
+            _port = Integer.parseInt(portStr);
+
+            String retryIntervalStr = (String) params.get("retryinterval");
+            if (retryIntervalStr == null || retryIntervalStr.isEmpty()) {
+                // default to 10s to try out reconnect
+                retryIntervalStr = "10000";
+            }
+            _retryInterval = Integer.parseInt(retryIntervalStr);
+        } catch (NumberFormatException e) {
+            throw new ConfigurationException("Invalid port number/retry interval");
+        }
+
+        _subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
+
+        executorService = Executors.newCachedThreadPool();
+        disconnectHandler = new DisconnectHandler();
+        _name = name;
         return true;
     }
 
+    /** Call to subscribe to interested set of events
+     *
+     * @param topic defines category and type of the events being subscribed to
+     * @param subscriber subscriber that intends to receive event notification
+     * @return UUID that represents the subscription with event bus
+     * @throws EventBusException
+     */
     @Override
-    public String getName() {
-        return null;
-    }
+    public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
 
-    @Override
-    public boolean start() {
-        return true;
+        if (subscriber == null || topic == null) {
+            throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
+        }
+
+        // create a UUID, that will be used for managing subscriptions and also used as queue name
+        // for on the queue used for the subscriber on the AMQP broker
+        UUID queueId = UUID.randomUUID();
+        String queueName = queueId.toString();
+
+        try {
+            String bindingKey = createBindingKey(topic);
+
+            // store the subscriber details before creating channel
+            _subscribers.put(queueName, new Ternary(bindingKey, null, subscriber));
+
+            // create a channel dedicated for this subscription
+            Connection connection = getConnection();
+            Channel channel = createChannel(connection);
+
+            // create a queue and bind it to the exchange with binding key formed from event topic
+            createExchange(channel, _amqpExchangeName);
+            channel.queueDeclare(queueName, false, false, false, null);
+            channel.queueBind(queueName, _amqpExchangeName, bindingKey);
+
+            // register a callback handler to receive the events that a subscriber subscribed to
+            channel.basicConsume(queueName, _autoAck, queueName,
+                    new DefaultConsumer(channel) {
+                        @Override
+                        public void handleDelivery(String queueName,
+                                                   Envelope envelope,
+                                                   AMQP.BasicProperties properties,
+                                                   byte[] body)
+                            throws IOException {
+                            Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+                            if (queueDetails != null) {
+                                EventSubscriber subscriber = queueDetails.third();
+                                String routingKey =  envelope.getRoutingKey();
+                                Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey),
+                                        getEventTypeFromRoutingKey(routingKey), null, null);
+                                event.setDescription(body.toString());
+
+                                // deliver the event to call back object provided by subscriber
+                                subscriber.onEvent(event);
+                            }
+                        }
+                    }
+            );
+
+            // update the channel details for the subscription
+            Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+            queueDetails.second(channel);
+            _subscribers.put(queueName, queueDetails);
+
+        } catch (AlreadyClosedException closedException) {
+            s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+                    " will be active after reconnection");
+        } catch (ConnectException connectException) {
+            s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+                    " will be active after reconnection");
+        } catch (Exception e) {
+            throw new EventBusException("Failed to subscribe to event due to " + e.getMessage());
+        }
+
+        return queueId;
     }
 
     @Override
-    public boolean stop() {
-        return true;
+    public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+        try {
+            String classname =  subscriber.getClass().getName();
+            String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
+            Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+            Channel channel = queueDetails.second();
+            channel.basicCancel(queueName);
+            _subscribers.remove(queueName, queueDetails);
+        } catch (Exception e) {
+            throw new EventBusException("Failed to unsubscribe from event bus due to " + e.getMessage());
+        }
     }
 
+    // publish event on to the exchange created on AMQP server
     @Override
-    public boolean publish(Event event) {
-        String exchangeName = getExchangeName(event.getCategory());
-        String routingKey = getRoutingKey(event.getType());
+    public void publish(Event event) throws EventBusException {
+
+        String routingKey = createRoutingKey(event);
         String eventDescription = event.getDescription();
 
         try {
-            createConnection();
-            createExchange(exchangeName);
-            publishEventToExchange(exchangeName, routingKey, eventDescription);
+            Connection connection = getConnection();
+            Channel channel = createChannel(connection);
+            createExchange(channel, _amqpExchangeName);
+            publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription);
+            channel.close();
+        } catch (AlreadyClosedException e) {
+            closeConnection();
+            throw new EventBusException("Failed to publish event to message broker as connection to AMQP broker in lost");
         } catch (Exception e) {
-            s_logger.error("Failed to publish event to message broker due to " + e.getMessage());
-            return false;
+            throw new EventBusException("Failed to publish event to message broker due to " + e.getMessage());
         }
-        return true;
     }
 
-    @Override
-    public boolean subscribe(EventTopic topic, EventSubscriber subscriber) {
-        return true;
-    }
+    /** creates a routing key from the event details.
+     *  created routing key will be used while publishing the message to exchange on AMQP server
+     */
+    private String createRoutingKey(Event event) {
 
-    @Override
-    public boolean unsubscribe(EventTopic topic, EventSubscriber subscriber) {
-        return true;
+        StringBuilder routingKey = new StringBuilder();
+
+        String eventSource =  replaceNullWithWildcard(event.getEventSource());
+        eventSource = eventSource.replace(".", "-");
+
+        String eventCategory = replaceNullWithWildcard(event.getEventCategory());
+        eventCategory = eventCategory.replace(".", "-");
+
+        String eventType = replaceNullWithWildcard(event.getEventType());
+        eventType = eventType.replace(".", "-");
+
+        String resourceType = replaceNullWithWildcard(event.getResourceType());
+        resourceType = resourceType.replace(".", "-");
+
+        String resourceUuid = replaceNullWithWildcard(event.getResourceUUID());
+        resourceUuid = resourceUuid.replace(".", "-");
+
+        // routing key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
+        routingKey.append(eventSource);
+        routingKey.append(".");
+        routingKey.append(eventCategory);
+        routingKey.append(".");
+        routingKey.append(eventType);
+        routingKey.append(".");
+        routingKey.append(resourceType);
+        routingKey.append(".");
+        routingKey.append(resourceUuid);
+
+        return routingKey.toString();
     }
 
-    private String getExchangeName(String eventCategory) {
-        return "CloudStack " + eventCategory;
+    /** creates a binding key from the event topic that subscriber specified
+     *  binding key will be used to bind the queue created for subscriber to exchange on AMQP server
+     */
+    private String createBindingKey(EventTopic topic) {
+
+        StringBuilder bindingKey = new StringBuilder();
+
+        String eventSource =  replaceNullWithWildcard(topic.getEventSource());
+        eventSource = eventSource.replace(".", "-");
+
+        String eventCategory = replaceNullWithWildcard(topic.getEventCategory());
+        eventCategory = eventCategory.replace(".", "-");
+
+        String eventType = replaceNullWithWildcard(topic.getEventType());
+        eventType = eventType.replace(".", "-");
+
+        String resourceType = replaceNullWithWildcard(topic.getResourceType());
+        resourceType = resourceType.replace(".", "-");
+
+        String resourceUuid = replaceNullWithWildcard(topic.getResourceUUID());
+        resourceUuid = resourceUuid.replace(".", "-");
+
+        // binding key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
+        bindingKey.append(eventSource);
+        bindingKey.append(".");
+        bindingKey.append(eventCategory);
+        bindingKey.append(".");
+        bindingKey.append(eventType);
+        bindingKey.append(".");
+        bindingKey.append(resourceType);
+        bindingKey.append(".");
+        bindingKey.append(resourceUuid);
+
+        return bindingKey.toString();
     }
 
-    private String getRoutingKey(String eventType) {
-        return eventType;
+    /**
+     * Returns the cached connection, creating one through createConnection() when none is cached.
+     * A failure to create the connection is logged and then rethrown to the caller.
+     */
+    private synchronized Connection getConnection() throws Exception {
+        if (_connection != null) {
+            return _connection;
+        }
+
+        try {
+            return createConnection();
+        } catch (Exception e) {
+            s_logger.error("Failed to create a connection to AMQP server due to " + e.getMessage());
+            throw e;
+        }
     }
 
-    private void createConnection() throws Exception {
+    private synchronized Connection createConnection() throws Exception {
         try {
-            // obtain a connection to RabbitMQ server
             ConnectionFactory factory = new ConnectionFactory();
             factory.setUsername(_username);
             factory.setPassword(_password);
             factory.setVirtualHost("/");
-            factory.setHost(_rabbitMqHost);
+            factory.setHost(_amqpHost);
             factory.setPort(_port);
-            _connection = factory.newConnection();
-            _channel = _connection.createChannel();
+            Connection connection = factory.newConnection();
+            connection.addShutdownListener(disconnectHandler);
+            _connection = connection;
+            return _connection;
         } catch (Exception e) {
-            s_logger.error("Failed to create a connection to RabbitMQ server due to " + e.getMessage());
             throw e;
         }
     }
 
-    private void createExchange(String exchangeName) throws Exception {
+    private synchronized void closeConnection() {
         try {
-            _channel.exchangeDeclare(exchangeName, "topic", true);
+            if (_connection != null) {
+                _connection.close();
+            }
+        } catch (Exception e) {
+            s_logger.warn("Failed to close connection to AMQP server due to " + e.getMessage());
+        }
+        _connection = null;
+    }
+
+    /** Aborts the cached connection, if there is one, and forgets it; a failed abort is only logged. */
+    private synchronized void abortConnection () {
+        if (_connection != null) {
+            try {
+                _connection.abort();
+            } catch (Exception e) {
+                s_logger.warn("Failed to abort connection due to " + e.getMessage());
+            }
+            _connection = null;
+        }
+    }
+
+    /** Returns "*" when the given key part is null or empty, otherwise the key part unchanged. */
+    private String replaceNullWithWildcard(String key) {
+        boolean unset = (key == null) || key.isEmpty();
+        return unset ? "*" : key;
+    }
+
+    private Channel createChannel(Connection connection) throws Exception {
+        try {
+            return connection.createChannel();
+        } catch (java.io.IOException exception) {
+            s_logger.warn("Failed to create a channel due to " + exception.getMessage());
+            throw exception;
+        }
+    }
+
+    private void createExchange(Channel channel, String exchangeName) throws Exception {
+        try {
+            channel.exchangeDeclare(exchangeName, "topic", true);
         } catch (java.io.IOException exception) {
             s_logger.error("Failed to create exchange" + exchangeName + " on RabbitMQ server");
             throw exception;
         }
     }
 
-    private void publishEventToExchange(String exchangeName, String routingKey, String eventDescription) throws Exception {
+    private void publishEventToExchange(Channel channel, String exchangeName,
+                                        String routingKey, String eventDescription) throws Exception {
         try {
-            _channel.txSelect();
             byte[] messageBodyBytes = eventDescription.getBytes();
-            _channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
-            _channel.txCommit();
+            channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
         } catch (Exception e) {
             s_logger.error("Failed to publish event " + routingKey + " on exchange " + exchangeName +
                     "  of message broker due to " + e.getMessage());
             throw e;
         }
     }
+
+    /** Returns the second dot-separated part of the routing key (index 1), which holds the event category. */
+    private String getEventCategoryFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[1];
+    }
+
+    /** Returns the third dot-separated part of the routing key (index 2), which holds the event type. */
+    private String getEventTypeFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[2];
+    }
+
+    /** Returns the first dot-separated part of the routing key (index 0), which holds the event source. */
+    private String getEventSourceFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[0];
+    }
+
+    /** Returns the value of the {@code _name} field. */
+    @Override
+    public String getName() {
+        return _name;
+    }
+
+    /**
+     * Starts the event bus by handing a ReconnectionTask to the executor; that task
+     * initiates the connection to the AMQP server. Always returns true.
+     */
+    @Override
+    public boolean start() {
+        executorService.submit(new ReconnectionTask());
+        return true;
+    }
+
+    /**
+     * Stops the event bus: while the connection is open, deletes every subscriber's queue
+     * (queue name == subscriber ID) and aborts its channel, then closes the connection.
+     * Failures to delete a queue are logged and do not stop the shutdown. Always returns true.
+     */
+    @Override
+    public boolean stop() {
+
+        // _connection is reset to null by closeConnection()/abortConnection(), so guard before isOpen()
+        if (_connection != null && _connection.isOpen()) {
+            for (String subscriberId : _subscribers.keySet()) {
+                Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+                Channel channel =  subscriberDetails.second();
+                if (channel == null) {
+                    // the disconnect handler clears the channel; it may not have been re-created yet
+                    continue;
+                }
+                String queueName = subscriberId;
+                try {
+                    channel.queueDelete(queueName);
+                    channel.abort();
+                } catch (IOException ioe) {
+                    s_logger.warn("Failed to delete queue: " + queueName + " on AMQP server due to " + ioe.getMessage() );
+                }
+            }
+        }
+
+        closeConnection();
+        return true;
+    }
+
+    // logic to deal with loss of connection to AMQP server
+    /**
+     * Shutdown listener that deals with loss of the connection to the AMQP server:
+     * when the shutdown was not initiated by the application it clears every subscriber's
+     * channel, aborts the connection and submits a ReconnectionTask.
+     */
+    private class DisconnectHandler implements ShutdownListener {
+
+        @Override
+        public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
+            if (shutdownSignalException.isInitiatedByApplication()) {
+                return; // shutdown requested by the application itself, nothing to recover
+            }
+
+            // forget the channel of every subscription; the reconnect task sets new ones
+            for (String subscriberId : _subscribers.keySet()) {
+                Ternary<String, Channel, EventSubscriber> details = _subscribers.get(subscriberId);
+                details.second(null);
+                _subscribers.put(subscriberId, details);
+            }
+
+            abortConnection(); // disconnected to AMQP server, so abort the connection and channels
+            s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect.");
+
+            // initiate re-connect process
+            executorService.submit(new ReconnectionTask());
+        }
+    }
+
+    // retry logic to connect back to AMQP server after loss of connection
+    /**
+     * Retry logic to connect (back) to the AMQP server. Each attempt first sleeps for
+     * {@code _retryInterval}, then tries to create a connection; once connected it re-creates,
+     * for every registered subscriber, the channel, exchange, queue, binding and consumer.
+     * The loop ends as soon as a connection has been established, even if re-creating the
+     * subscriptions failed afterwards (that failure is only logged).
+     */
+    private class ReconnectionTask implements Runnable {
+
+        boolean connected = false;
+        Connection connection = null;
+
+        @Override
+        public void run() {
+
+            while (!connected) {
+                try {
+                    Thread.sleep(_retryInterval);
+                } catch (InterruptedException ie) {
+                    // ignore timer interrupts
+                }
+
+                try {
+                    try {
+                        connection = createConnection();
+                        connected = true;
+                    } catch (IOException ie) {
+                        continue; // can't establish connection to AMQP server yet, so continue
+                    }
+
+                    // prepare consumer on AMQP server for each of subscriber
+                    for (String subscriberId : _subscribers.keySet()) {
+                        Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+                        String bindingKey = subscriberDetails.first();
+
+                        /** create a queue with subscriber ID as queue name and bind it to the exchange
+                         *  with binding key formed from event topic
+                         */
+                        Channel channel = createChannel(connection);
+                        createExchange(channel, _amqpExchangeName);
+                        channel.queueDeclare(subscriberId, false, false, false, null);
+                        channel.queueBind(subscriberId, _amqpExchangeName, bindingKey);
+
+                        // register a callback handler to receive the events that a subscriber subscribed to
+                        channel.basicConsume(subscriberId, _autoAck, subscriberId,
+                                new DefaultConsumer(channel) {
+                                    @Override
+                                    public void handleDelivery(String queueName,
+                                                               Envelope envelope,
+                                                               AMQP.BasicProperties properties,
+                                                               byte[] body)
+                                            throws IOException {
+
+                                        Ternary<String, Channel, EventSubscriber> subscriberDetails
+                                                = _subscribers.get(queueName); // queue name == subscriber ID
+
+                                        if (subscriberDetails != null) {
+                                            EventSubscriber subscriber = subscriberDetails.third();
+                                            String routingKey =  envelope.getRoutingKey();
+                                            Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey),
+                                                    getEventTypeFromRoutingKey(routingKey), null, null);
+                                            // decode the payload bytes; byte[].toString() would only give "[B@<hash>"
+                                            event.setDescription(new String(body));
+
+                                            // deliver the event to call back object provided by subscriber
+                                            subscriber.onEvent(event);
+                                        }
+                                    }
+                                }
+                        );
+
+                        // update the channel details for the subscription
+                        subscriberDetails.second(channel);
+                        _subscribers.put(subscriberId, subscriberDetails);
+                    }
+                } catch (Exception e) {
+                    s_logger.warn("Failed to recreate queues and binding for the subscribers due to " + e.getMessage());
+                }
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/alert/AlertManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/alert/AlertManagerImpl.java b/server/src/com/cloud/alert/AlertManagerImpl.java
index 8eda011..1a93f97 100755
--- a/server/src/com/cloud/alert/AlertManagerImpl.java
+++ b/server/src/com/cloud/alert/AlertManagerImpl.java
@@ -263,7 +263,7 @@ public class AlertManagerImpl implements AlertManager {
     public void sendAlert(short alertType, long dataCenterId, Long podId, String subject, String body) {
 
         // publish alert
-        AlertGenerator.publishAlert(getAlertType(alertType), dataCenterId, podId, subject, body);
+        AlertGenerator.publishAlertOnEventBus(getAlertType(alertType), dataCenterId, podId, subject, body);
 
         // TODO:  queue up these messages and send them as one set of issues once a certain number of issues is reached?  If that's the case,
         //         shouldn't we have a type/severity as part of the API so that severe errors get sent right away?

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/event/ActionEventCallback.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/event/ActionEventCallback.java b/server/src/com/cloud/event/ActionEventCallback.java
index 93c2fdc..eeec2b4 100644
--- a/server/src/com/cloud/event/ActionEventCallback.java
+++ b/server/src/com/cloud/event/ActionEventCallback.java
@@ -25,6 +25,8 @@ import net.sf.cglib.proxy.MethodInterceptor;
 import net.sf.cglib.proxy.MethodProxy;
 import org.apache.cloudstack.framework.events.Event;
 import org.apache.cloudstack.framework.events.EventBus;
+import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.log4j.Logger;
 
 import java.lang.reflect.AnnotatedElement;
 import java.lang.reflect.Method;
@@ -36,6 +38,7 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
 
     protected static EventBus _eventBus = null;
     protected static boolean _eventBusLoaded = false;
+    private static final Logger s_logger = Logger.getLogger(ActionEventCallback.class);
 
     @Override
     public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
@@ -144,21 +147,34 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
         return this;
     }
 
-    void publishOnEventBus(long userId, long accountId, String type, com.cloud.event.Event.State state, String description) {
+    void publishOnEventBus(long userId, long accountId, String eventType, com.cloud.event.Event.State state, String description) {
         if (getEventBus() != null) {
             Map<String, String> eventDescription = new HashMap<String, String>();
             eventDescription.put("user", String.valueOf(userId));
             eventDescription.put("account", String.valueOf(accountId));
             eventDescription.put("state", state.toString());
             eventDescription.put("description", description);
-            Event event = new Event(EventCategory.ACTION_EVENT.getName(), type, type);
+
+            int index = eventType.lastIndexOf('.'); // literal dot: lastIndexOf takes plain text, not a regex
+
+            String resourceType = null;
+            if (index != -1 ) {
+                resourceType = eventType.substring(0, index); // everything before the last '.' of the event type
+            }
+
+            Event event = new Event(null, EventCategory.ACTION_EVENT.getName(), eventType,
+                    resourceType, null);
             event.setDescription(eventDescription);
-            _eventBus.publish(event);
+
+            try {
+                _eventBus.publish(event);
+            } catch (EventBusException e) {
+                s_logger.warn("Failed to publish action event on the the event bus.", e); // keep the cause in the log
+            }
         }
     }
 
     private EventBus getEventBus() {
-        //TODO: check if there is way of getting single adapter
         if (_eventBus == null) {
             if (!_eventBusLoaded) {
                 ComponentLocator locator = ComponentLocator.getLocator("management-server");

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/event/AlertGenerator.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/event/AlertGenerator.java b/server/src/com/cloud/event/AlertGenerator.java
index 16ef73e..5421aec 100644
--- a/server/src/com/cloud/event/AlertGenerator.java
+++ b/server/src/com/cloud/event/AlertGenerator.java
@@ -1,9 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 package com.cloud.event;
 
 import com.cloud.utils.component.Adapters;
 import com.cloud.utils.component.ComponentLocator;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.Event;
+import org.apache.cloudstack.framework.events.*;
+import org.apache.log4j.Logger;
 
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -11,13 +28,11 @@ import java.util.Map;
 
 public class AlertGenerator {
 
+    private static final Logger s_logger = Logger.getLogger(AlertGenerator.class);
     protected static EventBus _eventBus = null;
     protected static boolean _eventBusLoaded = false;
 
-    public static void publishAlert(String alertType, long dataCenterId, Long podId, String subject, String body) {
-    }
-
-    void publishOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) {
+    public static void publishAlertOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) {
         if (getEventBus() != null) {
             Map<String, String> eventDescription = new HashMap<String, String>();
             eventDescription.put("alertType", alertType);
@@ -25,14 +40,22 @@ public class AlertGenerator {
             eventDescription.put("podId", Long.toString(podId));
             eventDescription.put("subject", subject);
             eventDescription.put("body", body);
-            Event event = new Event(EventCategory.ALERT_EVENT.getName(), alertType, alertType);
+            org.apache.cloudstack.framework.events.Event event =
+                    new org.apache.cloudstack.framework.events.Event(null,
+                            EventCategory.ALERT_EVENT.getName(),
+                            alertType,
+                            null,
+                            null);
             event.setDescription(eventDescription);
-            _eventBus.publish(event);
+            try {
+                _eventBus.publish(event);
+            } catch (EventBusException e) {
+                s_logger.warn("Failed to publish alert on the the event bus.");
+            }
         }
     }
 
-    private EventBus getEventBus() {
-        //TODO: check if there is way of getting single adapter
+    private static EventBus getEventBus() {
         if (_eventBus == null) {
             if (!_eventBusLoaded) {
                 ComponentLocator locator = ComponentLocator.getLocator("management-server");

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/event/UsageEventGenerator.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/event/UsageEventGenerator.java b/server/src/com/cloud/event/UsageEventGenerator.java
index edf6a42..80353e5 100644
--- a/server/src/com/cloud/event/UsageEventGenerator.java
+++ b/server/src/com/cloud/event/UsageEventGenerator.java
@@ -4,6 +4,8 @@ import com.cloud.utils.component.Adapters;
 import com.cloud.utils.component.ComponentLocator;
 import org.apache.cloudstack.framework.events.EventBus;
 import org.apache.cloudstack.framework.events.Event;
+import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.log4j.Logger;
 
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -11,6 +13,7 @@ import java.util.Map;
 
 public class UsageEventGenerator {
 
+    private static final Logger s_logger = Logger.getLogger(UsageEventGenerator.class);
     protected static EventBus _eventBus = null;
     protected static boolean _eventBusLoaded = false;
 
@@ -54,14 +57,17 @@ public class UsageEventGenerator {
             }
             eventDescription.put("resourceName", resourceName);
             eventDescription.put("resourceType", resourceType);
-            Event event = new Event(EventCategory.USAGE_EVENT.getName(), usageType, usageType);
+            Event event = new Event(null, EventCategory.USAGE_EVENT.getName(), usageType, null, null);
             event.setDescription(eventDescription);
-            _eventBus.publish(event);
+            try {
+                _eventBus.publish(event);
+            } catch (EventBusException e) {
+                s_logger.warn("Failed to publish usage event on the the event bus.");
+            }
         }
     }
 
     private static EventBus getEventBus() {
-        //TODO: check if there is way of getting single adapter
         if (_eventBus == null) {
             if (!_eventBusLoaded) {
                 ComponentLocator locator = ComponentLocator.getLocator("management-server");

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/2dd40c28/server/src/com/cloud/network/NetworkManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/network/NetworkManagerImpl.java b/server/src/com/cloud/network/NetworkManagerImpl.java
index 1775f88..04f4488 100755
--- a/server/src/com/cloud/network/NetworkManagerImpl.java
+++ b/server/src/com/cloud/network/NetworkManagerImpl.java
@@ -16,39 +16,9 @@
 // under the License.
 package com.cloud.network;
 
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.ejb.Local;
-import javax.naming.ConfigurationException;
-
-import org.apache.cloudstack.acl.ControlledEntity.ACLType;
-import org.apache.cloudstack.acl.SecurityChecker.AccessType;
-import org.apache.log4j.Logger;
-
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.Listener;
-import com.cloud.agent.api.AgentControlAnswer;
-import com.cloud.agent.api.AgentControlCommand;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.CheckNetworkAnswer;
-import com.cloud.agent.api.CheckNetworkCommand;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.StartupCommand;
-import com.cloud.agent.api.StartupRoutingCommand;
+import com.cloud.agent.api.*;
 import com.cloud.agent.api.to.NicTO;
 import com.cloud.alert.AlertManager;
 import com.cloud.api.ApiDBUtils;
@@ -56,15 +26,9 @@ import com.cloud.configuration.Config;
 import com.cloud.configuration.ConfigurationManager;
 import com.cloud.configuration.Resource.ResourceType;
 import com.cloud.configuration.dao.ConfigurationDao;
-import com.cloud.dc.AccountVlanMapVO;
-import com.cloud.dc.DataCenter;
+import com.cloud.dc.*;
 import com.cloud.dc.DataCenter.NetworkType;
-import com.cloud.dc.DataCenterVO;
-import com.cloud.dc.Pod;
-import com.cloud.dc.PodVlanMapVO;
-import com.cloud.dc.Vlan;
 import com.cloud.dc.Vlan.VlanType;
-import com.cloud.dc.VlanVO;
 import com.cloud.dc.dao.AccountVlanMapDao;
 import com.cloud.dc.dao.DataCenterDao;
 import com.cloud.dc.dao.PodVlanMapDao;
@@ -77,64 +41,28 @@ import com.cloud.domain.dao.DomainDao;
 import com.cloud.event.EventTypes;
 import com.cloud.event.UsageEventGenerator;
 import com.cloud.event.dao.UsageEventDao;
-import com.cloud.exception.AccountLimitException;
-import com.cloud.exception.ConcurrentOperationException;
-import com.cloud.exception.ConnectionException;
-import com.cloud.exception.InsufficientAddressCapacityException;
-import com.cloud.exception.InsufficientCapacityException;
-import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
-import com.cloud.exception.InvalidParameterValueException;
-import com.cloud.exception.PermissionDeniedException;
-import com.cloud.exception.ResourceAllocationException;
-import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.exception.UnsupportedServiceException;
+import com.cloud.exception.*;
 import com.cloud.host.Host;
 import com.cloud.host.HostVO;
 import com.cloud.host.Status;
 import com.cloud.host.dao.HostDao;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.network.IpAddress.State;
-import com.cloud.network.Network.Capability;
-import com.cloud.network.Network.GuestType;
-import com.cloud.network.Network.Provider;
-import com.cloud.network.Network.Service;
+import com.cloud.network.Network.*;
 import com.cloud.network.Networks.AddressFormat;
 import com.cloud.network.Networks.BroadcastDomainType;
 import com.cloud.network.Networks.IsolationType;
 import com.cloud.network.Networks.TrafficType;
 import com.cloud.network.addr.PublicIp;
-import com.cloud.network.Network.Event;
 import com.cloud.network.dao.*;
 import com.cloud.network.element.*;
-import com.cloud.network.dao.FirewallRulesDao;
-import com.cloud.network.dao.IPAddressDao;
-import com.cloud.network.dao.LoadBalancerDao;
-import com.cloud.network.dao.NetworkDao;
-import com.cloud.network.dao.NetworkServiceMapDao;
-import com.cloud.network.dao.PhysicalNetworkDao;
-import com.cloud.network.dao.PhysicalNetworkServiceProviderDao;
-import com.cloud.network.dao.PhysicalNetworkTrafficTypeDao;
-import com.cloud.network.dao.PhysicalNetworkTrafficTypeVO;
-import com.cloud.network.element.DhcpServiceProvider;
-import com.cloud.network.element.IpDeployer;
-import com.cloud.network.element.LoadBalancingServiceProvider;
-import com.cloud.network.element.NetworkElement;
-import com.cloud.network.element.StaticNatServiceProvider;
-import com.cloud.network.element.UserDataServiceProvider;
 import com.cloud.network.guru.NetworkGuru;
 import com.cloud.network.lb.LoadBalancingRule;
 import com.cloud.network.lb.LoadBalancingRule.LbDestination;
 import com.cloud.network.lb.LoadBalancingRule.LbStickinessPolicy;
 import com.cloud.network.lb.LoadBalancingRulesManager;
-import com.cloud.network.rules.FirewallManager;
-import com.cloud.network.rules.FirewallRule;
+import com.cloud.network.rules.*;
 import com.cloud.network.rules.FirewallRule.Purpose;
-import com.cloud.network.rules.FirewallRuleVO;
-import com.cloud.network.rules.PortForwardingRuleVO;
-import com.cloud.network.rules.RulesManager;
-import com.cloud.network.rules.StaticNat;
-import com.cloud.network.rules.StaticNatRule;
-import com.cloud.network.rules.StaticNatRuleImpl;
 import com.cloud.network.rules.dao.PortForwardingRulesDao;
 import com.cloud.network.vpc.NetworkACLManager;
 import com.cloud.network.vpc.VpcManager;
@@ -147,11 +75,7 @@ import com.cloud.offerings.NetworkOfferingVO;
 import com.cloud.offerings.dao.NetworkOfferingDao;
 import com.cloud.offerings.dao.NetworkOfferingServiceMapDao;
 import com.cloud.org.Grouping;
-import com.cloud.user.Account;
-import com.cloud.user.AccountManager;
-import com.cloud.user.ResourceLimitService;
-import com.cloud.user.User;
-import com.cloud.user.UserContext;
+import com.cloud.user.*;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
@@ -159,32 +83,30 @@ import com.cloud.utils.component.Adapters;
 import com.cloud.utils.component.Inject;
 import com.cloud.utils.component.Manager;
 import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.*;
 import com.cloud.utils.db.JoinBuilder.JoinType;
-import com.cloud.utils.db.SearchBuilder;
-import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.fsm.NoTransitionException;
 import com.cloud.utils.fsm.StateMachine2;
 import com.cloud.utils.net.Ip;
 import com.cloud.utils.net.NetUtils;
-import com.cloud.vm.Nic;
-import com.cloud.vm.NicProfile;
-import com.cloud.vm.NicVO;
-import com.cloud.vm.ReservationContext;
-import com.cloud.vm.ReservationContextImpl;
-import com.cloud.vm.UserVmVO;
-import com.cloud.vm.VMInstanceVO;
-import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.*;
 import com.cloud.vm.VirtualMachine.Type;
-import com.cloud.vm.VirtualMachineProfile;
-import com.cloud.vm.VirtualMachineProfileImpl;
 import com.cloud.vm.dao.NicDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VMInstanceDao;
+import org.apache.cloudstack.acl.ControlledEntity.ACLType;
+import org.apache.cloudstack.acl.SecurityChecker.AccessType;
+import org.apache.log4j.Logger;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * NetworkManagerImpl implements NetworkManager.
@@ -2059,9 +1981,12 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener {
             s_logger.debug("Network is not implemented: " + network);
             return false;
         }
-
-        network.setState(Network.State.Shutdown);
-        _networksDao.update(network.getId(), network);
+        try {
+            stateTransitTo(network, Event.DestroyNetwork);
+        } catch (NoTransitionException e) {
+            network.setState(Network.State.Shutdown);
+            _networksDao.update(network.getId(), network);
+        }
 
         boolean success = shutdownNetworkElementsAndResources(context, cleanupElements, network);
 
@@ -2076,15 +2001,22 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener {
             guru.shutdown(profile, _networkOfferingDao.findById(network.getNetworkOfferingId()));
 
             applyProfileToNetwork(network, profile);
-
-            network.setState(Network.State.Allocated);
-            network.setRestartRequired(false);
+            try {
+                stateTransitTo(network, Event.OperationSucceeded);
+            } catch (NoTransitionException e) {
+                network.setState(Network.State.Allocated);
+                network.setRestartRequired(false);
+            }
             _networksDao.update(network.getId(), network);
             _networksDao.clearCheckForGc(networkId);
             result = true;
         } else {
-            network.setState(Network.State.Implemented);
-            _networksDao.update(network.getId(), network);
+            try {
+                stateTransitTo(network, Event.OperationFailed);
+            } catch (NoTransitionException e) {
+                network.setState(Network.State.Implemented);
+                _networksDao.update(network.getId(), network);
+            }
             result = false;
         }
         txn.commit();