You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mu...@apache.org on 2014/01/06 09:38:15 UTC

git commit: updated refs/heads/master to 3a6fcaf

Updated Branches:
  refs/heads/master 3ad0e8fb4 -> 3a6fcaf1f


CLOUDSTACK-5787:  support in-memroy eventbus

this checkin adds support for plug-in that provides an in memory event
bus which could be used as alternative to RabbitMQ based event bus. Both
publisher are subscriber should be running with management server to use
in-memroy event bus.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/3a6fcaf1
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/3a6fcaf1
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/3a6fcaf1

Branch: refs/heads/master
Commit: 3a6fcaf1fc4132a59b5260b1854fcbe7390d848c
Parents: 3ad0e8f
Author: Murali Reddy <mu...@gmail.com>
Authored: Mon Jan 6 13:21:49 2014 +0530
Committer: Murali Reddy <mu...@gmail.com>
Committed: Mon Jan 6 13:24:36 2014 +0530

----------------------------------------------------------------------
 client/pom.xml                                  |   5 +
 plugins/event-bus/inmemory/pom.xml              |  40 +++++
 .../mom/inmemory/InMemoryEventBus.java          | 163 +++++++++++++++++++
 plugins/pom.xml                                 |   1 +
 4 files changed, 209 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/3a6fcaf1/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 75b5504..33d3f1e 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -207,6 +207,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-mom-inmemory</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
       <scope>runtime</scope>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/3a6fcaf1/plugins/event-bus/inmemory/pom.xml
