You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/18 19:15:09 UTC
[1/2] git commit: Introduced an event publisher pool to avoid large
number of connections being created to message broker
Repository: incubator-stratos
Updated Branches:
refs/heads/master 270e3d1e4 -> e50c61e11
Introduced an event publisher pool to avoid large number of connections being created to message broker
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f3f2d136
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f3f2d136
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f3f2d136
Branch: refs/heads/master
Commit: f3f2d136dcb26fe65568655b435fc27538348bc2
Parents: c59ac28
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Apr 18 22:43:21 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Apr 18 22:43:21 2014 +0530
----------------------------------------------------------------------
.../apache/stratos/cartridge/agent/Main.java | 20 +++++-
.../publisher/CartridgeAgentEventPublisher.java | 10 +--
.../internal/CloudControllerDSComponent.java | 35 ++++++----
.../runtime/FasterLookUpDataHolder.java | 20 ------
.../topology/TopologyEventPublisher.java | 3 +-
.../internal/ADCManagementServerComponent.java | 6 +-
.../stratos/manager/internal/DataHolder.java | 12 ----
.../InstanceNotificationPublisher.java | 7 +-
.../manager/publisher/TenantEventPublisher.java | 7 +-
.../publisher/TenantSynzhronizerTask.java | 3 +-
.../utils/CartridgeSubscriptionUtils.java | 5 +-
.../broker/publish/EventPublisher.java | 2 +-
.../broker/publish/EventPublisherPool.java | 68 ++++++++++++++++++++
.../broker/publish/TopicPublisher.java | 16 ++---
14 files changed, 143 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
index 3bf73e7..a1be237 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
@@ -19,13 +19,15 @@
package org.apache.stratos.cartridge.agent;
-import java.lang.reflect.Constructor;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.PropertyConfigurator;
import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.lang.reflect.Constructor;
/**
* Cartridge agent main class.
@@ -37,6 +39,20 @@ public class Main {
public static void main(String[] args) {
try {
+ // Add shutdown hook
+ final Thread mainThread = Thread.currentThread();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Constants.INSTANCE_STATUS_TOPIC);
+ mainThread.join();
+ } catch (Exception e) {
+ log.error(e);
+ }
+ }
+ });
+
// Configure log4j properties
if(log.isDebugEnabled()) {
log.debug("Configuring log4j.properties file path");
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
index 367f61d..9c2e21f 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
@@ -27,6 +27,7 @@ import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
import org.apache.stratos.cartridge.agent.statistics.publisher.HealthStatisticsNotifier;
import org.apache.stratos.cartridge.agent.util.ExtensionUtils;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
@@ -55,7 +56,7 @@ public class CartridgeAgentEventPublisher {
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
setStarted(true);
if (log.isInfoEnabled()) {
@@ -82,7 +83,8 @@ public class CartridgeAgentEventPublisher {
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ // Event publisher connection will
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
if (log.isInfoEnabled()) {
log.info("Instance activated event published");
@@ -118,7 +120,7 @@ public class CartridgeAgentEventPublisher {
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
setReadyToShutdown(true);
if (log.isInfoEnabled()) {
@@ -143,7 +145,7 @@ public class CartridgeAgentEventPublisher {
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
setMaintenance(true);
if (log.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index 9cb2869..77e3ac4 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -32,19 +32,16 @@ import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusE
import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.util.Constants;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
-import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.session.UserRegistry;
import org.wso2.carbon.utils.ConfigurationContextService;
-import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
-
-import java.util.List;
/**
* Registering Cloud Controller Service.
@@ -57,7 +54,24 @@ import java.util.List;
* interface=
* "org.wso2.carbon.registry.core.service.RegistryService"
* cardinality="1..1" policy="dynamic" bind="setRegistryService"
- * unbind="unsetRegistryService"
+ * unbind="unsetR/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */egistryService"
* @scr.reference name="config.context.service"
* interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic"
@@ -87,7 +101,7 @@ public class CloudControllerDSComponent {
Thread tdelegator = new Thread(delegator);
tdelegator.start();
- // Register cloud controller service
+ // Register cloud controller service E
BundleContext bundleContext = context.getBundleContext();
bundleContext.registerService(CloudControllerService.class.getName(), new CloudControllerServiceImpl(), null);
@@ -151,11 +165,8 @@ public class CloudControllerDSComponent {
}
protected void deactivate(ComponentContext ctx) {
-
- List<EventPublisher> publishers = dataHolder.getAllEventPublishers();
- for (EventPublisher topicPublisher : publishers) {
- topicPublisher.close();
- }
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Constants.TOPOLOGY_TOPIC);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
index 970e2c0..9b05b5d 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
@@ -22,12 +22,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.pojo.*;
import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -88,12 +86,6 @@ public class FasterLookUpDataHolder implements Serializable{
private transient DataPublisherConfig dataPubConfig;
private boolean enableTopologySync;
private transient TopologyConfig topologyConfig;
-
- /**
- * Key - name of the topic
- * Value - corresponding EventPublisher
- */
- private transient Map<String, EventPublisher> topicToPublisherMap = new HashMap<String, EventPublisher>();
private transient AsyncDataPublisher dataPublisher;
private String streamId;
@@ -244,18 +236,6 @@ public class FasterLookUpDataHolder implements Serializable{
public void setTopologyConfig(TopologyConfig topologyConfig) {
this.topologyConfig = topologyConfig;
}
-
- public EventPublisher getEventPublisher(String topic){
- return topicToPublisherMap.get(topic);
- }
-
- public List<EventPublisher> getAllEventPublishers() {
- return new ArrayList<EventPublisher>(topicToPublisherMap.values());
- }
-
- public void addEventPublisher(EventPublisher publisher, String topicName) {
- topicToPublisherMap.put(topicName, publisher);
- }
public DataPublisherConfig getDataPubConfig() {
return dataPubConfig;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index c039af7..86237d8 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -23,6 +23,7 @@ import org.apache.stratos.cloud.controller.pojo.Cartridge;
import org.apache.stratos.cloud.controller.pojo.ClusterContext;
import org.apache.stratos.cloud.controller.pojo.PortMapping;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Port;
import org.apache.stratos.messaging.domain.topology.ServiceType;
@@ -162,7 +163,7 @@ public class TopologyEventPublisher {
}
public static void publishEvent(Event event) {
- EventPublisher eventPublisher = new EventPublisher(Constants.TOPOLOGY_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TOPOLOGY_TOPIC);
eventPublisher.publish(event);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index c843fdb..6857bd6 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -26,7 +26,7 @@ import org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler;
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver;
import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
import org.apache.stratos.messaging.util.Constants;
import org.osgi.service.component.ComponentContext;
@@ -65,7 +65,6 @@ public class ADCManagementServerComponent {
protected void activate(ComponentContext componentContext) throws Exception {
try {
CartridgeConfigFileReader.readProperties();
- DataHolder.setEventPublisher(new EventPublisher(Constants.INSTANCE_NOTIFIER_TOPIC));
// Schedule complete tenant event synchronizer
if(log.isDebugEnabled()) {
@@ -172,6 +171,9 @@ public class ADCManagementServerComponent {
}
protected void deactivate(ComponentContext context) {
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC);
+ EventPublisherPool.close(Constants.TENANT_TOPIC);
//terminate Stratos Manager Topology Receiver
stratosManagerTopologyEventReceiver.terminate();
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
index 496a54c..07f21de 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
@@ -20,7 +20,6 @@
package org.apache.stratos.manager.internal;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.utils.CarbonUtils;
@@ -34,8 +33,6 @@ public class DataHolder {
private static RealmService realmService;
private static RegistryService registryService;
- //private static TopologyManagementService topologyMgtService;
- private static EventPublisher eventPublisher;
public static RealmService getRealmService() {
return realmService;
@@ -70,13 +67,4 @@ public class DataHolder {
public static void setRegistryService(RegistryService registryService) {
DataHolder.registryService = registryService;
}
-
- public static EventPublisher getEventPublisher() {
- return eventPublisher;
- }
-
- public static void setEventPublisher(EventPublisher eventPublisher) {
- DataHolder.eventPublisher = eventPublisher;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
index f8aea72..e10d4ff 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
@@ -20,13 +20,14 @@ package org.apache.stratos.manager.publisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.DataHolder;
import org.apache.stratos.manager.repository.Repository;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
-import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
+import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
+import org.apache.stratos.messaging.util.Constants;
/**
* Creating the relevant instance notification event and publish it to the instances.
@@ -38,7 +39,7 @@ public class InstanceNotificationPublisher {
}
private void publish(Event event) {
- EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher();
+ EventPublisher depsyncEventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC);
depsyncEventPublisher.publish(event);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
index 2df631a..8213ed9 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
@@ -25,6 +25,7 @@ import org.apache.stratos.common.beans.TenantInfoBean;
import org.apache.stratos.common.exception.StratosException;
import org.apache.stratos.common.listeners.TenantMgtListener;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent;
@@ -49,7 +50,7 @@ public class TenantEventPublisher implements TenantMgtListener {
}
Tenant tenant = new Tenant(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
TenantCreatedEvent event = new TenantCreatedEvent(tenant);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
}
catch (Exception e) {
@@ -64,7 +65,7 @@ public class TenantEventPublisher implements TenantMgtListener {
log.info(String.format("Publishing tenant updated event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain()));
}
TenantUpdatedEvent event = new TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
}
catch (Exception e) {
@@ -79,7 +80,7 @@ public class TenantEventPublisher implements TenantMgtListener {
log.info(String.format("Publishing tenant removed event: [tenant-id] %d", tenantId));
}
TenantRemovedEvent event = new TenantRemovedEvent(tenantId);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
index af5cd5f..3eac3f5 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
@@ -25,6 +25,7 @@ import org.apache.stratos.manager.internal.DataHolder;
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.manager.subscription.CartridgeSubscription;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
import org.apache.stratos.messaging.util.Constants;
@@ -81,7 +82,7 @@ public class TenantSynzhronizerTask implements Task {
tenants.add(tenant);
}
CompleteTenantEvent event = new CompleteTenantEvent(tenants);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
} catch (Exception e) {
if (log.isErrorEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
index cd50fd8..a5c5517 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
@@ -39,6 +39,7 @@ import org.apache.stratos.manager.repository.Repository;
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.manager.subscriber.Subscriber;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
import org.apache.stratos.messaging.util.Constants;
@@ -166,7 +167,7 @@ public class CartridgeSubscriptionUtils {
log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
}
TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(subscribedEvent);
} catch (Exception e) {
if (log.isErrorEnabled()) {
@@ -196,7 +197,7 @@ public class CartridgeSubscriptionUtils {
log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
}
TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
} catch (Exception e) {
if (log.isErrorEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 1e11142..5d39956 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -34,7 +34,7 @@ public class EventPublisher extends TopicPublisher {
/**
* @param topicName topic name of this publisher instance.
*/
- public EventPublisher(String topicName) {
+ EventPublisher(String topicName) {
super(topicName);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
new file mode 100644
index 0000000..175d09b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.broker.publish;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Event publisher instance pool.
+ */
+public class EventPublisherPool {
+ private static final Log log = LogFactory.getLog(EventPublisherPool.class);
+ private static Map<String, EventPublisher> topicNameEventPublisherMap = new HashMap<String, EventPublisher>();
+
+ public static EventPublisher getPublisher(String topicName) {
+ synchronized (EventPublisherPool.class) {
+ if(topicNameEventPublisherMap.containsKey(topicName)) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Event publisher fetched from pool: [topic] %s", topicName));
+ }
+ return topicNameEventPublisherMap.get(topicName);
+ }
+ EventPublisher eventPublisher = new EventPublisher(topicName);
+ topicNameEventPublisherMap.put(topicName, eventPublisher);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Event publisher instance created: [topic] %s", topicName));
+ }
+ return eventPublisher;
+ }
+ }
+
+ public static void close(String topicName) {
+ synchronized (EventPublisherPool.class) {
+ if(topicNameEventPublisherMap.containsKey(topicName)) {
+ topicNameEventPublisherMap.get(topicName).close();
+ topicNameEventPublisherMap.remove(topicName);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Event publisher closed and removed from pool: [topic] %s", topicName));
+ }
+ }
+ else {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Event publisher instance not found in pool: [topic] %s", topicName));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 6614e75..004be13 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -19,18 +19,18 @@
package org.apache.stratos.messaging.broker.publish;
-import java.util.Enumeration;
-import java.util.Properties;
-
-import javax.jms.*;
-
+import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.TopicConnector;
import org.apache.stratos.messaging.publish.MessagePublisher;
-import com.google.gson.Gson;
-import org.apache.stratos.messaging.util.Constants;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import java.util.Enumeration;
+import java.util.Properties;
/**
* Any instance who needs to publish data to a topic, should communicate with
@@ -53,7 +53,7 @@ public class TopicPublisher extends MessagePublisher {
* @param aTopicName
* topic name of this publisher instance.
*/
- public TopicPublisher(String aTopicName) {
+ TopicPublisher(String aTopicName) {
super(aTopicName);
connector = new TopicConnector();
}
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-stratos
Posted by im...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e50c61e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e50c61e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e50c61e1
Branch: refs/heads/master
Commit: e50c61e11a19b1c87c73b91e93367a7e85c70c68
Parents: f3f2d13 270e3d1
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Apr 18 22:44:38 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Apr 18 22:44:38 2014 +0530
----------------------------------------------------------------------
README.md | 2 +-
.../console/LICENSE | 191 -------------------
extensions/cep/stratos-cep-extension/pom.xml | 10 +-
products/stratos/modules/distribution/pom.xml | 2 +-
.../distribution/src/main/license/LICENSE | 4 +-
.../agent/templates/extensions/addons/_php.erb | 2 +-
tools/puppet3/modules/lb/LICENSE | 13 --
7 files changed, 13 insertions(+), 211 deletions(-)
----------------------------------------------------------------------