You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/06 14:08:34 UTC

[1/2] git commit: Implemented tenant message receiver, queue, delegator and processor chain

Updated Branches:
  refs/heads/master 4c0241dec -> 257f08aae


Implemented tenant message receiver, queue, delegator and processor chain


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/82829f74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/82829f74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/82829f74

Branch: refs/heads/master
Commit: 82829f74ceb598dde48b41958e03c1fce9ebb8ac
Parents: e2261ac
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Dec 6 18:38:11 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Dec 6 18:38:11 2013 +0530

----------------------------------------------------------------------
 .../event/topology/CompleteTopologyEvent.java   |  4 +-
 .../tenant/CompleteTenantMessageProcessor.java  | 78 ++++++++++++++++
 .../tenant/TenantCreatedMessageProcessor.java   |  9 +-
 .../tenant/TenantMessageProcessorChain.java     | 74 +++++++++++++++
 .../tenant/TenantRemovedMessageProcessor.java   |  9 +-
 .../tenant/TenantUpdatedMessageProcessor.java   |  9 +-
 .../tenant/TenantEventMessageDelegator.java     | 97 ++++++++++++++++++++
 .../tenant/TenantEventMessageQueue.java         | 44 +++++++++
 .../tenant/TenantEventMessageReceiver.java      | 55 +++++++++++
 .../message/receiver/tenant/TenantManager.java  |  9 ++
 .../topology/TopologyEventMessageDelegator.java |  6 +-
 .../topology/TopologyEventMessageQueue.java     | 45 +++++++++
 .../topology/TopologyEventMessageReceiver.java  |  4 +-
 .../receiver/topology/TopologyEventQueue.java   | 45 ---------
 14 files changed, 431 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