----------------------------------------------------------------------
diff --git a/plugins/event-bus/inmemory/pom.xml b/plugins/event-bus/inmemory/pom.xml
new file mode 100644
index 0000000..1bde8b8
--- /dev/null
+++ b/plugins/event-bus/inmemory/pom.xml
@@ -0,0 +1,40 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>cloud-mom-inmemory</artifactId>
+  <name>Apache CloudStack Plugin - In Memory Event Bus</name>
+  <parent>
+    <groupId>org.apache.cloudstack</groupId>
+    <artifactId>cloudstack-plugins</artifactId>
+    <version>4.4.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <dependencies>
+    <dependency>
+    <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-framework-events</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <defaultGoal>install</defaultGoal>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/3a6fcaf1/plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
----------------------------------------------------------------------
diff --git a/plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java b/plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
new file mode 100644
index 0000000..7c282d7
--- /dev/null
+++ b/plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.mom.inmemory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.events.Event;
+import org.apache.cloudstack.framework.events.EventBus;
+import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventSubscriber;
+import org.apache.cloudstack.framework.events.EventTopic;
+
+import com.cloud.utils.component.ManagerBase;
+
+@Local(value = EventBus.class)
+public class InMemoryEventBus extends ManagerBase implements EventBus {
+
+    private String name;
+    private static final Logger s_logger = Logger.getLogger(InMemoryEventBus.class);
+    private static ConcurrentHashMap<UUID, Pair<EventTopic, EventSubscriber>> s_subscribers;
+
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+        s_subscribers = new ConcurrentHashMap<UUID, Pair<EventTopic, EventSubscriber>>();
+        return true;
+    }
+
+    @Override
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
+        if (subscriber == null || topic == null) {
+            throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
+        }
+        UUID subscriberId = UUID.randomUUID();
+
+        s_subscribers.put(subscriberId, new Pair(topic, subscriber));
+        return subscriberId;
+    }
+
+    @Override
+    public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+        if (s_subscribers != null && s_subscribers.isEmpty()) {
+            throw new EventBusException("There are no registered subscribers to unregister.");
+        }
+        if (s_subscribers.get(subscriberId) == null) {
+            throw new EventBusException("No subscriber found with subscriber id " + subscriberId);
+        }
+        s_subscribers.remove(subscriberId);
+    }
+
+    @Override
+    public void publish(Event event) throws EventBusException {
+        if (s_subscribers == null || s_subscribers.isEmpty()) {
+            return; // no subscriber to publish to, so just return
+        }
+
+        for (UUID subscriberId : s_subscribers.keySet()) {
+            Pair<EventTopic, EventSubscriber>  subscriberDetails =  s_subscribers.get(subscriberId);
+            // if the event matches subscribers interested event topic then call back the subscriber with the event
+            if (isEventMatchesTopic(event, subscriberDetails.first())) {
+                EventSubscriber subscriber =  subscriberDetails.second();
+                subscriber.onEvent(event);
+            }
+        }
+    }
+
+    @Override
+    public String getName() {
+        return _name;
+    }
+
+    @Override
+    public boolean start() {
+        return true;
+    }
+
+    @Override
+    public boolean stop() {
+        return true;
+    }
+
+    private String replaceNullWithWildcard(String key) {
+        if (key == null || key.isEmpty()) {
+            return "*";
+        } else {
+            return key;
+        }
+    }
+
+    private boolean isEventMatchesTopic(Event event, EventTopic topic) {
+
+        String eventTopicSource = replaceNullWithWildcard(topic.getEventSource());
+        eventTopicSource = eventTopicSource.replace(".", "-");
+        String eventSource = replaceNullWithWildcard(event.getEventSource());
+        eventSource = eventSource.replace(".", "-");
+        if (eventTopicSource != "*" && eventSource != "*" && !eventTopicSource.equalsIgnoreCase(eventSource)) {
+            return false;
+        }
+
+        String eventTopicCategory = replaceNullWithWildcard(topic.getEventCategory());
+        eventTopicCategory = eventTopicCategory.replace(".", "-");
+        String eventCategory = replaceNullWithWildcard(event.getEventCategory());
+        eventCategory = eventCategory.replace(".", "-");
+        if (eventTopicCategory != "*" && eventCategory != "*" && !eventTopicCategory.equalsIgnoreCase(eventCategory)) {
+            return false;
+        }
+
+        String eventTopicType = replaceNullWithWildcard(topic.getEventType());
+        eventTopicType = eventTopicType.replace(".", "-");
+        String eventType = replaceNullWithWildcard(event.getEventType());
+        eventType = eventType.replace(".", "-");
+        if (eventTopicType != "*" && eventType != "*" && !eventTopicType.equalsIgnoreCase(eventType)) {
+            return false;
+        }
+
+        String eventTopicResourceType = replaceNullWithWildcard(topic.getResourceType());
+        eventTopicResourceType = eventTopicResourceType.replace(".", "-");
+        String resourceType = replaceNullWithWildcard(event.getResourceType());
+        resourceType = resourceType.replace(".", "-");
+        if (eventTopicResourceType != "*" && resourceType != "*" && !eventTopicResourceType.equalsIgnoreCase(resourceType)) {
+            return false;
+        }
+
+        String resourceUuid = replaceNullWithWildcard(event.getResourceUUID());
+        resourceUuid = resourceUuid.replace(".", "-");
+        String eventTopicresourceUuid = replaceNullWithWildcard(topic.getResourceUUID());
+        eventTopicresourceUuid = eventTopicresourceUuid.replace(".", "-");
+        if (resourceUuid != "*" && eventTopicresourceUuid != "*" && !resourceUuid.equalsIgnoreCase(eventTopicresourceUuid)) {
+            return false;
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/3a6fcaf1/plugins/pom.xml
----------------------------------------------------------------------
diff --git a/plugins/pom.xml b/plugins/pom.xml
index 06cf79f..8ec6a71 100755
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -39,6 +39,7 @@
     <module>hypervisors/xen</module>
     <module>hypervisors/kvm</module>
     <module>event-bus/rabbitmq</module>
+    <module>event-bus/inmemory</module>
     <module>hypervisors/baremetal</module>
     <module>hypervisors/ucs</module>
     <module>hypervisors/hyperv</module>