You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/18 19:15:09 UTC

[1/2] git commit: Introduced an event publisher pool to avoid large number of connections being created to message broker

Repository: incubator-stratos
Updated Branches:
  refs/heads/master 270e3d1e4 -> e50c61e11


Introduced an event publisher pool to avoid large number of connections being created to message broker


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f3f2d136
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f3f2d136
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f3f2d136

Branch: refs/heads/master
Commit: f3f2d136dcb26fe65568655b435fc27538348bc2
Parents: c59ac28
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Apr 18 22:43:21 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Apr 18 22:43:21 2014 +0530

----------------------------------------------------------------------
 .../apache/stratos/cartridge/agent/Main.java    | 20 +++++-
 .../publisher/CartridgeAgentEventPublisher.java | 10 +--
 .../internal/CloudControllerDSComponent.java    | 35 ++++++----
 .../runtime/FasterLookUpDataHolder.java         | 20 ------
 .../topology/TopologyEventPublisher.java        |  3 +-
 .../internal/ADCManagementServerComponent.java  |  6 +-
 .../stratos/manager/internal/DataHolder.java    | 12 ----
 .../InstanceNotificationPublisher.java          |  7 +-
 .../manager/publisher/TenantEventPublisher.java |  7 +-
 .../publisher/TenantSynzhronizerTask.java       |  3 +-
 .../utils/CartridgeSubscriptionUtils.java       |  5 +-
 .../broker/publish/EventPublisher.java          |  2 +-
 .../broker/publish/EventPublisherPool.java      | 68 ++++++++++++++++++++
 .../broker/publish/TopicPublisher.java          | 16 ++---
 14 files changed, 143 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
index 3bf73e7..a1be237 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
@@ -19,13 +19,15 @@
 
 package org.apache.stratos.cartridge.agent;
 
-import java.lang.reflect.Constructor;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
 import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.lang.reflect.Constructor;
 
 /**
  * Cartridge agent main class.
@@ -37,6 +39,20 @@ public class Main {
 
     public static void main(String[] args) {
         try {
+            // Add shutdown hook
+            final Thread mainThread = Thread.currentThread();
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    try {
+                        // Close event publisher connections to message broker
+                        EventPublisherPool.close(Constants.INSTANCE_STATUS_TOPIC);
+                        mainThread.join();
+                    } catch (Exception e) {
+                        log.error(e);
+                    }
+                }
+            });
+
             // Configure log4j properties
             if(log.isDebugEnabled()) {
                 log.debug("Configuring log4j.properties file path");

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
index 367f61d..9c2e21f 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
@@ -27,6 +27,7 @@ import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
 import org.apache.stratos.cartridge.agent.statistics.publisher.HealthStatisticsNotifier;
 import org.apache.stratos.cartridge.agent.util.ExtensionUtils;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
 import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
 import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
@@ -55,7 +56,7 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             setStarted(true);
             if (log.isInfoEnabled()) {
@@ -82,7 +83,8 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            // Event publisher connection will be closed by the shutdown hook in Main
+            EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             if (log.isInfoEnabled()) {
                 log.info("Instance activated event published");
@@ -118,7 +120,7 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             setReadyToShutdown(true);
             if (log.isInfoEnabled()) {
@@ -143,7 +145,7 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             setMaintenance(true);
             if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index 9cb2869..77e3ac4 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -32,19 +32,16 @@ import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusE
 import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.util.Constants;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.ComponentContext;
-import org.wso2.carbon.context.PrivilegedCarbonContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 import org.wso2.carbon.registry.core.service.RegistryService;
 import org.wso2.carbon.registry.core.session.UserRegistry;
 import org.wso2.carbon.utils.ConfigurationContextService;
-import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
-
-import java.util.List;
 
 /**
  * Registering Cloud Controller Service.
@@ -57,7 +54,7 @@ import java.util.List;
  *                interface=
  *                "org.wso2.carbon.registry.core.service.RegistryService"
  *                cardinality="1..1" policy="dynamic" bind="setRegistryService"
- *                unbind="unsetRegistryService"   
+ *                unbind="unsetRegistryService"
  * @scr.reference name="config.context.service"
  *                interface="org.wso2.carbon.utils.ConfigurationContextService"
  *                cardinality="1..1" policy="dynamic"
@@ -87,7 +101,7 @@ public class CloudControllerDSComponent {
             Thread tdelegator = new Thread(delegator);
             tdelegator.start();
         	
-        	// Register cloud controller service
+        	// Register cloud controller service
             BundleContext bundleContext = context.getBundleContext();
             bundleContext.registerService(CloudControllerService.class.getName(), new CloudControllerServiceImpl(), null);
 
@@ -151,11 +165,8 @@ public class CloudControllerDSComponent {
     }
 	
 	protected void deactivate(ComponentContext ctx) {
-
-		List<EventPublisher> publishers = dataHolder.getAllEventPublishers();
-		for (EventPublisher topicPublisher : publishers) {
-			topicPublisher.close();
-		}
+        // Close event publisher connections to message broker
+        EventPublisherPool.close(Constants.TOPOLOGY_TOPIC);
 	}
 	
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
index 970e2c0..9b05b5d 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
@@ -22,12 +22,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.pojo.*;
 import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -88,12 +86,6 @@ public class FasterLookUpDataHolder implements Serializable{
 	private transient DataPublisherConfig dataPubConfig;
 	private boolean enableTopologySync;
 	private transient TopologyConfig topologyConfig;
-	
-	/**
-     * Key - name of the topic
-     * Value - corresponding EventPublisher
-     */
-    private transient Map<String, EventPublisher> topicToPublisherMap = new HashMap<String, EventPublisher>();
 
 	private transient AsyncDataPublisher dataPublisher;
 	private String streamId;
