You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 15:56:16 UTC
[13/14] stratos git commit: adding activator class for messaging and
calling terminate of event receivers in de-activation of the bundle
adding activator class for messaging and calling terminate of event receivers in de-activation of the bundle
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/905e140a
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/905e140a
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/905e140a
Branch: refs/heads/master
Commit: 905e140a109d2818df66ee349a878439c59d6885
Parents: 63f931f
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Thu Dec 17 23:17:57 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:06:15 2015 +0530
----------------------------------------------------------------------
...LoadBalancerCommonTopologyEventReceiver.java | 7 --
.../internal/LoadBalancerServiceComponent.java | 82 ++------------------
.../internal/MessagingServiceComponent.java | 66 ++++++++++++++++
.../status/ClusterStatusEventReceiver.java | 5 ++
.../mapping/DomainMappingEventReceiver.java | 5 ++
.../health/stat/HealthStatEventReceiver.java | 5 ++
6 files changed, 89 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 85142e3..93f391f 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -59,13 +59,6 @@ public class LoadBalancerCommonTopologyEventReceiver {
}
}
-// public void execute() {
-// super.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Load balancer topology receiver thread started");
-// }
-// }
-
public void initializeTopology() {
if (initialized) {
return;
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 3786af8..b235208 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -25,7 +25,6 @@ import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.services.DistributedObjectProvider;
-import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonApplicationSignUpEventReceiver;
import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
@@ -38,7 +37,6 @@ import org.apache.stratos.load.balancer.event.receivers.LoadBalancerDomainMappin
import org.apache.stratos.load.balancer.event.receivers.LoadBalancerTopologyEventReceiver;
import org.apache.stratos.load.balancer.exception.TenantAwareLoadBalanceEndpointException;
import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
-import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -63,7 +61,6 @@ import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.io.File;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
/**
* @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true"
@@ -90,7 +87,6 @@ public class LoadBalancerServiceComponent {
private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class);
private boolean activated = false;
- private ExecutorService executorService;
private LoadBalancerTopologyEventReceiver topologyEventReceiver;
private TenantEventReceiver tenantEventReceiver;
private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver;
@@ -124,11 +120,6 @@ public class LoadBalancerServiceComponent {
// Configure topology filters
TopologyFilterConfigurator.configure(configuration);
- int threadPoolSize = Integer.getInteger(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_SIZE_KEY,
- LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE);
- executorService = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID,
- threadPoolSize);
-
TopologyProvider topologyProvider = LoadBalancerConfiguration.getInstance().getTopologyProvider();
if (topologyProvider == null) {
topologyProvider = new TopologyProvider();
@@ -137,18 +128,18 @@ public class LoadBalancerServiceComponent {
if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) {
// Start tenant & application signup event receivers
- startTenantEventReceiver(executorService);
- startApplicationSignUpEventReceiver(executorService, topologyProvider);
+ startTenantEventReceiver();
+ startApplicationSignUpEventReceiver(topologyProvider);
}
if (configuration.isDomainMappingEnabled()) {
// Start domain mapping event receiver
- startDomainMappingEventReceiver(executorService, topologyProvider);
+ startDomainMappingEventReceiver(topologyProvider);
}
if (configuration.isTopologyEventListenerEnabled()) {
// Start topology receiver
- startTopologyEventReceiver(executorService, topologyProvider);
+ startTopologyEventReceiver(topologyProvider);
}
if (configuration.isCepStatsPublisherEnabled()) {
@@ -167,43 +158,28 @@ public class LoadBalancerServiceComponent {
}
}
- private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startDomainMappingEventReceiver(TopologyProvider topologyProvider) {
if (domainMappingEventReceiver != null) {
return;
}
domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider);
-// domainMappingEventReceiver.setExecutorService(executorService);
-// domainMappingEventReceiver.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Domain mapping event receiver thread started");
-// }
}
- private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
if (applicationSignUpEventReceiver != null) {
return;
}
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
-// applicationSignUpEventReceiver.setExecutorService(executorService);
-// applicationSignUpEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Application signup event receiver thread started");
- }
}
- private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startTopologyEventReceiver(TopologyProvider topologyProvider) {
if (topologyEventReceiver != null) {
return;
}
topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
-// topologyEventReceiver.setExecutorService(executorService);
-// topologyEventReceiver.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Topology receiver thread started");
-// }
if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
@@ -223,14 +199,8 @@ public class LoadBalancerServiceComponent {
}
}
- private void startTenantEventReceiver(ExecutorService executorService) {
-
+ private void startTenantEventReceiver() {
tenantEventReceiver = TenantEventReceiver.getInstance();
-// tenantEventReceiver.setExecutorService(executorService);
-// tenantEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Tenant event receiver thread started");
- }
}
private void startStatisticsNotifier(TopologyProvider topologyProvider) {
@@ -256,33 +226,6 @@ public class LoadBalancerServiceComponent {
log.warn("An error occurred while removing endpoint deployer", e);
}
- // Terminate topology receiver
-// if (topologyEventReceiver != null) {
-// try {
-// topologyEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating topology event receiver", e);
-// }
-// }
-
- // Terminate application signup event receiver
-// if (applicationSignUpEventReceiver != null) {
-// try {
-// applicationSignUpEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating application signup event receiver", e);
-// }
-// }
-
- // Terminate domain mapping event receiver
-// if (domainMappingEventReceiver != null) {
-// try {
-// domainMappingEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating domain mapping event receiver", e);
-// }
-// }
-
// Terminate statistics notifier
if (statisticsNotifier != null) {
try {
@@ -291,15 +234,6 @@ public class LoadBalancerServiceComponent {
log.warn("An error occurred while terminating health statistics notifier", e);
}
}
-
- // Shutdown executor service
- if (executorService != null) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down load balancer executor service", e);
- }
- }
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
new file mode 100644
index 0000000..c97125b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
+import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
+import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappingEventReceiver;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.osgi.service.component.ComponentContext;
+
+/**
+ * @scr.component name="org.apache.stratos.messaging.internal.MessagingServiceComponent"
+ * immediate="true"
+ */
+public class MessagingServiceComponent {
+
+ private static final Log log = LogFactory.getLog(MessagingServiceComponent.class);
+
+ protected void activate(ComponentContext context) {
+ try {
+ log.info("Messaging Service bundle activated");
+ } catch (Exception e) {
+ log.error("Could not activate Messaging Service component", e);
+ }
+ }
+
+ protected void deactivate(ComponentContext context) {
+ // deactivate all message receivers
+ try {
+ ApplicationSignUpEventReceiver.getInstance().terminate();
+ ApplicationsEventReceiver.getInstance().terminate();
+ ClusterStatusEventReceiver.getInstance().terminate();
+ DomainMappingEventReceiver.getInstance().terminate();
+ HealthStatEventReceiver.getInstance().terminate();
+ InitializerEventReceiver.getInstance().terminate();
+ TenantEventReceiver.getInstance().terminate();
+ TopologyEventReceiver.getInstance().terminate();
+ log.info("Messaging Service component is deactivated");
+ } catch (Exception e) {
+ log.error("Could not de-activate Messaging Service component", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index e191799..be42b43 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -87,6 +87,11 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
}
}
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
+
public boolean isSubscribed() {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 4e4c04b..6de99c0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -64,6 +64,11 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
return instance;
}
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index ba124a7..a9d2602 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -82,4 +82,9 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
}
}
}
+
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
}