index 1211550..68b5d73 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
@@ -24,7 +24,9 @@ import org.apache.stratos.messaging.domain.topology.Topology;
 import java.io.Serializable;
 
 /**
- *
+ *  This event is fired periodically with the complete topology. It would be a
+ *  starting point for subscribers to initialize the current state of the topology
+ *  before receiving other topology events.
  */
 public class CompleteTopologyEvent extends TopologyEvent implements Serializable {
     private static final long serialVersionUID = 8580862188444892004L;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java
new file mode 100644
index 0000000..caf72f4
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Complete tenant message processor: on the first TenantCreatedEvent message it adds the tenant to
+ * the tenant manager, marks the manager initialized and returns true; once initialized it returns
+ * false for that type. Other types go to the next processor (RuntimeException if there is none).
+ */
+public class CompleteTenantMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(CompleteTenantMessageProcessor.class);
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (TenantCreatedEvent.class.getName().equals(type)) {
+            // Already initialized: skip. NOTE(review): matches TenantCreatedEvent, not a complete-tenant type - confirm
+            if(TenantManager.getInstance().isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            TenantCreatedEvent event = (TenantCreatedEvent) Util.jsonToObject(message, TenantCreatedEvent.class);
+            // Acquire the lock before entering try so that finally never releases a lock that was not taken
+            TenantManager.acquireWriteLock();
+            try {
+                TenantManager.getInstance().addTenant(event.getTenant());
+                if(log.isInfoEnabled()) {
+                    log.info("Tenant initialized");
+                }
+                TenantManager.getInstance().setInitialized(true);
+                return true;
+            }
+            finally {
+                TenantManager.releaseWriteLock();
+            }
+        }
+        else {
+            if(nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            }
+            else {
+                throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
index ebd8b5d..728b484 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
@@ -27,8 +27,8 @@ import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
- * Tenant created message processor for triggering tenant created event
- * listener when a tenant created event message is received.
+ * Tenant created message processor for adding tenant to the tenant manager and
+ * triggering tenant created event listeners when a tenant created event message is received.
  */
 public class TenantCreatedMessageProcessor extends MessageProcessor {
 
@@ -44,6 +44,11 @@ public class TenantCreatedMessageProcessor extends MessageProcessor {
     @Override
     public boolean process(String type, String message, Object object) {
         if (TenantCreatedEvent.class.getName().equals(type)) {
+            // Return if tenant manager has not initialized
+            if(!TenantManager.getInstance().isInitialized()) {
+                return false;
+            }
+
             // Parse complete message and build event
             TenantCreatedEvent event = (TenantCreatedEvent) Util.jsonToObject(message, TenantCreatedEvent.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
new file mode 100644
index 0000000..57d1b8c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.tenant.TenantCreatedEventListener;
+import org.apache.stratos.messaging.listener.tenant.TenantRemovedEventListener;
+import org.apache.stratos.messaging.listener.tenant.TenantUpdatedEventListener;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Defines default tenant message processor chain and routes event listeners to its processors.
+ */
+public class TenantMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(TenantMessageProcessorChain.class);
+
+    private CompleteTenantMessageProcessor completeTenantMessageProcessor;
+    private TenantCreatedMessageProcessor tenantCreatedMessageProcessor;
+    private TenantUpdatedMessageProcessor tenantUpdatedMessageProcessor;
+    private TenantRemovedMessageProcessor tenantRemovedMessageProcessor;
+
+    public void initialize() {
+        // Add tenant event processors
+        completeTenantMessageProcessor = new CompleteTenantMessageProcessor();
+        add(completeTenantMessageProcessor);
+
+        tenantCreatedMessageProcessor = new TenantCreatedMessageProcessor();
+        add(tenantCreatedMessageProcessor);
+
+        tenantUpdatedMessageProcessor = new TenantUpdatedMessageProcessor();
+        add(tenantUpdatedMessageProcessor);
+
+        tenantRemovedMessageProcessor = new TenantRemovedMessageProcessor();
+        add(tenantRemovedMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Tenant message processor chain initialized");
+        }
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        if (eventListener instanceof CompleteTopologyEventListener) { // NOTE(review): topology listener type - confirm
+            completeTenantMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof TenantCreatedEventListener) {
+            tenantCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof TenantUpdatedEventListener) {
+            tenantUpdatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof TenantRemovedEventListener) {
+            tenantRemovedMessageProcessor.addEventListener(eventListener);
+        } else {
+            throw new RuntimeException("Unknown event listener");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
index 2ad5cb8..cfbab03 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
@@ -28,8 +28,8 @@ import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
- * Tenant removed message processor for triggering tenant removed event
- * listener when a tenant removed event message is received.
+ * Tenant removed message processor for removing a given tenant from tenant manager
+ * and triggering tenant removed event listeners when a tenant removed event message is received.
  */
 public class TenantRemovedMessageProcessor extends MessageProcessor {
 
@@ -45,6 +45,11 @@ public class TenantRemovedMessageProcessor extends MessageProcessor {
     @Override
     public boolean process(String type, String message, Object object) {
         if (TenantRemovedEvent.class.getName().equals(type)) {
+            // Return if tenant manager has not initialized
+            if(!TenantManager.getInstance().isInitialized()) {
+                return false;
+            }
+
             // Parse complete message and build event
             TenantRemovedEvent event = (TenantRemovedEvent) Util.jsonToObject(message, TenantRemovedEvent.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
index e89ef36..22aca45 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
@@ -28,8 +28,8 @@ import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
- * Tenant updated message processor for triggering tenant updated event
- * listener when a tenant updated event message is received.
+ * Tenant updated message processor for updating a given tenant in tenant manager and
+ * triggering tenant updated event listeners when a tenant updated event message is received.
  */
 public class TenantUpdatedMessageProcessor extends MessageProcessor {
 
@@ -45,6 +45,11 @@ public class TenantUpdatedMessageProcessor extends MessageProcessor {
     @Override
     public boolean process(String type, String message, Object object) {
         if (TenantUpdatedEvent.class.getName().equals(type)) {
+            // Return if tenant manager has not initialized
+            if(!TenantManager.getInstance().isInitialized()) {
+                return false;
+            }
+
             // Parse complete message and build event
             TenantUpdatedEvent event = (TenantUpdatedEvent) Util.jsonToObject(message, TenantUpdatedEvent.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
new file mode 100644
index 0000000..e93c886
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+
+/**
+ * Implements logic for processing tenant event messages taken from the tenant event
+ * message queue, based on a given tenant message processor chain.
+ */
+public class TenantEventMessageDelegator implements Runnable {
+
+    private static final Log log = LogFactory.getLog(TenantEventMessageDelegator.class);
+    private MessageProcessorChain processorChain;
+    private volatile boolean terminated;
+
+    public TenantEventMessageDelegator() {
+        this.processorChain = new TenantMessageProcessorChain();
+    }
+
+    public TenantEventMessageDelegator(MessageProcessorChain processorChain) {
+        this.processorChain = processorChain;
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Tenant event message delegator started");
+            }
+
+            while (!terminated) {
+                try {
+                    TextMessage message = TenantEventMessageQueue.getInstance().take();
+
+                    // Retrieve the header
+                    String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+                    // Retrieve the actual message
+                    String json = message.getText();
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Tenant event message received from queue: %s", type));
+                    }
+                    // Acquire before try so that finally only releases a lock that was actually taken
+                    TenantManager.acquireWriteLock();
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Delegating tenant event message: %s", type));
+                        }
+                        processorChain.process(type, json, null);
+                    } finally {
+                        TenantManager.releaseWriteLock();
+                    }
+
+                } catch (Exception e) {
+                    log.error("Failed to retrieve tenant event message", e);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Tenant event message delegator failed", e);
+            }
+        }
+    }
+
+    /**
+     * Requests termination; the run loop exits the next time it checks the (volatile) flag.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
new file mode 100644
index 0000000..ca2f7a6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.tenant;
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a singleton blocking queue for managing tenant event messages.
+ */
+public class TenantEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
+    private static volatile TenantEventMessageQueue instance;
+
+    private TenantEventMessageQueue(){
+    }
+    // No method-level lock: the volatile field and the synchronized block below already guard creation
+    public static TenantEventMessageQueue getInstance() {
+        if (instance == null) {
+            synchronized (TenantEventMessageQueue.class){
+                if (instance == null) {
+                    instance = new TenantEventMessageQueue();
+                }
+            }
+        }
+        return instance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java
new file mode 100644
index 0000000..89424a2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageQueue;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * Receives text based event messages from the tenant message broker topic and adds them to
+ * the tenant event message queue; messages that are not TextMessage instances are ignored.
+ */
+public class TenantEventMessageReceiver implements MessageListener {
+
+    private static final Log log = LogFactory.getLog(TenantEventMessageReceiver.class);
+
+    @Override
+    public void onMessage(Message message) {
+        if (message instanceof TextMessage) {
+            TextMessage receivedMessage = (TextMessage) message;
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Tenant message received: %s", receivedMessage.getText()));
+                }
+                // Add received message to the tenant queue (the one TenantEventMessageDelegator takes from)
+                TenantEventMessageQueue.getInstance().add(receivedMessage);
+
+            } catch (JMSException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
index a6b59ff..230d0b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
@@ -44,6 +44,7 @@ public class TenantManager {
 
     private Map<Integer, Tenant> tenantIdTenantMap;
     private Map<String, Tenant> tenantDomainTenantMap;
+    private boolean initialized;
 
     public static void acquireReadLock() {
         if(log.isDebugEnabled()) {
@@ -112,4 +113,12 @@ public class TenantManager {
             tenantDomainTenantMap.remove(tenant.getTenantDomain());
         }
     }
+
+    public void setInitialized(boolean initialized) { // stores the flag returned by isInitialized()
+        this.initialized = initialized;
+    }
+
+    public boolean isInitialized() { // value last given to setInitialized(); false until then
+        return initialized;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 6f672c4..64793bf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -54,7 +54,7 @@ public class TopologyEventMessageDelegator implements Runnable {
 
             while (!terminated) {
                 try {
-                    TextMessage message = TopologyEventQueue.getInstance().take();
+                    TextMessage message = TopologyEventMessageQueue.getInstance().take();
 
                     // Retrieve the header
                     String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
@@ -63,13 +63,13 @@ public class TopologyEventMessageDelegator implements Runnable {
                     String json = message.getText();
 
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("Event message received from queue: %s", type));
+                        log.debug(String.format("Topology event message received from queue: %s", type));
                     }
 
                     try {
                         TopologyManager.acquireWriteLock();
                         if (log.isDebugEnabled()) {
-                            log.debug(String.format("Delegating : %s", type));
+                            log.debug(String.format("Delegating topology event message: %s", type));
                         }
                         processorChain.process(type, json, TopologyManager.getTopology());
                     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
new file mode 100644
index 0000000..8c6fbdb
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.topology;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.TextMessage;
+
+/**
+ * Implements a blocking queue for managing topology event messages.
+ */
+public class TopologyEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
+    // Built once during class initialization, which the JVM runs thread-safely.
+    private static final TopologyEventMessageQueue instance = new TopologyEventMessageQueue();
+
+    // Private: the only instance is the one handed out by getInstance().
+    private TopologyEventMessageQueue(){
+    }
+
+    /**
+     * Returns the single shared queue instance. No lock is taken here: the
+     * synchronized modifier previously on this method made every call contend
+     * on the class lock and made the inner double-checked locking redundant.
+     */
+    public static TopologyEventMessageQueue getInstance() {
+        return instance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
index d496b38..947fc0c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
@@ -40,10 +40,10 @@ public class TopologyEventMessageReceiver implements MessageListener {
             TextMessage receivedMessage = (TextMessage) message;
             try {
                 if (log.isDebugEnabled()) {
-                    log.debug("Message received: " + ((TextMessage) message).getText());
+                    log.debug(String.format("Topology message received: %s", ((TextMessage) message).getText()));
                 }
                 // Add received message to the queue
-                TopologyEventQueue.getInstance().add(receivedMessage);
+                TopologyEventMessageQueue.getInstance().add(receivedMessage);
 
             } catch (JMSException e) {
                 log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java
deleted file mode 100644
index 3667ada..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.topology;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.TextMessage;
-
-/**
- * Implements a blocking queue for managing topology event messages.
- */
-public class TopologyEventQueue extends LinkedBlockingQueue<TextMessage>{
-    private static volatile TopologyEventQueue instance;
-
-    private TopologyEventQueue(){
-    }
-
-    public static synchronized TopologyEventQueue getInstance() {
-        if (instance == null) {
-            synchronized (TopologyEventQueue.class){
-                if (instance == null) {
-                    instance = new TopologyEventQueue ();
-                }
-            }
-        }
-        return instance;
-    }
-}


[2/2] git commit: Merge remote-tracking branch 'origin/master'

Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/257f08aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/257f08aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/257f08aa

Branch: refs/heads/master
Commit: 257f08aae3c1ca14eb3e5f0f6c60d666f8dafbc4
Parents: 82829f7 4c0241d
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Dec 6 18:38:25 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Dec 6 18:38:25 2013 +0530

----------------------------------------------------------------------
 .../org.apache.stratos.autoscaler/pom.xml       |  3 -
 .../internal/AutoscalerActivator.java           | 66 --------------------
 2 files changed, 69 deletions(-)
----------------------------------------------------------------------