You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/06 14:08:34 UTC
[1/2] git commit: Implemented tenant message receiver, queue,
deletagor and processor chain
Updated Branches:
refs/heads/master 4c0241dec -> 257f08aae
Implemented tenant message receiver, queue, deletagor and processor chain
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/82829f74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/82829f74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/82829f74
Branch: refs/heads/master
Commit: 82829f74ceb598dde48b41958e03c1fce9ebb8ac
Parents: e2261ac
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Dec 6 18:38:11 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Dec 6 18:38:11 2013 +0530
----------------------------------------------------------------------
.../event/topology/CompleteTopologyEvent.java | 4 +-
.../tenant/CompleteTenantMessageProcessor.java | 78 ++++++++++++++++
.../tenant/TenantCreatedMessageProcessor.java | 9 +-
.../tenant/TenantMessageProcessorChain.java | 74 +++++++++++++++
.../tenant/TenantRemovedMessageProcessor.java | 9 +-
.../tenant/TenantUpdatedMessageProcessor.java | 9 +-
.../tenant/TenantEventMessageDelegator.java | 97 ++++++++++++++++++++
.../tenant/TenantEventMessageQueue.java | 44 +++++++++
.../tenant/TenantEventMessageReceiver.java | 55 +++++++++++
.../message/receiver/tenant/TenantManager.java | 9 ++
.../topology/TopologyEventMessageDelegator.java | 6 +-
.../topology/TopologyEventMessageQueue.java | 45 +++++++++
.../topology/TopologyEventMessageReceiver.java | 4 +-
.../receiver/topology/TopologyEventQueue.java | 45 ---------
14 files changed, 431 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
index 1211550..68b5d73 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/CompleteTopologyEvent.java
@@ -24,7 +24,9 @@ import org.apache.stratos.messaging.domain.topology.Topology;
import java.io.Serializable;
/**
- *
+ * This event is fired periodically with the complete topology. It would be a
+ * starting point for subscribers to initialize the current state of the topology
+ * before receiving other topology events.
*/
public class CompleteTopologyEvent extends TopologyEvent implements Serializable {
private static final long serialVersionUID = 8580862188444892004L;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java
new file mode 100644
index 0000000..caf72f4
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/CompleteTenantMessageProcessor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Complete tenant message processor for initializing the tenant manager and
+ * triggering complete tenant event listeners when the complete tenant event
+ * message is received.
+ */
+public class CompleteTenantMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(CompleteTenantMessageProcessor.class);
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (TenantCreatedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has already initialized
+ if(TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
+ // Parse complete message and build event
+ TenantCreatedEvent event = (TenantCreatedEvent) Util.jsonToObject(message, TenantCreatedEvent.class);
+
+ try {
+ TenantManager.acquireWriteLock();
+ TenantManager.getInstance().addTenant(event.getTenant());
+ if(log.isInfoEnabled()) {
+ log.info("Tenant initialized");
+ }
+ TenantManager.getInstance().setInitialized(true);
+ return true;
+ }
+ finally {
+ TenantManager.releaseWriteLock();
+ }
+ }
+ else {
+ if(nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ }
+ else {
+ throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
index ebd8b5d..728b484 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java
@@ -27,8 +27,8 @@ import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
import org.apache.stratos.messaging.util.Util;
/**
- * Tenant created message processor for triggering tenant created event
- * listener when a tenant created event message is received.
+ * Tenant created message processor for adding tenant to the tenant manager and
+ * triggering tenant created event listeners when a tenant created event message is received.
*/
public class TenantCreatedMessageProcessor extends MessageProcessor {
@@ -44,6 +44,11 @@ public class TenantCreatedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
if (TenantCreatedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has not initialized
+ if(!TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
// Parse complete message and build event
TenantCreatedEvent event = (TenantCreatedEvent) Util.jsonToObject(message, TenantCreatedEvent.class);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
new file mode 100644
index 0000000..57d1b8c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.tenant.TenantCreatedEventListener;
+import org.apache.stratos.messaging.listener.tenant.TenantRemovedEventListener;
+import org.apache.stratos.messaging.listener.tenant.TenantUpdatedEventListener;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Defines default tenant message processor chain.
+ */
+public class TenantMessageProcessorChain extends MessageProcessorChain {
+ private static final Log log = LogFactory.getLog(TenantMessageProcessorChain.class);
+
+ private CompleteTenantMessageProcessor completeTopologyMessageProcessor;
+ private TenantCreatedMessageProcessor tenantCreatedMessageProcessor;
+ private TenantUpdatedMessageProcessor tenantUpdatedMessageProcessor;
+ private TenantRemovedMessageProcessor tenantRemovedMessageProcessor;
+
+ public void initialize() {
+ // Add topology event processors
+ completeTopologyMessageProcessor = new CompleteTenantMessageProcessor();
+ add(completeTopologyMessageProcessor);
+
+ tenantCreatedMessageProcessor = new TenantCreatedMessageProcessor();
+ add(tenantCreatedMessageProcessor);
+
+ tenantUpdatedMessageProcessor = new TenantUpdatedMessageProcessor();
+ add(tenantUpdatedMessageProcessor);
+
+ tenantRemovedMessageProcessor = new TenantRemovedMessageProcessor();
+ add(tenantRemovedMessageProcessor);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Tenant message processor chain initialized");
+ }
+ }
+
+ public void addEventListener(EventListener eventListener) {
+ if (eventListener instanceof CompleteTopologyEventListener) {
+ completeTopologyMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof TenantCreatedEventListener) {
+ tenantCreatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof TenantUpdatedEventListener) {
+ tenantUpdatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof TenantRemovedEventListener) {
+ tenantRemovedMessageProcessor.addEventListener(eventListener);
+ } else {
+ throw new RuntimeException("Unknown event listener");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
index 2ad5cb8..cfbab03 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java
@@ -28,8 +28,8 @@ import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
import org.apache.stratos.messaging.util.Util;
/**
- * Tenant removed message processor for triggering tenant removed event
- * listener when a tenant removed event message is received.
+ * Tenant removed message processor for removing a given tenant from tenant manager
+ * and triggering tenant removed event listeners when a tenant removed event message is received.
*/
public class TenantRemovedMessageProcessor extends MessageProcessor {
@@ -45,6 +45,11 @@ public class TenantRemovedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
if (TenantRemovedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has not initialized
+ if(!TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
// Parse complete message and build event
TenantRemovedEvent event = (TenantRemovedEvent) Util.jsonToObject(message, TenantRemovedEvent.class);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
index e89ef36..22aca45 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java
@@ -28,8 +28,8 @@ import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
import org.apache.stratos.messaging.util.Util;
/**
- * Tenant updated message processor for triggering tenant updated event
- * listener when a tenant updated event message is received.
+ * Tenant updated message processor for updating a given tenant in tenant manager and
+ * triggering tenant updated event listeners when a tenant updated event message is received.
*/
public class TenantUpdatedMessageProcessor extends MessageProcessor {
@@ -45,6 +45,11 @@ public class TenantUpdatedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
if (TenantUpdatedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has not initialized
+ if(!TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
// Parse complete message and build event
TenantUpdatedEvent event = (TenantUpdatedEvent) Util.jsonToObject(message, TenantUpdatedEvent.class);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
new file mode 100644
index 0000000..e93c886
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+
+/**
+ * Implements logic for processing topology event messages based on a given
+ * topology process chain.
+ */
+public class TenantEventMessageDelegator implements Runnable {
+
+ private static final Log log = LogFactory.getLog(TenantEventMessageDelegator.class);
+ private MessageProcessorChain processorChain;
+ private boolean terminated;
+
+ public TenantEventMessageDelegator() {
+ this.processorChain = new TenantMessageProcessorChain();
+ }
+
+ public TenantEventMessageDelegator(MessageProcessorChain processorChain) {
+ this.processorChain = processorChain;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Tenant event message delegator started");
+ }
+
+ while (!terminated) {
+ try {
+ TextMessage message = TenantEventMessageQueue.getInstance().take();
+
+ // Retrieve the header
+ String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ // Retrieve the actual message
+ String json = message.getText();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant event message received from queue: %s", type));
+ }
+
+ try {
+ TenantManager.acquireWriteLock();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Delegating tenant event message: %s", type));
+ }
+ processorChain.process(type, json, null);
+ } finally {
+ TenantManager.releaseWriteLock();
+ }
+
+ } catch (Exception e) {
+ log.error("Failed to retrieve tenant event message", e);
+ }
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Tenant event message delegator failed", e);
+ }
+ }
+ }
+
+ /**
+ * Terminate topology event message delegator thread.
+ */
+ public void terminate() {
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
new file mode 100644
index 0000000..ca2f7a6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.tenant;
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a blocking queue for managing tenant event messages.
+ */
+public class TenantEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
+ private static volatile TenantEventMessageQueue instance;
+
+ private TenantEventMessageQueue(){
+ }
+
+ public static synchronized TenantEventMessageQueue getInstance() {
+ if (instance == null) {
+ synchronized (TenantEventMessageQueue.class){
+ if (instance == null) {
+ instance = new TenantEventMessageQueue();
+ }
+ }
+ }
+ return instance;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java
new file mode 100644
index 0000000..89424a2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageReceiver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageQueue;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * Implements functionality for receiving text based event messages from the tenant
+ * message broker topic and add them to the event queue.
+ */
+public class TenantEventMessageReceiver implements MessageListener {
+
+ private static final Log log = LogFactory.getLog(TenantEventMessageReceiver.class);
+
+ @Override
+ public void onMessage(Message message) {
+ if (message instanceof TextMessage) {
+ TextMessage receivedMessage = (TextMessage) message;
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
+ }
+ // Add received message to the queue
+ TopologyEventMessageQueue.getInstance().add(receivedMessage);
+
+ } catch (JMSException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
index a6b59ff..230d0b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java
@@ -44,6 +44,7 @@ public class TenantManager {
private Map<Integer, Tenant> tenantIdTenantMap;
private Map<String, Tenant> tenantDomainTenantMap;
+ private boolean initialized;
public static void acquireReadLock() {
if(log.isDebugEnabled()) {
@@ -112,4 +113,12 @@ public class TenantManager {
tenantDomainTenantMap.remove(tenant.getTenantDomain());
}
}
+
+ public void setInitialized(boolean initialized) {
+ this.initialized = initialized;
+ }
+
+ public boolean isInitialized() {
+ return initialized;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 6f672c4..64793bf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -54,7 +54,7 @@ public class TopologyEventMessageDelegator implements Runnable {
while (!terminated) {
try {
- TextMessage message = TopologyEventQueue.getInstance().take();
+ TextMessage message = TopologyEventMessageQueue.getInstance().take();
// Retrieve the header
String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
@@ -63,13 +63,13 @@ public class TopologyEventMessageDelegator implements Runnable {
String json = message.getText();
if (log.isDebugEnabled()) {
- log.debug(String.format("Event message received from queue: %s", type));
+ log.debug(String.format("Topology event message received from queue: %s", type));
}
try {
TopologyManager.acquireWriteLock();
if (log.isDebugEnabled()) {
- log.debug(String.format("Delegating : %s", type));
+ log.debug(String.format("Delegating topology event message: %s", type));
}
processorChain.process(type, json, TopologyManager.getTopology());
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
new file mode 100644
index 0000000..8c6fbdb
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.topology;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.TextMessage;
+
+/**
+ * Implements a blocking queue for managing topology event messages.
+ */
+public class TopologyEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
+ private static volatile TopologyEventMessageQueue instance;
+
+ private TopologyEventMessageQueue(){
+ }
+
+ public static synchronized TopologyEventMessageQueue getInstance() {
+ if (instance == null) {
+ synchronized (TopologyEventMessageQueue.class){
+ if (instance == null) {
+ instance = new TopologyEventMessageQueue();
+ }
+ }
+ }
+ return instance;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
index d496b38..947fc0c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageReceiver.java
@@ -40,10 +40,10 @@ public class TopologyEventMessageReceiver implements MessageListener {
TextMessage receivedMessage = (TextMessage) message;
try {
if (log.isDebugEnabled()) {
- log.debug("Message received: " + ((TextMessage) message).getText());
+ log.debug(String.format("Topology message received: %s", ((TextMessage) message).getText()));
}
// Add received message to the queue
- TopologyEventQueue.getInstance().add(receivedMessage);
+ TopologyEventMessageQueue.getInstance().add(receivedMessage);
} catch (JMSException e) {
log.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/82829f74/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java
deleted file mode 100644
index 3667ada..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventQueue.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.topology;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.TextMessage;
-
-/**
- * Implements a blocking queue for managing topology event messages.
- */
-public class TopologyEventQueue extends LinkedBlockingQueue<TextMessage>{
- private static volatile TopologyEventQueue instance;
-
- private TopologyEventQueue(){
- }
-
- public static synchronized TopologyEventQueue getInstance() {
- if (instance == null) {
- synchronized (TopologyEventQueue.class){
- if (instance == null) {
- instance = new TopologyEventQueue ();
- }
- }
- }
- return instance;
- }
-}
[2/2] git commit: Merge remote-tracking branch 'origin/master'
Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/257f08aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/257f08aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/257f08aa
Branch: refs/heads/master
Commit: 257f08aae3c1ca14eb3e5f0f6c60d666f8dafbc4
Parents: 82829f7 4c0241d
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Dec 6 18:38:25 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Dec 6 18:38:25 2013 +0530
----------------------------------------------------------------------
.../org.apache.stratos.autoscaler/pom.xml | 3 -
.../internal/AutoscalerActivator.java | 66 --------------------
2 files changed, 69 deletions(-)
----------------------------------------------------------------------