You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/17 21:25:58 UTC
[2/4] Updated TopologyEventReceiver, TenantEventReceiver,
InstanceNotifierEventReceiver,
HealthStatEventReceiver and implemented separate message queues for each
receiver instance
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
new file mode 100644
index 0000000..7efaa1c
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Load balancer topology receiver updates load balancer context according to
+ * incoming topology events. It runs a {@link TopologyEventReceiver} on its own
+ * thread and stays alive until {@link #terminate()} is called or it is interrupted.
+ */
+public class LoadBalancerTopologyEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(LoadBalancerTopologyEventReceiver.class);
+
+    private final TopologyEventReceiver topologyEventReceiver;
+    // Set by terminate() and polled by run() on another thread, hence volatile
+    private volatile boolean terminated;
+
+    /** Creates the topology event receiver and registers the load balancer listeners on it. */
+    public LoadBalancerTopologyEventReceiver() {
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        addEventListeners();
+    }
+
+    /**
+     * Starts the topology event receiver on a new thread and blocks until terminated.
+     * An interrupt is handled like a call to {@link #terminate()}.
+     */
+    @Override
+    public void run() {
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        log.info("Load balancer topology receiver thread started");
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // An interrupt is a shutdown request: restore the flag and stop the receiver
+                Thread.currentThread().interrupt();
+                terminate();
+            }
+        }
+        log.info("Load balancer topology receiver thread terminated");
+    }
+
+    /**
+     * Registers the listeners that keep the load balancer context in sync with the topology.
+     */
+    private void addEventListeners() {
+        // Listen to topology events that affect clusters
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Acquire before the try block so that finally never releases a lock that is not held
+                TopologyManager.acquireReadLock();
+                try {
+                    // Add every cluster that already has an active member
+                    for (Service service : TopologyManager.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            if (clusterHasActiveMembers(cluster)) {
+                                LoadBalancerContextUtil.addClusterToLbContext(cluster);
+                            } else if (log.isDebugEnabled()) {
+                                log.debug("Cluster does not have any active members");
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+            /** Returns true if at least one member of the cluster has status Activated. */
+            private boolean clusterHasActiveMembers(Cluster cluster) {
+                for (Member member : cluster.getMembers()) {
+                    if (member.getStatus() == MemberStatus.Activated) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+        });
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TopologyManager.acquireReadLock();
+                try {
+                    // Add cluster to load balancer context when its first member is activated
+                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+                    if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster(memberActivatedEvent.getClusterId())) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Cluster exists in load balancer context: [service] %s [cluster] %s",
+                                    memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
+                        }
+                        return;
+                    }
+                    // Cluster not found in load balancer context, add it
+                    Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName());
+                    if (service != null) {
+                        Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId());
+                        if (cluster != null) {
+                            LoadBalancerContextUtil.addClusterToLbContext(cluster);
+                        } else if (log.isErrorEnabled()) {
+                            log.error(String.format("Cluster not found in topology: [service] %s [cluster] %s",
+                                    memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
+                        }
+                    } else if (log.isErrorEnabled()) {
+                        log.error(String.format("Service not found in topology: [service] %s", memberActivatedEvent.getServiceName()));
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TopologyManager.acquireReadLock();
+                try {
+                    // Remove cluster from context
+                    ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                    Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
+                    if (cluster != null) {
+                        LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
+                    } else if (log.isWarnEnabled()) {
+                        log.warn(String.format("Cluster not found in load balancer context: [service] %s [cluster] %s",
+                                clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()));
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+        topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TopologyManager.acquireReadLock();
+                try {
+                    // Remove all clusters of given service from context
+                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
+                    Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
+                    if (service != null) {
+                        for (Cluster cluster : service.getClusters()) {
+                            LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
+                        }
+                    } else if (log.isWarnEnabled()) {
+                        log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+    }
+
+    /**
+     * Terminate load balancer topology receiver thread.
+     * Signals the underlying topology event receiver to terminate and lets {@link #run()} return.
+     */
+    public void terminate() {
+        topologyEventReceiver.terminate();
+        terminated = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
deleted file mode 100644
index c039f1b..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.context.LoadBalancerContext;
-import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
-import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
-import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
-
-/**
- * Load balancer topology receiver (removed by this commit) updates the load
- * balancer context according to incoming topology events.
- */
-public class LoadBalancerTopologyReceiver implements Runnable {
-
- private static final Log log = LogFactory.getLog(LoadBalancerTopologyReceiver.class);
-
- private TopologyReceiver topologyReceiver;
- private boolean terminated; // NOTE(review): set by terminate() and polled by run(); not volatile
-
- public LoadBalancerTopologyReceiver() {
- this.topologyReceiver = new TopologyReceiver(createMessageDelegator());
- }
-
- @Override
- public void run() {
- Thread thread = new Thread(topologyReceiver);
- thread.start();
- if (log.isInfoEnabled()) {
- log.info("Load balancer topology receiver thread started");
- }
-
- // Keep this thread alive, polling once per second, until terminate() sets the flag
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) { // interrupt is swallowed; the loop exits only via the terminated flag
- }
- }
- if (log.isInfoEnabled()) {
- log.info("Load balancer topology receiver thread terminated");
- }
- }
-
- private TopologyEventMessageDelegator createMessageDelegator() { // wraps the processor chain in a message delegator
- TopologyMessageProcessorChain processorChain = createEventProcessorChain();
- return new TopologyEventMessageDelegator(processorChain);
- }
-
- private TopologyMessageProcessorChain createEventProcessorChain() { // builds the chain and registers four listeners on it
- // Listen to topology events that affect clusters
- TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
- processorChain.addEventListener(new CompleteTopologyEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (clusterHasActiveMembers(cluster)) {
- LoadBalancerContextUtil.addClusterToLbContext(cluster);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Cluster does not have any active members");
- }
- }
- }
- }
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- private boolean clusterHasActiveMembers(Cluster cluster) { // true if any member has status Activated
- for (Member member : cluster.getMembers()) {
- if (member.getStatus() == MemberStatus.Activated) {
- return true;
- }
- }
- return false;
- }
- });
- processorChain.addEventListener(new MemberActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
-
- // Add cluster to load balancer context when its first member is activated
- MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
- if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster(memberActivatedEvent.getClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster exists in load balancer context: [service] %s [cluster] %s",
- memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
- }
- return;
- }
- // Cluster not found in load balancer context, add it
- Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName());
- if (service != null) {
- Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId());
- if (cluster != null) {
- LoadBalancerContextUtil.addClusterToLbContext(cluster);
- } else {
- if (log.isErrorEnabled()) {
- log.error(String.format("Cluster not found in topology: [service] %s [cluster] %s",
- memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
- }
- }
- } else {
- if (log.isErrorEnabled()) {
- log.error(String.format("Service not found in topology: [service] %s", memberActivatedEvent.getServiceName()));
- }
- }
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
- });
- processorChain.addEventListener(new ClusterRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
-
- // Remove the cluster from the load balancer context if it is present there
- ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
- Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
- if (cluster != null) {
- LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
- } else {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster not found in load balancer context: [service] %s [cluster] %s",
- clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()));
- }
- }
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
- });
- processorChain.addEventListener(new ServiceRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
-
- // Remove all clusters of given service from context
- ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
- Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
- if (service != null) {
- for (Cluster cluster : service.getClusters()) {
- LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
- }
- } else {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
- }
- }
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
- });
- return processorChain;
- }
-
- /**
- * Terminate load balancer topology receiver thread.
- */
- public void terminate() {
- topologyReceiver.terminate();
- terminated = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index da7f3de..f7158a7 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -24,8 +24,8 @@ import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.EndpointDeployer;
-import org.apache.stratos.load.balancer.LoadBalancerTenantReceiver;
-import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver;
+import org.apache.stratos.load.balancer.LoadBalancerTenantEventReceiver;
+import org.apache.stratos.load.balancer.LoadBalancerTopologyEventReceiver;
import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
@@ -101,8 +101,8 @@ public class LoadBalancerServiceComponent {
private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class);
private boolean activated = false;
- private LoadBalancerTopologyReceiver topologyReceiver;
- private LoadBalancerTenantReceiver tenantReceiver;
+ private LoadBalancerTopologyEventReceiver topologyReceiver;
+ private LoadBalancerTenantEventReceiver tenantReceiver;
private LoadBalancerStatisticsNotifier statisticsNotifier;
protected void activate(ComponentContext ctxt) {
@@ -127,7 +127,7 @@ public class LoadBalancerServiceComponent {
// Configure jndi.properties
JndiConfigurator.configure(configuration);
- tenantReceiver = new LoadBalancerTenantReceiver();
+ tenantReceiver = new LoadBalancerTenantEventReceiver();
Thread tenantReceiverThread = new Thread(tenantReceiver);
tenantReceiverThread.start();
if (log.isInfoEnabled()) {
@@ -142,7 +142,7 @@ public class LoadBalancerServiceComponent {
}
// Start topology receiver
- topologyReceiver = new LoadBalancerTopologyReceiver();
+ topologyReceiver = new LoadBalancerTopologyEventReceiver();
Thread topologyReceiverThread = new Thread(topologyReceiver);
topologyReceiverThread.start();
if (log.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index 81c1dbe..c843fdb 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -24,7 +24,7 @@ import org.apache.stratos.manager.listener.InstanceStatusListener;
import org.apache.stratos.manager.publisher.TenantEventPublisher;
import org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler;
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
-import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyReceiver;
+import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver;
import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
@@ -60,7 +60,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
public class ADCManagementServerComponent {
private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class);
- private StratosManagerTopologyReceiver stratosManagerTopologyReceiver;
+ private StratosManagerTopologyEventReceiver stratosManagerTopologyEventReceiver;
protected void activate(ComponentContext componentContext) throws Exception {
try {
@@ -102,8 +102,8 @@ public class ADCManagementServerComponent {
Thread topologyReceiverThread = new Thread(topologyReceiver);
topologyReceiverThread.start();*/
- stratosManagerTopologyReceiver = new StratosManagerTopologyReceiver();
- Thread topologyReceiverThread = new Thread(stratosManagerTopologyReceiver);
+ stratosManagerTopologyEventReceiver = new StratosManagerTopologyEventReceiver();
+ Thread topologyReceiverThread = new Thread(stratosManagerTopologyEventReceiver);
topologyReceiverThread.start();
log.info("Topology receiver thread started");
@@ -174,6 +174,6 @@ public class ADCManagementServerComponent {
protected void deactivate(ComponentContext context) {
//terminate Stratos Manager Topology Receiver
- stratosManagerTopologyReceiver.terminate();
+ stratosManagerTopologyEventReceiver.terminate();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
new file mode 100644
index 0000000..94ba486
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.topology.receiver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Stratos Manager topology receiver updates the topology cluster information
+ * model according to incoming topology events. It runs a
+ * {@link TopologyEventReceiver} on its own thread and stays alive until
+ * {@link #terminate()} is called or it is interrupted.
+ */
+public class StratosManagerTopologyEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(StratosManagerTopologyEventReceiver.class);
+
+    private final TopologyEventReceiver topologyEventReceiver;
+    // Set by terminate() and polled by run() on another thread, hence volatile
+    private volatile boolean terminated;
+
+    /** Creates the topology event receiver and registers the Stratos Manager listeners on it. */
+    public StratosManagerTopologyEventReceiver() {
+        this.terminated = false;
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        addEventListeners();
+    }
+
+    /**
+     * Registers the listeners that keep the topology cluster information model up to date.
+     */
+    private void addEventListeners() {
+        //add listener to Complete Topology Event
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // The model is populated from the complete topology only once
+                if (TopologyClusterInformationModel.getInstance().isInitialized()) {
+                    return;
+                }
+
+                log.info("[CompleteTopologyEventListener] Received: " + event.getClass());
+
+                // Acquire before the try block so that finally never releases a lock that is not held
+                TopologyManager.acquireReadLock();
+                try {
+                    for (Service service : TopologyManager.getTopology().getServices()) {
+                        // iterate through all clusters
+                        for (Cluster cluster : service.getClusters()) {
+                            TopologyClusterInformationModel.getInstance().addCluster(cluster);
+                        }
+                    }
+                    TopologyClusterInformationModel.getInstance().setInitialized(true);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        //Cluster Created event listener
+        topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[ClusterCreatedEventListener] Received: " + event.getClass());
+
+                ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+                addClusterToModel(clusterCreatedEvent.getServiceName(), clusterCreatedEvent.getClusterId());
+            }
+        });
+
+        //Cluster Removed event listener
+        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[ClusterRemovedEventListener] Received: " + event.getClass());
+
+                ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                TopologyClusterInformationModel.getInstance().removeCluster(clusterRemovedEvent.getClusterId());
+            }
+        });
+
+        //Instance Spawned event listener
+        topologyEventReceiver.addEventListener(new InstanceSpawnedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[InstanceSpawnedEventListener] Received: " + event.getClass());
+
+                InstanceSpawnedEvent instanceSpawnedEvent = (InstanceSpawnedEvent) event;
+                addClusterToModel(instanceSpawnedEvent.getServiceName(), instanceSpawnedEvent.getClusterId());
+            }
+        });
+
+        //Member Started event listener
+        topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[MemberStartedEventListener] Received: " + event.getClass());
+
+                MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event;
+                addClusterToModel(memberStartedEvent.getServiceName(), memberStartedEvent.getClusterId());
+            }
+        });
+
+        //Member Activated event listener
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[MemberActivatedEventListener] Received: " + event.getClass());
+
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+                addClusterToModel(memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId());
+            }
+        });
+
+        //Member Suspended event listener
+        topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[MemberSuspendedEventListener] Received: " + event.getClass());
+
+                MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
+                addClusterToModel(memberSuspendedEvent.getServiceName(), memberSuspendedEvent.getClusterId());
+            }
+        });
+
+        //Member Terminated event listener
+        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[MemberTerminatedEventListener] Received: " + event.getClass());
+
+                MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+                String memberId = memberTerminatedEvent.getMemberId();
+
+                TopologyManager.acquireReadLock();
+                // Tracks whether this thread currently holds the read lock, so that the
+                // outer finally block releases it only when it is actually held
+                boolean readLockHeld = true;
+                try {
+                    Cluster cluster = findCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
+                    if (cluster == null) {
+                        return;
+                    }
+
+                    // check and remove terminated member
+                    if (cluster.memberExists(memberId)) {
+                        // release the read lock and acquire the write lock
+                        TopologyManager.releaseReadLock();
+                        readLockHeld = false;
+                        TopologyManager.acquireWriteLock();
+                        try {
+                            // re-check the state; another thread might have acquired the write lock and modified
+                            if (cluster.memberExists(memberId)) {
+                                // remove the member from the cluster
+                                Member terminatedMember = cluster.getMember(memberId);
+                                cluster.removeMember(terminatedMember);
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Removed the terminated member with id " + memberId + " from the cluster");
+                                }
+                            }
+
+                            // downgrade to read lock - 1. acquire read lock, 2. release write lock
+                            TopologyManager.acquireReadLock();
+                            readLockHeld = true;
+                        } finally {
+                            // release the write lock
+                            TopologyManager.releaseWriteLock();
+                        }
+                    }
+                    TopologyClusterInformationModel.getInstance().addCluster(cluster);
+                } finally {
+                    if (readLockHeld) {
+                        TopologyManager.releaseReadLock();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Looks up a cluster in the current topology. The caller must hold the topology lock.
+     *
+     * @param serviceName name of the service the cluster belongs to
+     * @param clusterId   id of the cluster
+     * @return the cluster, or null (after logging a warning) if the service or the
+     *         cluster is not present in the topology
+     */
+    private static Cluster findCluster(String serviceName, String clusterId) {
+        Service service = TopologyManager.getTopology().getService(serviceName);
+        if (service == null) {
+            log.warn(String.format("Service not found in topology: [service] %s", serviceName));
+            return null;
+        }
+        Cluster cluster = service.getCluster(clusterId);
+        if (cluster == null) {
+            log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s", serviceName, clusterId));
+        }
+        return cluster;
+    }
+
+    /**
+     * Copies the given cluster from the topology into the topology cluster information
+     * model under the topology read lock. Does nothing (apart from logging) if the
+     * service or cluster is not present in the topology.
+     *
+     * @param serviceName name of the service the cluster belongs to
+     * @param clusterId   id of the cluster
+     */
+    private static void addClusterToModel(String serviceName, String clusterId) {
+        TopologyManager.acquireReadLock();
+        try {
+            Cluster cluster = findCluster(serviceName, clusterId);
+            if (cluster != null) {
+                TopologyClusterInformationModel.getInstance().addCluster(cluster);
+            }
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    /**
+     * Starts the topology event receiver on a new thread and blocks until terminated.
+     * An interrupt is handled like a call to {@link #terminate()}.
+     */
+    @Override
+    public void run() {
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        log.info("Stratos Manager topology receiver thread started");
+
+        //Keep running till terminate is set from deactivate method of the component
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // An interrupt is a shutdown request: restore the flag and stop the receiver
+                Thread.currentThread().interrupt();
+                terminate();
+            }
+        }
+        log.info("Stratos Manager topology receiver thread terminated");
+    }
+
+    /**
+     * Terminate Stratos Manager topology receiver thread.
+     * Signals the underlying topology event receiver to terminate and lets {@link #run()} return.
+     */
+    public void terminate() {
+        topologyEventReceiver.terminate();
+        terminated = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
deleted file mode 100644
index 46b3313..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.topology.receiver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
-import org.apache.stratos.messaging.listener.topology.InstanceSpawnedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberSuspendedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
-import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
-
-public class StratosManagerTopologyReceiver implements Runnable {
-
- private static final Log log = LogFactory.getLog(StratosManagerTopologyReceiver.class);
-
- private TopologyReceiver stratosManagerTopologyReceiver;
- private boolean terminate;
-
- public StratosManagerTopologyReceiver() {
- this.terminate = false;
- this.stratosManagerTopologyReceiver = new TopologyReceiver(createMessageDelegator());
- }
-
- private TopologyEventMessageDelegator createMessageDelegator() {
- TopologyMessageProcessorChain processorChain = createEventProcessorChain();
- return new TopologyEventMessageDelegator(processorChain);
- }
-
- private TopologyMessageProcessorChain createEventProcessorChain() {
-
- TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
-
- //add listner to Complete Topology Event
- processorChain.addEventListener(new CompleteTopologyEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- if (TopologyClusterInformationModel.getInstance().isInitialized()) {
- return;
- }
-
- log.info("[CompleteTopologyEventListener] Received: " + event.getClass());
-
- try {
- TopologyManager.acquireReadLock();
-
- for (Service service : TopologyManager.getTopology()
- .getServices()) {
- // iterate through all clusters
- for (Cluster cluster : service.getClusters()) {
- TopologyClusterInformationModel.getInstance()
- .addCluster(cluster);
- }
- }
-
- TopologyClusterInformationModel.getInstance().setInitialized(true);
-
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
- });
-
- //Cluster Created event listner
- processorChain.addEventListener(new ClusterCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- log.info("[ClusterCreatedEventListener] Received: " + event.getClass());
-
- ClusterCreatedEvent clustercreatedEvent = (ClusterCreatedEvent) event;
-
- String serviceType = clustercreatedEvent.getServiceName();
- //acquire read lock
- TopologyManager.acquireReadLock();
-
- try {
- Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId());
- TopologyClusterInformationModel.getInstance().addCluster(cluster);
-
- } finally {
- //release read lock
- TopologyManager.releaseReadLock();
- }
-
- }
- });
-
-
- //Cluster Removed event listner
- processorChain.addEventListener(new ClusterRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- log.info("[ClusterRemovedEventListener] Received: " + event.getClass());
-
- ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
- TopologyClusterInformationModel.getInstance().removeCluster(clusterRemovedEvent.getClusterId());
- }
- });
-
-
- //Instance Spawned event listner
- processorChain.addEventListener(new InstanceSpawnedEventListener() {
-
- @Override
- protected void onEvent(Event event) {
-
- log.info("[InstanceSpawnedEventListener] Received: " + event.getClass());
-
- InstanceSpawnedEvent instanceSpawnedEvent = (InstanceSpawnedEvent) event;
-
- String clusterDomain = instanceSpawnedEvent.getClusterId();
-
- String serviceType = instanceSpawnedEvent.getServiceName();
- //acquire read lock
- TopologyManager.acquireReadLock();
-
- try {
- Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cluster);
- } finally {
- //release read lock
- TopologyManager.releaseReadLock();
- }
- }
- });
-
- //Member Started event listner
- processorChain.addEventListener(new MemberStartedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- log.info("[MemberStartedEventListener] Received: " + event.getClass());
-
- MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event;
-
- String clusterDomain = memberStartedEvent.getClusterId();
-
- String serviceType = memberStartedEvent.getServiceName();
- //acquire read lock
- TopologyManager.acquireReadLock();
-
- try {
- Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cluster);
- } finally {
- //release read lock
- TopologyManager.releaseReadLock();
- }
-
- }
- });
-
- //Member Activated event listner
- processorChain.addEventListener(new MemberActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- log.info("[MemberActivatedEventListener] Received: " + event.getClass());
-
- MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
-
- String clusterDomain = memberActivatedEvent.getClusterId();
-
- String serviceType = memberActivatedEvent.getServiceName();
- //acquire read lock
- TopologyManager.acquireReadLock();
-
- try {
- Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cluster);
- } finally {
- //release read lock
- TopologyManager.releaseReadLock();
- }
- }
- });
-
- //Member Suspended event listner
- processorChain.addEventListener(new MemberSuspendedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- log.info("[MemberSuspendedEventListener] Received: " + event.getClass());
-
- MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
-
- String clusterDomain = memberSuspendedEvent.getClusterId();
-
- String serviceType = memberSuspendedEvent.getServiceName();
- //acquire read lock
- TopologyManager.acquireReadLock();
-
- try {
- Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cluster);
-
- } finally {
- //release read lock
- TopologyManager.releaseReadLock();
- }
- }
- });
-
- //Member Terminated event listner
- processorChain.addEventListener(new MemberTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- log.info("[MemberTerminatedEventListener] Received: " + event.getClass());
-
- MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
-
- String clusterDomain = memberTerminatedEvent.getClusterId();
-
- String serviceType = memberTerminatedEvent.getServiceName();
- //acquire read lock
- TopologyManager.acquireReadLock();
-
- try {
- Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
-
- // check and remove terminated member
- if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
- // release the read lock and acquire the write lock
- TopologyManager.releaseReadLock();
- TopologyManager.acquireWriteLock();
-
- try {
- // re-check the state; another thread might have acquired the write lock and modified
- if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
- // remove the member from the cluster
- Member terminatedMember = cluster.getMember(memberTerminatedEvent.getMemberId());
- cluster.removeMember(terminatedMember);
- if (log.isDebugEnabled()) {
- log.debug("Removed the terminated member with id " + memberTerminatedEvent.getMemberId() + " from the cluster");
- }
- }
-
- // downgrade to read lock - 1. acquire read lock, 2. release write lock
- // acquire read lock
- TopologyManager.acquireReadLock();
-
- } finally {
- // release the write lock
- TopologyManager.releaseWriteLock();
- }
- }
- TopologyClusterInformationModel.getInstance().addCluster(cluster);
- } finally {
- //release read lock
- TopologyManager.releaseReadLock();
- }
- }
- });
-
- return processorChain;
- }
-
-
- @Override
- public void run() {
-
- Thread thread = new Thread(stratosManagerTopologyReceiver);
- thread.start();
- log.info("Stratos Manager topology receiver thread started");
-
- //Keep running till terminate is set from deactivate method of the component
- while (!terminate) {
- //loop while terminate = false
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- log.info("Stratos Manager topology receiver thread terminated");
- }
-
- //terminate Topology Receiver
- public void terminate () {
- stratosManagerTopologyReceiver.terminate();
- terminate = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index 7786c39..17727ed 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -21,6 +21,7 @@ package org.apache.stratos.messaging.message.receiver.health.stat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
@@ -31,18 +32,21 @@ import javax.jms.TextMessage;
* Implements logic for processing health stat event messages based on a given
* topology process chain.
*/
-public class HealthStatEventMessageDelegator implements Runnable {
+class HealthStatEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(HealthStatEventMessageDelegator.class);
+
+ private HealthStatEventMessageQueue messageQueue;
private MessageProcessorChain processorChain;
private boolean terminated;
- public HealthStatEventMessageDelegator() {
+ public HealthStatEventMessageDelegator(HealthStatEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
this.processorChain = new HealthStatMessageProcessorChain();
}
- public HealthStatEventMessageDelegator(MessageProcessorChain processorChain) {
- this.processorChain = processorChain;
+ public void addEventListener(EventListener eventListener) {
+ processorChain.addEventListener(eventListener);
}
@Override
@@ -54,7 +58,7 @@ public class HealthStatEventMessageDelegator implements Runnable {
while (!terminated) {
try {
- TextMessage message = HealthStatEventMessageQueue.getInstance().take();
+ TextMessage message = messageQueue.take();
String messageText = message.getText();
if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
index 733521e..5e818bc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.health.stat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.message.receiver.tenant.TenantEventMessageQueue;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -36,6 +35,12 @@ public class HealthStatEventMessageListener implements MessageListener {
private static final Log log = LogFactory.getLog(HealthStatEventMessageListener.class);
+ private HealthStatEventMessageQueue messageQueue;
+
+ public HealthStatEventMessageListener(HealthStatEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
@@ -45,7 +50,7 @@ public class HealthStatEventMessageListener implements MessageListener {
log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
}
// Add received message to the queue
- HealthStatEventMessageQueue.getInstance().add(receivedMessage);
+ messageQueue.add(receivedMessage);
} catch (JMSException e) {
log.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
index 801667c..d9fea8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
@@ -26,20 +26,5 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* Implements a blocking queue for managing health stat event messages.
*/
-public class HealthStatEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
- private static volatile HealthStatEventMessageQueue instance;
-
- private HealthStatEventMessageQueue(){
- }
-
- public static HealthStatEventMessageQueue getInstance() {
- if (instance == null) {
- synchronized (HealthStatEventMessageQueue.class){
- if (instance == null) {
- instance = new HealthStatEventMessageQueue();
- }
- }
- }
- return instance;
- }
+class HealthStatEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
new file mode 100644
index 0000000..8b07180
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A runnable that receives health stat events from the message broker.
+ *
+ * Each instance creates its own {@link HealthStatEventMessageQueue} and shares
+ * it between a message listener, which is attached to a topic subscriber for
+ * {@code Constants.HEALTH_STAT_TOPIC}, and a message delegator, which is handed
+ * the event listeners registered through {@link #addEventListener(EventListener)}.
+ *
+ * {@link #run()} and {@link #terminate()} are expected to be called from
+ * different threads, so the state they share is declared volatile.
+ */
+public class HealthStatEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(HealthStatEventReceiver.class);
+
+    private final HealthStatEventMessageDelegator messageDelegator;
+    private final HealthStatEventMessageListener messageListener;
+    // Created in run() and read in terminate(); volatile so terminate() sees the assignment.
+    private volatile TopicSubscriber topicSubscriber;
+    // Written by terminate() and polled by run(); volatile so the polling thread sees the write.
+    private volatile boolean terminated;
+
+    /**
+     * Creates a receiver with a private message queue shared by its listener and delegator.
+     */
+    public HealthStatEventReceiver() {
+        HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
+        this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
+        this.messageListener = new HealthStatEventMessageListener(messageQueue);
+    }
+
+    /**
+     * Registers an event listener with this receiver's message delegator.
+     *
+     * @param eventListener the listener to add to the delegator's processor chain
+     */
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    /**
+     * Starts the topic subscriber and the message delegator on their own threads,
+     * then keeps this thread alive, waking once a second, until
+     * {@link #terminate()} is called.
+     */
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread; publish the field only once the listener is attached
+            TopicSubscriber subscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
+            subscriber.setMessageListener(messageListener);
+            topicSubscriber = subscriber;
+            Thread subscriberThread = new Thread(subscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Health stats event message receiver thread started");
+            }
+
+            // Start health stat event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Health stats event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignored) {
+                    // Interrupts are deliberately ignored; only terminate() ends this loop.
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Health stat event receiver failed", e);
+            }
+        }
+    }
+
+    /**
+     * Stops the topic subscriber (if run() has created one) and the message
+     * delegator, then raises the flag that ends the keep-alive loop in run().
+     */
+    public void terminate() {
+        // The subscriber only exists once run() has executed; guard against an early terminate().
+        TopicSubscriber subscriber = topicSubscriber;
+        if (subscriber != null) {
+            subscriber.terminate();
+        }
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
deleted file mode 100644
index 0b33abc..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.health.stat;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
-
-/**
- * A thread for receiving health stat information from message broker
- */
-public class HealthStatReceiver implements Runnable {
- private static final Log log = LogFactory.getLog(HealthStatReceiver.class);
- private HealthStatEventMessageDelegator messageDelegator;
- private TopicSubscriber topicSubscriber;
- private boolean terminated;
-
- public HealthStatReceiver() {
- this.messageDelegator = new HealthStatEventMessageDelegator();
- }
-
- public HealthStatReceiver(HealthStatEventMessageDelegator messageDelegator) {
- this.messageDelegator = messageDelegator;
- }
-
- @Override
- public void run() {
- try {
- // Start topic subscriber thread
- topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
- topicSubscriber.setMessageListener(new HealthStatEventMessageListener());
- Thread subscriberThread = new Thread(topicSubscriber);
- subscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Health stats event message receiver thread started");
- }
-
- // Start health stat event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Health stats event message delegator thread started");
- }
-
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Topology receiver failed", e);
- }
- }
- }
-
- public void terminate() {
- topicSubscriber.terminate();
- messageDelegator.terminate();
- terminated = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
index 3ad3015..f086b8c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -21,6 +21,7 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
import org.apache.stratos.messaging.util.Constants;
@@ -32,18 +33,20 @@ import javax.jms.TextMessage;
* Implements logic for processing instance notifier event messages based on a given
* topology process chain.
*/
-public class InstanceNotifierEventMessageDelegator implements Runnable {
+class InstanceNotifierEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageDelegator.class);
+ private InstanceNotifierEventMessageQueue messageQueue;
private MessageProcessorChain processorChain;
private boolean terminated;
- public InstanceNotifierEventMessageDelegator() {
+ public InstanceNotifierEventMessageDelegator(InstanceNotifierEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
this.processorChain = new InstanceNotifierMessageProcessorChain();
}
- public InstanceNotifierEventMessageDelegator(MessageProcessorChain processorChain) {
- this.processorChain = processorChain;
+ public void addEventListener(EventListener eventListener) {
+ processorChain.addEventListener(eventListener);
}
@Override
@@ -55,7 +58,7 @@ public class InstanceNotifierEventMessageDelegator implements Runnable {
while (!terminated) {
try {
- TextMessage message = InstanceNotifierEventMessageQueue.getInstance().take();
+ TextMessage message = messageQueue.take();
// Retrieve the header
String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
index 4e134ac..d8cc6b5 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
@@ -31,10 +31,16 @@ import javax.jms.TextMessage;
* Implements functionality for receiving text based event messages from the instance notifier
* message broker topic and add them to the event queue.
*/
-public class InstanceNotifierEventMessageListener implements MessageListener {
+class InstanceNotifierEventMessageListener implements MessageListener {
private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageListener.class);
+ private InstanceNotifierEventMessageQueue messageQueue;
+
+ public InstanceNotifierEventMessageListener(InstanceNotifierEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
@@ -44,7 +50,7 @@ public class InstanceNotifierEventMessageListener implements MessageListener {
log.debug(String.format("Instance notifier message received: %s", ((TextMessage) message).getText()));
}
// Add received message to the queue
- InstanceNotifierEventMessageQueue.getInstance().add(receivedMessage);
+ messageQueue.add(receivedMessage);
} catch (JMSException e) {
log.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
index 1a49fac..a27f586 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
@@ -25,20 +25,5 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* Implements a blocking queue for managing instance notifier event messages.
*/
-public class InstanceNotifierEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
- private static volatile InstanceNotifierEventMessageQueue instance;
-
- private InstanceNotifierEventMessageQueue(){
- }
-
- public static InstanceNotifierEventMessageQueue getInstance() {
- if (instance == null) {
- synchronized (InstanceNotifierEventMessageQueue.class){
- if (instance == null) {
- instance = new InstanceNotifierEventMessageQueue();
- }
- }
- }
- return instance;
- }
+class InstanceNotifierEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
deleted file mode 100644
index 88f11f3..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.instance.notifier;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
-
-/**
- * A thread for receiving instance notifier information from message broker.
- */
-public class InstanceNotifierEventMessageReceiver implements Runnable {
- private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageReceiver.class);
- private InstanceNotifierEventMessageDelegator messageDelegator;
- private TopicSubscriber topicSubscriber;
- private boolean terminated;
-
- public InstanceNotifierEventMessageReceiver() {
- this.messageDelegator = new InstanceNotifierEventMessageDelegator();
- }
-
- public InstanceNotifierEventMessageReceiver(InstanceNotifierEventMessageDelegator messageDelegator) {
- this.messageDelegator = messageDelegator;
- }
-
- @Override
- public void run() {
- try {
- // Start topic subscriber thread
- topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC);
- topicSubscriber.setMessageListener(new InstanceNotifierEventMessageListener());
- Thread subscriberThread = new Thread(topicSubscriber);
- subscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("InstanceNotifier event message receiver thread started");
- }
-
- // Start instance notifier event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
- if (log.isDebugEnabled()) {
- log.debug("InstanceNotifier event message delegator thread started");
- }
-
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("InstanceNotifier receiver failed", e);
- }
- }
- }
-
- public boolean isSubscribed() {
- return ((topicSubscriber != null) && (topicSubscriber.isSubscribed()));
- }
-
- public void terminate() {
- topicSubscriber.terminate();
- messageDelegator.terminate();
- terminated = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
new file mode 100644
index 0000000..57fea76
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.instance.notifier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * Receives instance notifier events from the message broker.
+ *
+ * <p>Each receiver creates its own message queue and hands it to both its message
+ * listener and its message delegator, so separate receiver instances do not share
+ * a queue. {@link #run()} starts a topic subscriber thread and a delegator thread,
+ * then keeps the calling thread alive until {@link #terminate()} is invoked.
+ */
+public class InstanceNotifierEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class);
+
+    private final InstanceNotifierEventMessageDelegator messageDelegator;
+    private final InstanceNotifierEventMessageListener messageListener;
+    // Assigned inside run() but read by isSubscribed() and terminate(), which may be
+    // invoked from a different thread than the one executing run(); hence volatile.
+    private volatile TopicSubscriber topicSubscriber;
+    // Written by terminate() and polled by the keep-alive loop in run(); volatile so
+    // that the loop is guaranteed to observe the update.
+    private volatile boolean terminated;
+
+    /**
+     * Creates a receiver with a fresh message queue that is passed to both the
+     * message delegator and the message listener of this instance.
+     */
+    public InstanceNotifierEventReceiver() {
+        InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
+        this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
+        this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
+    }
+
+    /**
+     * Registers an event listener with the underlying message delegator.
+     *
+     * @param eventListener the listener to register
+     */
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    /**
+     * Starts the topic subscriber and the message delegator on their own threads and
+     * then sleeps in one-second intervals until {@link #terminate()} is called. Any
+     * exception raised while starting up is logged and ends this method.
+     */
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC);
+            topicSubscriber.setMessageListener(messageListener);
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("InstanceNotifier event message receiver thread started");
+            }
+
+            // Start instance notifier event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("InstanceNotifier event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignored) {
+                    // Deliberately ignored: only terminate() ends this loop, and the
+                    // loop condition is re-checked right after waking up.
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("InstanceNotifier receiver failed", e);
+            }
+        }
+    }
+
+    /**
+     * Reports whether the topic subscriber exists and is currently subscribed.
+     *
+     * @return {@code true} if {@link #run()} has created the subscriber and the
+     *         subscriber reports itself as subscribed; {@code false} otherwise
+     */
+    public boolean isSubscribed() {
+        // Read the volatile field once so the null check and the call see the same value.
+        TopicSubscriber subscriber = topicSubscriber;
+        return ((subscriber != null) && (subscriber.isSubscribed()));
+    }
+
+    /**
+     * Stops the keep-alive loop and terminates the topic subscriber (if one has been
+     * created) and the message delegator. Safe to call before {@link #run()} has
+     * created the subscriber.
+     */
+    public void terminate() {
+        // Raise the flag first so the keep-alive loop stops even if a call below throws.
+        terminated = true;
+        TopicSubscriber subscriber = topicSubscriber;
+        if (subscriber != null) {
+            subscriber.terminate();
+        }
+        messageDelegator.terminate();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
index ef68da2..de05a34 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -21,6 +21,7 @@ package org.apache.stratos.messaging.message.receiver.tenant;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain;
import org.apache.stratos.messaging.util.Constants;
@@ -32,18 +33,21 @@ import javax.jms.TextMessage;
* Implements logic for processing tenant event messages based on a given
* tenant process chain.
*/
-public class TenantEventMessageDelegator implements Runnable {
+class TenantEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(TenantEventMessageDelegator.class);
+
+ private TenantEventMessageQueue messageQueue;
private MessageProcessorChain processorChain;
private boolean terminated;
- public TenantEventMessageDelegator() {
+ public TenantEventMessageDelegator(TenantEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
this.processorChain = new TenantMessageProcessorChain();
}
- public TenantEventMessageDelegator(MessageProcessorChain processorChain) {
- this.processorChain = processorChain;
+ public void addEventListener(EventListener eventListener) {
+ processorChain.addEventListener(eventListener);
}
@Override
@@ -55,7 +59,7 @@ public class TenantEventMessageDelegator implements Runnable {
while (!terminated) {
try {
- TextMessage message = TenantEventMessageQueue.getInstance().take();
+ TextMessage message = messageQueue.take();
// Retrieve the header
String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);