@@ -244,18 +236,6 @@ public class FasterLookUpDataHolder implements Serializable{
 	public void setTopologyConfig(TopologyConfig topologyConfig) {
 		this.topologyConfig = topologyConfig;
 	}
-	
-	public EventPublisher getEventPublisher(String topic){
-    	return topicToPublisherMap.get(topic);
-    }
-	
-	public List<EventPublisher> getAllEventPublishers() {
-		return new ArrayList<EventPublisher>(topicToPublisherMap.values());
-	}
-	
-    public void addEventPublisher(EventPublisher publisher, String topicName) {
-        topicToPublisherMap.put(topicName, publisher);
-    }
 
     public DataPublisherConfig getDataPubConfig() {
         return dataPubConfig;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index c039af7..86237d8 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -23,6 +23,7 @@ import org.apache.stratos.cloud.controller.pojo.Cartridge;
 import org.apache.stratos.cloud.controller.pojo.ClusterContext;
 import org.apache.stratos.cloud.controller.pojo.PortMapping;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Port;
 import org.apache.stratos.messaging.domain.topology.ServiceType;
@@ -162,7 +163,7 @@ public class TopologyEventPublisher {
     }
 
     public static void publishEvent(Event event) {
-        EventPublisher eventPublisher = new EventPublisher(Constants.TOPOLOGY_TOPIC);
+        EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TOPOLOGY_TOPIC);
         eventPublisher.publish(event);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index c843fdb..6857bd6 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -26,7 +26,7 @@ import org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
 import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver;
 import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
 import org.apache.stratos.messaging.util.Constants;
 import org.osgi.service.component.ComponentContext;
@@ -65,7 +65,6 @@ public class ADCManagementServerComponent {
     protected void activate(ComponentContext componentContext) throws Exception {
 		try {
 			CartridgeConfigFileReader.readProperties();
-			DataHolder.setEventPublisher(new EventPublisher(Constants.INSTANCE_NOTIFIER_TOPIC));
 
             // Schedule complete tenant event synchronizer
             if(log.isDebugEnabled()) {
@@ -172,6 +171,9 @@ public class ADCManagementServerComponent {
     }
 
     protected void deactivate(ComponentContext context) {
+        // Close event publisher connections to message broker
+        EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC);
+        EventPublisherPool.close(Constants.TENANT_TOPIC);
 
         //terminate Stratos Manager Topology Receiver
         stratosManagerTopologyEventReceiver.terminate();

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
index 496a54c..07f21de 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
@@ -20,7 +20,6 @@
 package org.apache.stratos.manager.internal;
 
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.wso2.carbon.registry.core.service.RegistryService;
 import org.wso2.carbon.user.core.service.RealmService;
 import org.wso2.carbon.utils.CarbonUtils;
@@ -34,8 +33,6 @@ public class DataHolder {
 
 	private static RealmService realmService;
 	private static RegistryService registryService;
-	//private static TopologyManagementService topologyMgtService;
-	private static EventPublisher eventPublisher;
 
 	public static RealmService getRealmService() {
 		return realmService;
@@ -70,13 +67,4 @@ public class DataHolder {
 	public static void setRegistryService(RegistryService registryService) {
 		DataHolder.registryService = registryService;
 	}
-
-	public static EventPublisher getEventPublisher() {
-		return eventPublisher;
-	}
-
-	public static void setEventPublisher(EventPublisher eventPublisher) {
-		DataHolder.eventPublisher = eventPublisher;
-	}	
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
index f8aea72..e10d4ff 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
@@ -20,13 +20,14 @@ package org.apache.stratos.manager.publisher;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.DataHolder;
 import org.apache.stratos.manager.repository.Repository;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
-import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
 import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
+import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
+import org.apache.stratos.messaging.util.Constants;
 
 /**
  * Creating the relevant instance notification event and publish it to the instances.
@@ -38,7 +39,7 @@ public class InstanceNotificationPublisher {
     }
 
     private void publish(Event event) {
-        EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher();
+        EventPublisher depsyncEventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC);
         depsyncEventPublisher.publish(event);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
index 2df631a..8213ed9 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
@@ -25,6 +25,7 @@ import org.apache.stratos.common.beans.TenantInfoBean;
 import org.apache.stratos.common.exception.StratosException;
 import org.apache.stratos.common.listeners.TenantMgtListener;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent;
@@ -49,7 +50,7 @@ public class TenantEventPublisher implements TenantMgtListener {
                 }
                 Tenant tenant = new Tenant(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
                 TenantCreatedEvent event = new TenantCreatedEvent(tenant);
-                EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
                 eventPublisher.publish(event);
             }
             catch (Exception e) {
@@ -64,7 +65,7 @@ public class TenantEventPublisher implements TenantMgtListener {
                     log.info(String.format("Publishing tenant updated event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain()));
                 }
                 TenantUpdatedEvent event = new TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
-                EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
                 eventPublisher.publish(event);
             }
             catch (Exception e) {
@@ -79,7 +80,7 @@ public class TenantEventPublisher implements TenantMgtListener {
                     log.info(String.format("Publishing tenant removed event: [tenant-id] %d", tenantId));
                 }
                 TenantRemovedEvent event = new TenantRemovedEvent(tenantId);
-                EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
                 eventPublisher.publish(event);
             }
             catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
index af5cd5f..3eac3f5 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
@@ -25,6 +25,7 @@ import org.apache.stratos.manager.internal.DataHolder;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
 import org.apache.stratos.manager.subscription.CartridgeSubscription;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
 import org.apache.stratos.messaging.util.Constants;
@@ -81,7 +82,7 @@ public class TenantSynzhronizerTask implements Task {
                 tenants.add(tenant);
             }
             CompleteTenantEvent event = new CompleteTenantEvent(tenants);
-            EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+            EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
             eventPublisher.publish(event);
         } catch (Exception e) {
             if (log.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
index cd50fd8..a5c5517 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
@@ -39,6 +39,7 @@ import org.apache.stratos.manager.repository.Repository;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
 import org.apache.stratos.manager.subscriber.Subscriber;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
 import org.apache.stratos.messaging.util.Constants;
@@ -166,7 +167,7 @@ public class CartridgeSubscriptionUtils {
 					log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
 				}
 				TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName);
-				EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+				EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
 				eventPublisher.publish(subscribedEvent);
 			} catch (Exception e) {
 				if (log.isErrorEnabled()) {
@@ -196,7 +197,7 @@ public class CartridgeSubscriptionUtils {
                 log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
             }
             TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName);
-            EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+            EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
             eventPublisher.publish(event);
         } catch (Exception e) {
             if (log.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 1e11142..5d39956 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -34,7 +34,7 @@ public class EventPublisher extends TopicPublisher {
     /**
      * @param topicName topic name of this publisher instance.
      */
-    public EventPublisher(String topicName) {
+    EventPublisher(String topicName) {
         super(topicName);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
new file mode 100644
index 0000000..175d09b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.broker.publish;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Event publisher instance pool: keeps one {@link EventPublisher} per topic name in a static map; getPublisher() creates the entry on first request, close() closes and removes it.
+ */
+public class EventPublisherPool {
+    private static final Log log = LogFactory.getLog(EventPublisherPool.class);
+    private static Map<String, EventPublisher> topicNameEventPublisherMap = new HashMap<String, EventPublisher>(); // topic name -> pooled publisher; read and written only inside the synchronized blocks below
+
+    public static EventPublisher getPublisher(String topicName) { // returns the pooled publisher for topicName, creating and caching one if none exists yet
+        synchronized (EventPublisherPool.class) { // same class-level lock as close(), so lookup-then-create cannot interleave with a removal
+            if(topicNameEventPublisherMap.containsKey(topicName)) {
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Event publisher fetched from pool: [topic] %s", topicName));
+                }
+                return topicNameEventPublisherMap.get(topicName); // pool hit: hand back the shared instance
+            }
+            EventPublisher eventPublisher = new EventPublisher(topicName); // pool miss: first request for this topic
+            topicNameEventPublisherMap.put(topicName, eventPublisher); // cache it so later calls for the same topic reuse it
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Event publisher instance created: [topic] %s", topicName));
+            }
+            return eventPublisher;
+        }
+    }
+
+    public static void close(String topicName) { // closes the pooled publisher for topicName and drops it from the pool; only logs a warning when none is pooled
+        synchronized (EventPublisherPool.class) {
+            if(topicNameEventPublisherMap.containsKey(topicName)) {
+                topicNameEventPublisherMap.get(topicName).close(); // NOTE(review): if close() throws, the remove() below is skipped and the entry stays pooled - confirm intended
+                topicNameEventPublisherMap.remove(topicName); // a later getPublisher() for this topic will build a fresh instance
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Event publisher closed and removed from pool: [topic] %s", topicName));
+                }
+            }
+            else {
+                if(log.isWarnEnabled()) {
+                    log.warn(String.format("Event publisher instance not found in pool: [topic] %s", topicName));
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 6614e75..004be13 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -19,18 +19,18 @@
 
 package org.apache.stratos.messaging.broker.publish;
 
-import java.util.Enumeration;
-import java.util.Properties;
-
-import javax.jms.*;
-
+import com.google.gson.Gson;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.TopicConnector;
 import org.apache.stratos.messaging.publish.MessagePublisher;
 
-import com.google.gson.Gson;
-import org.apache.stratos.messaging.util.Constants;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import java.util.Enumeration;
+import java.util.Properties;
 
 /**
  * Any instance who needs to publish data to a topic, should communicate with
@@ -53,7 +53,7 @@ public class TopicPublisher extends MessagePublisher {
 	 * @param aTopicName
 	 *            topic name of this publisher instance.
 	 */
-	public TopicPublisher(String aTopicName) {
+	TopicPublisher(String aTopicName) {
 		super(aTopicName);
 		connector = new TopicConnector();
 	}


[2/2] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos

Posted by im...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e50c61e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e50c61e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e50c61e1

Branch: refs/heads/master
Commit: e50c61e11a19b1c87c73b91e93367a7e85c70c68
Parents: f3f2d13 270e3d1
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Apr 18 22:44:38 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Apr 18 22:44:38 2014 +0530

----------------------------------------------------------------------
 README.md                                       |   2 +-
 .../console/LICENSE                             | 191 -------------------
 extensions/cep/stratos-cep-extension/pom.xml    |  10 +-
 products/stratos/modules/distribution/pom.xml   |   2 +-
 .../distribution/src/main/license/LICENSE       |   4 +-
 .../agent/templates/extensions/addons/_php.erb  |   2 +-
 tools/puppet3/modules/lb/LICENSE                |  13 --
 7 files changed, 13 insertions(+), 211 deletions(-)
----------------------------------------------------------------------