You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 15:56:04 UTC
[01/14] stratos git commit: Making TopologyEventReceiver and
ApplicationSignUpEventReceiver singletons
Repository: stratos
Updated Branches:
refs/heads/master 418ed02e8 -> c90eb9a72
Making TopologyEventReceiver and ApplicationSignUpEventReceiver singletons
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/78db9f1e
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/78db9f1e
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/78db9f1e
Branch: refs/heads/master
Commit: 78db9f1ec301581d2deccd824bd4e4de43e9b21d
Parents: 418ed02
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 15:33:21 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:03:42 2015 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 18 +++----
.../stratos/cartridge/agent/CartridgeAgent.java | 54 ++++++++++----------
.../agent/CartridgeAgentEventListeners.java | 43 ++++++++--------
.../agent/test/JavaCartridgeAgentTest.java | 6 +--
...cerCommonApplicationSignUpEventReceiver.java | 11 ++--
...LoadBalancerCommonTopologyEventReceiver.java | 37 +++++++-------
.../extension/api/LoadBalancerExtension.java | 2 -
.../internal/LoadBalancerServiceComponent.java | 18 +++----
.../StratosManagerServiceComponent.java | 4 +-
.../StratosManagerTopologyEventReceiver.java | 18 +++----
.../message/receiver/StratosEventReceiver.java | 30 +++++++++++
.../signup/ApplicationSignUpEventReceiver.java | 37 ++++++++++----
.../topology/TopologyEventReceiver.java | 39 ++++++++++----
.../mock/iaas/services/impl/MockInstance.java | 6 +--
14 files changed, 193 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 8336f86..6fd64a7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -60,18 +60,18 @@ public class AutoscalerTopologyEventReceiver {
private ExecutorService executorService;
public AutoscalerTopologyEventReceiver() {
- this.topologyEventReceiver = new TopologyEventReceiver();
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- //FIXME this activated before autoscaler deployer activated.
- topologyEventReceiver.setExecutorService(getExecutorService());
- topologyEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Autoscaler topology receiver thread started");
- }
- }
+// public void execute() {
+// //FIXME this activated before autoscaler deployer activated.
+// // topologyEventReceiver.setExecutorService(getExecutorService());
+// //topologyEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Autoscaler topology receiver thread started");
+// }
+// }
private void addEventListeners() {
// Listen to topology events that affect clusters
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 91f596e..18e6e0a 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -103,10 +103,10 @@ public class CartridgeAgent implements Runnable {
} */
// Start application event receiver thread
- registerApplicationEventListeners();
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent registering all event listeners ... done");
- }
+ //registerApplicationEventListeners();
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent registering all event listeners ... done");
+// }
// Execute instance started shell script
extensionHandler.onInstanceStartedEvent();
@@ -197,29 +197,29 @@ public class CartridgeAgent implements Runnable {
}
}
- protected void registerTenantEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("registerTenantEventListeners before");
- }
-
- eventListenerns.startTenantEventReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("registerTenantEventListeners after");
- }
- }
-
- protected void registerApplicationEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("registerApplicationListeners before");
- }
-
- eventListenerns.startApplicationsEventReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("registerApplicationEventListeners after");
- }
- }
+// protected void registerTenantEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("registerTenantEventListeners before");
+// }
+//
+// eventListenerns.startTenantEventReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("registerTenantEventListeners after");
+// }
+// }
+
+// protected void registerApplicationEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("registerApplicationListeners before");
+// }
+//
+// eventListenerns.startApplicationsEventReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("registerApplicationEventListeners after");
+// }
+// }
protected void validateRequiredSystemProperties() {
String jndiPropertiesDir = System.getProperty(CartridgeAgentConstants.JNDI_PROPERTIES_DIR);
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index e6bb41b..103d2c7 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -72,11 +72,10 @@ public class CartridgeAgentEventListeners {
if (log.isDebugEnabled()) {
log.debug("Creating cartridge agent event listeners...");
}
- this.applicationsEventReceiver = new ApplicationSignUpEventReceiver();
- this.applicationsEventReceiver.setExecutorService(eventListenerExecutorService);
+ this.applicationsEventReceiver = ApplicationSignUpEventReceiver.getInstance();
- this.topologyEventReceiver = new TopologyEventReceiver();
- this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
+ //this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
@@ -151,24 +150,24 @@ public class CartridgeAgentEventListeners {
}
- public void startApplicationsEventReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent application event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- applicationsEventReceiver.execute();
- }
- });
-
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent application receiver thread started, waiting for event messages ...");
- }
-
- }
+// public void startApplicationsEventReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent application event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// applicationsEventReceiver.execute();
+// }
+// });
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent application receiver thread started, waiting for event messages ...");
+// }
+//
+// }
private void addInstanceNotifierEventListeners() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
index 26b8728..430e0b8 100644
--- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
+++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
@@ -107,9 +107,9 @@ public class JavaCartridgeAgentTest {
String agentHome = setupJavaAgent();
ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5);
- topologyEventReceiver = new TopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
+ //topologyEventReceiver.setExecutorService(executorService);
+ //topologyEventReceiver.execute();
instanceStatusEventReceiver = new InstanceStatusEventReceiver();
instanceStatusEventReceiver.setExecutorService(executorService);
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
index d5819dc..95c4867 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
@@ -41,19 +41,20 @@ import org.apache.stratos.messaging.message.receiver.application.signup.Applicat
* Load balancer common application signup event receiver updates the topology in the given topology provider
* with the hostnames found in application signup events.
*/
-public class LoadBalancerCommonApplicationSignUpEventReceiver extends ApplicationSignUpEventReceiver {
+public class LoadBalancerCommonApplicationSignUpEventReceiver {
private static final Log log = LogFactory.getLog(LoadBalancerCommonApplicationSignUpEventReceiver.class);
-
+ private ApplicationSignUpEventReceiver applicationSignUpEventReceiver;
private TopologyProvider topologyProvider;
public LoadBalancerCommonApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
+ this.applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance();
this.topologyProvider = topologyProvider;
addEventListeners();
}
private void addEventListeners() {
- addEventListener(new CompleteApplicationSignUpsEventListener() {
+ applicationSignUpEventReceiver.addEventListener(new CompleteApplicationSignUpsEventListener() {
private boolean initialized = false;
@Override
@@ -96,7 +97,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio
}
});
- addEventListener(new ApplicationSignUpAddedEventListener() {
+ applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpAddedEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -110,7 +111,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio
}
});
- addEventListener(new ApplicationSignUpRemovedEventListener() {
+ applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpRemovedEventListener() {
@Override
protected void onEvent(Event event) {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 4fb45a9..85142e3 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -38,15 +38,17 @@ import java.util.Properties;
* Load balancer common topology receiver updates the topology in the given topology provider
* according to topology events.
*/
-public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiver {
+public class LoadBalancerCommonTopologyEventReceiver {
private static final Log log = LogFactory.getLog(LoadBalancerCommonTopologyEventReceiver.class);
private TopologyProvider topologyProvider;
private boolean initialized;
+ private TopologyEventReceiver topologyEventReceiver;
public LoadBalancerCommonTopologyEventReceiver(TopologyProvider topologyProvider) {
this.topologyProvider = topologyProvider;
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
@@ -57,12 +59,12 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
}
- public void execute() {
- super.execute();
- if (log.isInfoEnabled()) {
- log.info("Load balancer topology receiver thread started");
- }
- }
+// public void execute() {
+// super.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Load balancer topology receiver thread started");
+// }
+// }
public void initializeTopology() {
if (initialized) {
@@ -115,7 +117,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
* Add default event listeners for updating the topology on topology events
*/
public void addEventListeners() {
- addEventListener(new CompleteTopologyEventListener() {
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
if (!initialized) {
@@ -124,7 +126,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberActivatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -142,11 +144,10 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
if (networkPartitionIdFilter != null && !networkPartitionIdFilter.equals("")) {
if (memberActivatedEvent.getNetworkPartitionId().equals(networkPartitionIdFilter)) {
addMember(serviceName, clusterId, memberId);
- }
- else{
+ } else {
log.debug(String.format("Member exists in a different network partition." +
- "[member id] %s [member network partition] %s [filter network partition] %s ",
- memberId,memberActivatedEvent.getNetworkPartitionId(),networkPartitionIdFilter));
+ "[member id] %s [member network partition] %s [filter network partition] %s ",
+ memberId, memberActivatedEvent.getNetworkPartitionId(), networkPartitionIdFilter));
}
} else {
addMember(serviceName, clusterId, memberId);
@@ -159,7 +160,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberMaintenanceListener() {
+ topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
@@ -181,7 +182,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberSuspendedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -203,7 +204,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberTerminatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -224,7 +225,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new ClusterRemovedEventListener() {
+ topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -253,7 +254,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new ServiceRemovedEventListener() {
+ topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index ae2b6dd..e7a2071 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -201,8 +201,6 @@ public class LoadBalancerExtension {
*/
private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
- applicationSignUpEventReceiver.setExecutorService(executorService);
- applicationSignUpEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application signup event receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index a7761cd..cb74984 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -186,8 +186,8 @@ public class LoadBalancerServiceComponent {
}
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
- applicationSignUpEventReceiver.setExecutorService(executorService);
- applicationSignUpEventReceiver.execute();
+// applicationSignUpEventReceiver.setExecutorService(executorService);
+// applicationSignUpEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application signup event receiver thread started");
}
@@ -266,13 +266,13 @@ public class LoadBalancerServiceComponent {
}
// Terminate application signup event receiver
- if (applicationSignUpEventReceiver != null) {
- try {
- applicationSignUpEventReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating application signup event receiver", e);
- }
- }
+// if (applicationSignUpEventReceiver != null) {
+// try {
+// applicationSignUpEventReceiver.terminate();
+// } catch (Exception e) {
+// log.warn("An error occurred while terminating application signup event receiver", e);
+// }
+// }
// Terminate domain mapping event receiver
if (domainMappingEventReceiver != null) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 47f401a..76d39a7 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -211,8 +211,8 @@ public class StratosManagerServiceComponent {
*/
private void initializeTopologyEventReceiver() {
topologyEventReceiver = new StratosManagerTopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
index 08ca3d6..51b21ac 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
@@ -23,19 +23,19 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
-public class StratosManagerTopologyEventReceiver extends TopologyEventReceiver {
+public class StratosManagerTopologyEventReceiver {
private static final Log log = LogFactory.getLog(StratosManagerTopologyEventReceiver.class);
public StratosManagerTopologyEventReceiver() {
}
- @Override
- public void execute() {
- super.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Stratos manager topology event receiver thread started");
- }
- }
+// @Override
+// public void execute() {
+// super.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Stratos manager topology event receiver thread started");
+// }
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
new file mode 100644
index 0000000..5ac89e6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver;
+
+import java.util.concurrent.ExecutorService;
+
+public class StratosEventReceiver {
+
+ protected ExecutorService executorService;
+
+ public StratosEventReceiver () {
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index 55e3fd1..dde214d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.application.signup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -33,26 +35,41 @@ import java.util.concurrent.ExecutorService;
/**
* Application signup event receiver.
*/
-public class ApplicationSignUpEventReceiver {
+public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(ApplicationSignUpEventReceiver.class);
private ApplicationSignUpEventMessageDelegator messageDelegator;
private ApplicationSignUpEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile ApplicationSignUpEventReceiver instance;
- public ApplicationSignUpEventReceiver() {
+ private ApplicationSignUpEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static ApplicationSignUpEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (ApplicationSignUpEventReceiver.class) {
+ if (instance == null) {
+ instance = new ApplicationSignUpEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(),
@@ -103,11 +120,11 @@ public class ApplicationSignUpEventReceiver {
messageDelegator.terminate();
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index b841d0a..50e078a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -34,26 +36,41 @@ import java.util.concurrent.ExecutorService;
* A thread for receiving topology information from message broker and
* build topology in topology manager.
*/
-public class TopologyEventReceiver {
+public class TopologyEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(TopologyEventReceiver.class);
private TopologyEventMessageDelegator messageDelegator;
private TopologyEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile TopologyEventReceiver instance;
- public TopologyEventReceiver() {
+ private TopologyEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
this.messageListener = new TopologyEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static TopologyEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (TopologyEventReceiver.class) {
+ if (instance == null) {
+ instance = new TopologyEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener);
@@ -101,11 +118,11 @@ public class TopologyEventReceiver {
});
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index 7b31861..c752f9e 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -108,7 +108,7 @@ public class MockInstance implements Serializable {
}
private void startTopologyEventReceiver() {
- topologyEventReceiver = new TopologyEventReceiver();
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
topologyEventReceiver.addEventListener(new MemberInitializedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -151,8 +151,8 @@ public class MockInstance implements Serializable {
}
}
});
- topologyEventReceiver.setExecutorService(eventListenerExecutorService);
- topologyEventReceiver.execute();
+// topologyEventReceiver.setExecutorService(eventListenerExecutorService);
+// topologyEventReceiver.execute();
if (log.isDebugEnabled()) {
log.debug(String.format(
"Mock instance topology event message receiver started for mock member [member-id] %s",
[05/14] stratos git commit: making HealthStatEventReceiver singleton
and fixing references in components
Posted by is...@apache.org.
making HealthStatEventReceiver singleton and fixing references in components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/6ac3b879
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/6ac3b879
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/6ac3b879
Branch: refs/heads/master
Commit: 6ac3b8797ee303593564973a9dd8ef54e3b33b17
Parents: a1ba54c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 18:09:04 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:04:28 2015 +0530
----------------------------------------------------------------------
.../AutoscalerHealthStatEventReceiver.java | 18 +++----
.../internal/AutoscalerServiceComponent.java | 4 +-
.../health/stat/HealthStatEventReceiver.java | 52 +++++++++++++-------
3 files changed, 45 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/6ac3b879/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index b8482f0..0b13500 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -45,18 +45,18 @@ public class AutoscalerHealthStatEventReceiver {
private ExecutorService executorService;
public AutoscalerHealthStatEventReceiver() {
- this.healthStatEventReceiver = new HealthStatEventReceiver();
+ this.healthStatEventReceiver = HealthStatEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- healthStatEventReceiver.setExecutorService(executorService);
- healthStatEventReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Autoscaler health stat event receiver thread started");
- }
- }
+// public void execute() {
+// healthStatEventReceiver.setExecutorService(executorService);
+// healthStatEventReceiver.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Autoscaler health stat event receiver thread started");
+// }
+// }
private void addEventListeners() {
// Listen to health stat events that affect clusters
http://git-wip-us.apache.org/repos/asf/stratos/blob/6ac3b879/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index ba7c341..bb28577 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -181,8 +181,8 @@ public class AutoscalerServiceComponent {
// Start health stat receiver
autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
- autoscalerHealthStatEventReceiver.setExecutorService(executorService);
- autoscalerHealthStatEventReceiver.execute();
+// autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+// autoscalerHealthStatEventReceiver.execute();
if (log.isDebugEnabled()) {
log.debug("Health statistics receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/6ac3b879/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index e73a326..ede8f17 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -21,8 +21,10 @@ package org.apache.stratos.messaging.message.receiver.health.stat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -30,19 +32,33 @@ import java.util.concurrent.ExecutorService;
/**
* A thread for receiving health stat information from message broker
*/
-public class HealthStatEventReceiver {
+public class HealthStatEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(HealthStatEventReceiver.class);
private final HealthStatEventMessageDelegator messageDelegator;
private final HealthStatEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private boolean terminated;
- private ExecutorService executorService;
+ private static volatile HealthStatEventReceiver instance;
- public HealthStatEventReceiver() {
+ private HealthStatEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
this.messageListener = new HealthStatEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static HealthStatEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (HealthStatEventReceiver.class) {
+ if (instance == null) {
+ instance = new HealthStatEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
@@ -50,7 +66,7 @@ public class HealthStatEventReceiver {
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName(), messageListener);
@@ -69,17 +85,17 @@ public class HealthStatEventReceiver {
}
}
- public void terminate() {
- eventSubscriber.terminate();
- messageDelegator.terminate();
- terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void terminate() {
+// eventSubscriber.terminate();
+// messageDelegator.terminate();
+// terminated = true;
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
[02/14] stratos git commit: making TopologyEventReceiver a singleton
and fixing references
Posted by is...@apache.org.
making TopologyEventReceiver a singleton and fixing references
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b3dc5462
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b3dc5462
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b3dc5462
Branch: refs/heads/master
Commit: b3dc54628ea70b64835945e3061021c5f8c41de4
Parents: 78db9f1
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 17:28:00 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:03:55 2015 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 24 +++----
.../internal/AutoscalerServiceComponent.java | 18 ++---
.../stratos/cartridge/agent/CartridgeAgent.java | 28 ++++----
.../agent/CartridgeAgentEventListeners.java | 72 ++++++++++----------
.../extension/api/LoadBalancerExtension.java | 33 ++++-----
.../internal/LoadBalancerServiceComponent.java | 24 +++----
.../service/MetadataTopologyEventReceiver.java | 35 +++++-----
.../service/registry/MetadataApiRegistry.java | 8 +--
.../cep/extension/CEPTopologyEventReceiver.java | 20 +++---
.../extension/FaultHandlingWindowProcessor.java | 10 +--
.../cep/extension/CEPTopologyEventReceiver.java | 20 +++---
.../extension/FaultHandlingWindowProcessor.java | 10 +--
.../tests/PythonAgentIntegrationTest.java | 6 +-
.../integration/common/TopologyHandler.java | 12 ++--
14 files changed, 163 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 6fd64a7..daa70ae 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -510,16 +510,16 @@ public class AutoscalerTopologyEventReceiver {
/**
* Terminate load balancer topology receiver thread.
*/
- public void terminate() {
- topologyEventReceiver.terminate();
- terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void terminate() {
+// topologyEventReceiver.terminate();
+// terminated = true;
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 76844a0..4d4c54f 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -173,8 +173,8 @@ public class AutoscalerServiceComponent {
// Start topology receiver
asTopologyReceiver = new AutoscalerTopologyEventReceiver();
- asTopologyReceiver.setExecutorService(executorService);
- asTopologyReceiver.execute();
+// asTopologyReceiver.setExecutorService(executorService);
+ //asTopologyReceiver.execute();
if (log.isDebugEnabled()) {
log.debug("Topology receiver executor service started");
}
@@ -245,13 +245,13 @@ public class AutoscalerServiceComponent {
}
protected void deactivate(ComponentContext context) {
- if (asTopologyReceiver != null) {
- try {
- asTopologyReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating autoscaler topology event receiver", e);
- }
- }
+// if (asTopologyReceiver != null) {
+// try {
+// asTopologyReceiver.terminate();
+// } catch (Exception e) {
+// log.warn("An error occurred while terminating autoscaler topology event receiver", e);
+// }
+// }
if (autoscalerHealthStatEventReceiver != null) {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 18e6e0a..b0bf326 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -60,10 +60,10 @@ public class CartridgeAgent implements Runnable {
}
// Start topology event receiver thread
- registerTopologyEventListeners();
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent registerTopologyEventListeners done");
- }
+// registerTopologyEventListeners();
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent registerTopologyEventListeners done");
+// }
if (log.isInfoEnabled()) {
log.info("Waiting for CompleteTopologyEvent..");
@@ -186,16 +186,16 @@ public class CartridgeAgent implements Runnable {
}
}
- protected void registerTopologyEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("registerTopologyEventListeners before");
- }
- eventListenerns.startTopologyEventReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("registerTopologyEventListeners after");
- }
- }
+// protected void registerTopologyEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("registerTopologyEventListeners before");
+// }
+// eventListenerns.startTopologyEventReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("registerTopologyEventListeners after");
+// }
+// }
// protected void registerTenantEventListeners() {
// if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index 103d2c7..1d64ff0 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -94,24 +94,24 @@ public class CartridgeAgentEventListeners {
}
}
- public void startTopologyEventReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent topology event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- topologyEventReceiver.execute();
- }
- });
-
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
- }
-
- }
+// public void startTopologyEventReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent topology event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// topologyEventReceiver.execute();
+// }
+// });
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
+// }
+//
+// }
public void startInstanceNotifierReceiver() {
@@ -131,24 +131,24 @@ public class CartridgeAgentEventListeners {
}
}
- public void startTenantEventReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent tenant event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- topologyEventReceiver.execute();
- }
- });
-
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
- }
-
- }
+// public void startTenantEventReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent tenant event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// topologyEventReceiver.execute();
+// }
+// });
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
+// }
+//
+// }
// public void startApplicationsEventReceiver() {
//
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e7a2071..d2a8cb3 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -39,6 +39,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import java.util.concurrent.ExecutorService;
@@ -123,8 +124,8 @@ public class LoadBalancerExtension {
addTopologyEventListeners(topologyEventReceiver);
// Add default topology provider event listeners
topologyEventReceiver.addEventListeners();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Topology receiver thread started");
}
@@ -212,12 +213,12 @@ public class LoadBalancerExtension {
* @param topologyEventReceiver topology event receiver instance
*/
private void addTopologyEventListeners(final LoadBalancerCommonTopologyEventReceiver topologyEventReceiver) {
- topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
try {
- if (!loadBalancerStarted) {
+ if (!loadBalancerStarted) {
configureAndStart();
}
} catch (Exception e) {
@@ -228,37 +229,37 @@ public class LoadBalancerExtension {
}
}
});
- topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberSuspendedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
@@ -338,12 +339,12 @@ public class LoadBalancerExtension {
* Stop load balancer instance.
*/
public void stop() {
- try {
- if (topologyEventReceiver != null) {
- topologyEventReceiver.terminate();
- }
- } catch (Exception ignore) {
- }
+// try {
+// if (topologyEventReceiver != null) {
+// topologyEventReceiver.terminate();
+// }
+// } catch (Exception ignore) {
+// }
try {
if (statisticsNotifier != null) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index cb74984..442686a 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -199,11 +199,11 @@ public class LoadBalancerServiceComponent {
}
topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Topology receiver thread started");
- }
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Topology receiver thread started");
+// }
if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
@@ -257,13 +257,13 @@ public class LoadBalancerServiceComponent {
}
// Terminate topology receiver
- if (topologyEventReceiver != null) {
- try {
- topologyEventReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating topology event receiver", e);
- }
- }
+// if (topologyEventReceiver != null) {
+// try {
+// topologyEventReceiver.terminate();
+// } catch (Exception e) {
+// log.warn("An error occurred while terminating topology event receiver", e);
+// }
+// }
// Terminate application signup event receiver
// if (applicationSignUpEventReceiver != null) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
index e516271..f16282d 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
@@ -41,8 +41,9 @@ public class MetadataTopologyEventReceiver {
private ExecutorService executorService;
public MetadataTopologyEventReceiver() {
- this.topologyEventReceiver = new TopologyEventReceiver();
- executorService = StratosThreadPool.getExecutorService(Constants.METADATA_SERVICE_THREAD_POOL_ID, 20);
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
+// //executorService = StratosThreadPool.getExecutorService(Constants
+// .METADATA_SERVICE_THREAD_POOL_ID, 20);
addEventListeners();
}
@@ -67,21 +68,21 @@ public class MetadataTopologyEventReceiver {
});
}
- public void execute() {
- topologyEventReceiver.setExecutorService(getExecutorService());
- topologyEventReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Metadata service topology receiver started.");
- }
- }
-
- public void terminate() {
- topologyEventReceiver.terminate();
- if (log.isInfoEnabled()) {
- log.info("Metadata service topology receiver stopped.");
- }
- }
+// public void execute() {
+// topologyEventReceiver.setExecutorService(getExecutorService());
+// topologyEventReceiver.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Metadata service topology receiver started.");
+// }
+// }
+//
+// public void terminate() {
+// topologyEventReceiver.terminate();
+// if (log.isInfoEnabled()) {
+// log.info("Metadata service topology receiver stopped.");
+// }
+// }
public ExecutorService getExecutorService() {
return executorService;
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
index 75ddbc7..47fc600 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
@@ -56,7 +56,7 @@ public class MetadataApiRegistry implements DataStore {
public MetadataApiRegistry() {
metadataTopologyEventReceiver = new MetadataTopologyEventReceiver();
- metadataTopologyEventReceiver.execute();
+// metadataTopologyEventReceiver.execute();
metadataApplicationEventReceiver = new MetadataApplicationEventReceiver();
metadataApplicationEventReceiver.execute();
@@ -417,9 +417,9 @@ public class MetadataApiRegistry implements DataStore {
return applicationIdToReadWriteLockMap;
}
- public void stopTopologyReceiver() {
- metadataTopologyEventReceiver.terminate();
- }
+// public void stopTopologyReceiver() {
+// metadataTopologyEventReceiver.terminate();
+// }
public void stopApplicationReceiver() {
metadataApplicationEventReceiver.terminate();
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 59c70c5..2696271 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
/**
* CEP Topology Receiver for Fault Handling Window Processor.
*/
-public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+public class CEPTopologyEventReceiver {
private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
private FaultHandlingWindowProcessor faultHandler;
+ private TopologyEventReceiver topologyEventReceiver;
public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
this.faultHandler = faultHandler;
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
- @Override
- public void execute() {
- super.execute();
- log.info("CEP topology event receiver thread started");
- }
+// @Override
+// public void execute() {
+// super.execute();
+// log.info("CEP topology event receiver thread started");
+// }
private void addEventListeners() {
// Load member time stamp map from the topology as a one time task
- addEventListener(new CompleteTopologyEventListener() {
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
private boolean initialized;
@Override
@@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Remove member from the time stamp map when MemberTerminated event is received.
- addEventListener(new MemberTerminatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
@@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Add member to time stamp map whenever member is activated
- addEventListener(new MemberActivatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 8d16b33..7aec0d5 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -286,10 +286,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
MemberFaultEventMap
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
- executorService = StratosThreadPool
- .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
- cepTopologyEventReceiver.setExecutorService(executorService);
- cepTopologyEventReceiver.execute();
+// executorService = StratosThreadPool
+// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+// cepTopologyEventReceiver.setExecutorService(executorService);
+// cepTopologyEventReceiver.execute();
//Ordinary scheduling
window.schedule();
@@ -329,7 +329,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
public void destroy() {
// terminate topology listener thread
- cepTopologyEventReceiver.terminate();
+// cepTopologyEventReceiver.terminate();
window = null;
// Shutdown executor service
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 59c70c5..2696271 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
/**
* CEP Topology Receiver for Fault Handling Window Processor.
*/
-public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+public class CEPTopologyEventReceiver {
private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
private FaultHandlingWindowProcessor faultHandler;
+ private TopologyEventReceiver topologyEventReceiver;
public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
this.faultHandler = faultHandler;
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
- @Override
- public void execute() {
- super.execute();
- log.info("CEP topology event receiver thread started");
- }
+// @Override
+// public void execute() {
+// super.execute();
+// log.info("CEP topology event receiver thread started");
+// }
private void addEventListeners() {
// Load member time stamp map from the topology as a one time task
- addEventListener(new CompleteTopologyEventListener() {
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
private boolean initialized;
@Override
@@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Remove member from the time stamp map when MemberTerminated event is received.
- addEventListener(new MemberTerminatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
@@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Add member to time stamp map whenever member is activated
- addEventListener(new MemberActivatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index eb07dd9..2abfda1 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -279,10 +279,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
MemberFaultEventMap
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
- executorService = StratosThreadPool
- .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
- cepTopologyEventReceiver.setExecutorService(executorService);
- cepTopologyEventReceiver.execute();
+// executorService = StratosThreadPool
+// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+// cepTopologyEventReceiver.setExecutorService(executorService);
+// cepTopologyEventReceiver.execute();
//Ordinary scheduling
window.schedule();
@@ -322,7 +322,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
public void destroy() {
// terminate topology listener thread
- cepTopologyEventReceiver.terminate();
+// cepTopologyEventReceiver.terminate();
window = null;
// Shutdown executor service
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index d8cbc9f..c5751bd 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -125,9 +125,9 @@ public abstract class PythonAgentIntegrationTest {
}
ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize);
- topologyEventReceiver = new TopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
instanceStatusEventReceiver = new InstanceStatusEventReceiver();
instanceStatusEventReceiver.setExecutorService(executorService);
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
index 3af9866..e506ef7 100644
--- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
+++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
@@ -108,9 +108,9 @@ public class TopologyHandler {
}
private void initializeApplicationSignUpEventReceiver() {
- applicationSignUpEventReceiver = new ApplicationSignUpEventReceiver();
- applicationSignUpEventReceiver.setExecutorService(executorService);
- applicationSignUpEventReceiver.execute();
+ applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance();
+// applicationSignUpEventReceiver.setExecutorService(executorService);
+// applicationSignUpEventReceiver.execute();
}
private void initializeTenantEventReceiver() {
@@ -171,8 +171,8 @@ public class TopologyHandler {
* Initialize Topology event receiver
*/
private void initializeTopologyEventReceiver() {
- topologyEventReceiver = new TopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
+// topologyEventReceiver.setExecutorService(executorService);
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -206,7 +206,7 @@ public class TopologyHandler {
clusterInstanceInactivateEvent.getClusterId()));
}
});
- topologyEventReceiver.execute();
+ //topologyEventReceiver.execute();
}
/**
[09/14] stratos git commit: fixing conflicts in StratosThreadPool
Posted by is...@apache.org.
fixing conflicts in StratosThreadPool
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/412cb2c2
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/412cb2c2
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/412cb2c2
Branch: refs/heads/master
Commit: 412cb2c245a5ad3c1b6f189517c0c4f7d9879050
Parents: f302906
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Fri Dec 4 05:22:40 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:05:16 2015 +0530
----------------------------------------------------------------------
.../stratos/cartridge/agent/CartridgeAgent.java | 30 ++---
.../agent/CartridgeAgentEventListeners.java | 46 ++++----
.../agent/test/JavaCartridgeAgentTest.java | 10 +-
.../CloudControllerServiceComponent.java | 4 +-
.../status/InstanceStatusTopicReceiver.java | 24 ++--
.../common/threading/StratosThreadPool.java | 4 +-
.../StratosManagerServiceComponent.java | 4 +-
...ratosManagerInstanceStatusEventReceiver.java | 22 ++--
.../notifier/InstanceNotifierEventReceiver.java | 109 +++++++++++++------
.../status/InstanceStatusEventReceiver.java | 44 +++++---
.../mock/iaas/services/impl/MockInstance.java | 32 +++---
.../tests/PythonAgentIntegrationTest.java | 6 +-
.../integration/tests/adc/GitHookTestCase.java | 22 ++--
13 files changed, 208 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index b0bf326..c498caa 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -90,10 +90,10 @@ public class CartridgeAgent implements Runnable {
}
// Start instance notifier listener thread
- registerInstanceNotifierEventListeners();
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent registerInstanceNotifierEventListeners done");
- }
+// registerInstanceNotifierEventListeners();
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent registerInstanceNotifierEventListeners done");
+// }
// Start tenant event receiver thread
/*
@@ -174,17 +174,17 @@ public class CartridgeAgent implements Runnable {
logPublisherManager.stop();
}
- protected void registerInstanceNotifierEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("SsubscribeToTopicsAndRegisterListeners before");
- }
-
- eventListenerns.startInstanceNotifierReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("SsubscribeToTopicsAndRegisterListeners after");
- }
- }
+// protected void registerInstanceNotifierEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("SsubscribeToTopicsAndRegisterListeners before");
+// }
+//
+// eventListenerns.startInstanceNotifierReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("SsubscribeToTopicsAndRegisterListeners after");
+// }
+// }
// protected void registerTopologyEventListeners() {
// if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index ffa3750..5954b76 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -65,8 +65,8 @@ public class CartridgeAgentEventListeners {
private ApplicationSignUpEventReceiver applicationsEventReceiver;
private ExtensionHandler extensionHandler;
- private static final ExecutorService eventListenerExecutorService =
- StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10);
+// private static final ExecutorService eventListenerExecutorService =
+// StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10);
public CartridgeAgentEventListeners() {
if (log.isDebugEnabled()) {
@@ -77,7 +77,7 @@ public class CartridgeAgentEventListeners {
this.topologyEventReceiver = TopologyEventReceiver.getInstance();
//this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
- this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
+ this.instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
this.tenantEventReceiver = TenantEventReceiver.getInstance();
// this.tenantEventReceiver.setExecutorService(eventListenerExecutorService);
@@ -113,23 +113,23 @@ public class CartridgeAgentEventListeners {
//
// }
- public void startInstanceNotifierReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent instance notifier event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- instanceNotifierEventReceiver.execute();
- }
- });
-
- if (log.isDebugEnabled()) {
- log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ...");
- }
- }
+// public void startInstanceNotifierReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent instance notifier event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// instanceNotifierEventReceiver.execute();
+// }
+// });
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ...");
+// }
+// }
// public void startTenantEventReceiver() {
//
@@ -521,9 +521,9 @@ public class CartridgeAgentEventListeners {
* Terminate load balancer topology receiver thread.
*/
- public void terminate() {
- topologyEventReceiver.terminate();
- }
+// public void terminate() {
+// topologyEventReceiver.terminate();
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
index 430e0b8..18ed2ab 100644
--- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
+++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
@@ -111,9 +111,9 @@ public class JavaCartridgeAgentTest {
//topologyEventReceiver.setExecutorService(executorService);
//topologyEventReceiver.execute();
- instanceStatusEventReceiver = new InstanceStatusEventReceiver();
- instanceStatusEventReceiver.setExecutorService(executorService);
- instanceStatusEventReceiver.execute();
+ instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
+// instanceStatusEventReceiver.setExecutorService(executorService);
+// instanceStatusEventReceiver.execute();
instanceStarted = false;
instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
@@ -176,8 +176,8 @@ public class JavaCartridgeAgentTest {
} catch (Exception ignore) {
}
- this.instanceStatusEventReceiver.terminate();
- this.topologyEventReceiver.terminate();
+ //this.instanceStatusEventReceiver.terminate();
+ // this.topologyEventReceiver.terminate();
this.instanceActivated = false;
this.instanceStarted = false;
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index c4c0336..267d5a8 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -162,8 +162,8 @@ public class CloudControllerServiceComponent {
}
instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
- instanceStatusTopicReceiver.setExecutorService(executorService);
- instanceStatusTopicReceiver.execute();
+// instanceStatusTopicReceiver.setExecutorService(executorService);
+// instanceStatusTopicReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Instance status event receiver thread started");
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index 1f012b3..00ab8b7 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -45,21 +45,21 @@ public class InstanceStatusTopicReceiver {
private ExecutorService executorService;
public InstanceStatusTopicReceiver() {
- this.statusEventReceiver = new InstanceStatusEventReceiver();
+ this.statusEventReceiver = InstanceStatusEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- statusEventReceiver.setExecutorService(executorService);
- statusEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread started");
- }
-
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread terminated");
- }
- }
+// public void execute() {
+// statusEventReceiver.setExecutorService(executorService);
+// statusEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller application status thread started");
+// }
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller application status thread terminated");
+// }
+// }
private void addEventListeners() {
statusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index c0ae8ae..687cec2 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -36,8 +36,8 @@ public class StratosThreadPool {
private static final Log log = LogFactory.getLog(StratosThreadPool.class);
- private static Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<String, ExecutorService>();
- private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+ private static volatile Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<>();
+ private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
private static Object executorServiceMapLock = new Object();
private static Object scheduledServiceMapLock = new Object();
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 573c19d..c4d68ae 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -202,8 +202,8 @@ public class StratosManagerServiceComponent {
*/
private void initializeInstanceStatusEventReceiver() {
instanceStatusEventReceiver = new StratosManagerInstanceStatusEventReceiver();
- instanceStatusEventReceiver.setExecutorService(executorService);
- instanceStatusEventReceiver.execute();
+// instanceStatusEventReceiver.setExecutorService(executorService);
+// instanceStatusEventReceiver.execute();
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
index 1da448e..ab92d1b 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
@@ -37,32 +37,34 @@ import java.util.List;
/**
* Stratos manager instance status event receiver.
*/
-public class StratosManagerInstanceStatusEventReceiver extends InstanceStatusEventReceiver {
+public class StratosManagerInstanceStatusEventReceiver {
private static final Log log = LogFactory.getLog(StratosManagerInstanceStatusEventReceiver.class);
private ApplicationSignUpHandler signUpManager;
private ArtifactDistributionCoordinator artifactDistributionCoordinator;
+ private InstanceStatusEventReceiver instanceStatusEventReceiver;
public StratosManagerInstanceStatusEventReceiver() {
signUpManager = new ApplicationSignUpHandler();
artifactDistributionCoordinator = new ArtifactDistributionCoordinator();
+ instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
addEventListeners();
}
- @Override
- public void execute() {
- super.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Stratos manager instance status event receiver thread started");
- }
- }
+// @Override
+// public void execute() {
+// super.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Stratos manager instance status event receiver thread started");
+// }
+// }
private void addEventListeners() {
- addEventListener(new InstanceStartedEventListener() {
+ instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
@Override
protected void onEvent(Event event) {
InstanceStartedEvent instanceStartedEvent = (InstanceStartedEvent) event;
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 4ad6572..cfc7f11 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -21,64 +21,107 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class InstanceNotifierEventReceiver {
+public class InstanceNotifierEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class);
private final InstanceNotifierEventMessageDelegator messageDelegator;
private EventSubscriber eventSubscriber;
- private boolean terminated;
+ private InstanceNotifierEventMessageListener messageListener;
+ private static volatile InstanceNotifierEventReceiver instance;
+ //private boolean terminated;
- public InstanceNotifierEventReceiver() {
+ private InstanceNotifierEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
- InstanceNotifierEventMessageListener messageListener = new InstanceNotifierEventMessageListener(messageQueue);
+ messageListener = new InstanceNotifierEventMessageListener(messageQueue);
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
messageListener);
+ execute();
+ }
+
+ public static InstanceNotifierEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (InstanceNotifierEventReceiver.class) {
+ if (instance == null) {
+ instance = new InstanceNotifierEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void execute() {
- synchronized (this) {
- if (terminated) {
- log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created.");
- return;
+// public void execute() {
+// synchronized (this) {
+// if (terminated) {
+// log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created.");
+// return;
+// }
+// try {
+// Thread subscriberThread = new Thread(eventSubscriber);
+// subscriberThread.start();
+// if (log.isDebugEnabled()) {
+// log.debug("InstanceNotifier event message receiver thread started");
+// }
+//
+// // Start instance notifier event message delegator thread
+// Thread receiverThread = new Thread(messageDelegator);
+// receiverThread.start();
+// if (log.isDebugEnabled()) {
+// log.debug("InstanceNotifier event message delegator thread started");
+// }
+// } catch (Exception e) {
+// if (log.isErrorEnabled()) {
+// log.error("InstanceNotifier receiver failed", e);
+// }
+// }
+// }
+// log.info("InstanceNotifierEventReceiver started");
+//
+// // Keep the thread live until terminated
+// while (!terminated) {
+// try {
+// Thread.sleep(2000);
+// } catch (InterruptedException ignore) {
+// }
+// }
+// }
+
+ private void execute() {
+ try {
+ // Start topic subscriber thread
+ eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
+ messageListener);
+ executorService.execute(eventSubscriber);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Instance Notifier event message receiver thread started");
}
- try {
- Thread subscriberThread = new Thread(eventSubscriber);
- subscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("InstanceNotifier event message receiver thread started");
- }
- // Start instance notifier event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
- if (log.isDebugEnabled()) {
- log.debug("InstanceNotifier event message delegator thread started");
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("InstanceNotifier receiver failed", e);
- }
+ // Start instance notifier event message delegator thread
+ executorService.execute(messageDelegator);
+ if (log.isDebugEnabled()) {
+ log.debug("Instance Notifier event message delegator thread started");
}
- }
- log.info("InstanceNotifierEventReceiver started");
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignore) {
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Instance Notifier receiver failed", e);
}
}
}
@@ -90,7 +133,7 @@ public class InstanceNotifierEventReceiver {
public synchronized void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- terminated = true;
+ //terminated = true;
log.info("InstanceNotifierEventReceiver terminated");
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index 41f444e..a2a1623 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -21,27 +21,41 @@ package org.apache.stratos.messaging.message.receiver.instance.status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class InstanceStatusEventReceiver {
+public class InstanceStatusEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InstanceStatusEventReceiver.class);
private final InstanceStatusEventMessageDelegator messageDelegator;
private final InstanceStatusEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private boolean terminated;
- private ExecutorService executorService;
+ private static volatile InstanceStatusEventReceiver instance;
- public InstanceStatusEventReceiver() {
+ private InstanceStatusEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static InstanceStatusEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (InstanceStatusEventReceiver.class) {
+ if (instance == null) {
+ instance = new InstanceStatusEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
@@ -49,7 +63,7 @@ public class InstanceStatusEventReceiver {
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener);
@@ -77,14 +91,14 @@ public class InstanceStatusEventReceiver {
public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
+ // terminated = true;
}
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index c752f9e..9886335 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -161,7 +161,7 @@ public class MockInstance implements Serializable {
}
private void startInstanceNotifierEventReceiver() {
- instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
+ instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
instanceNotifierEventReceiver.addEventListener(new InstanceCleanupClusterEventListener() {
@Override
protected void onEvent(Event event) {
@@ -185,17 +185,17 @@ public class MockInstance implements Serializable {
});
// TODO: Fix InstanceNotifierEventReceiver to use executor service
// do not remove this since execute() is a blocking call
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- instanceNotifierEventReceiver.execute();
- }
- });
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Mock instance instance notifier event message receiver started for mock member [member-id] %s",
- mockInstanceContext.getMemberId()));
- }
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// instanceNotifierEventReceiver.execute();
+// }
+// });
+// if (log.isDebugEnabled()) {
+// log.debug(String.format(
+// "Mock instance instance notifier event message receiver started for mock member [member-id] %s",
+// mockInstanceContext.getMemberId()));
+// }
}
private void handleMemberTermination() {
@@ -213,9 +213,9 @@ public class MockInstance implements Serializable {
healthStatNotifierScheduledFuture.cancel(true);
}
- private void stopInstanceNotifierReceiver() {
- instanceNotifierEventReceiver.terminate();
- }
+// private void stopInstanceNotifierReceiver() {
+// instanceNotifierEventReceiver.terminate();
+// }
public MockInstanceContext getMockInstanceContext() {
return mockInstanceContext;
@@ -223,7 +223,7 @@ public class MockInstance implements Serializable {
public synchronized void terminate() {
if (MemberStatus.Initialized.equals(memberStatus)) {
- stopInstanceNotifierReceiver();
+ //stopInstanceNotifierReceiver();
stopHealthStatisticsPublisher();
memberStatus = MemberStatus.Terminated;
if (log.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index c5751bd..4a86e40 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -129,9 +129,9 @@ public abstract class PythonAgentIntegrationTest {
// topologyEventReceiver.setExecutorService(executorService);
// topologyEventReceiver.execute();
- instanceStatusEventReceiver = new InstanceStatusEventReceiver();
- instanceStatusEventReceiver.setExecutorService(executorService);
- instanceStatusEventReceiver.execute();
+ instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
+// instanceStatusEventReceiver.setExecutorService(executorService);
+// instanceStatusEventReceiver.execute();
instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
@Override
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
index 3f708db..7412540 100644
--- a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
+++ b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
@@ -56,19 +56,19 @@ public class GitHookTestCase extends StratosIntegrationTest {
private static final String appPolicyId = "application-policy-git-hook-test";
private static final String GIT_HOOK_ARTIFACT_FILENAME = "hook-req.json";
private static final int ARTIFACT_UPDATED_EXPECTED_COUNT = 2;
- private ExecutorService eventListenerExecutorService = StratosThreadPool
- .getExecutorService("stratos.integration.test.git.thread.pool", 5);
+// private ExecutorService eventListenerExecutorService = StratosThreadPool
+// .getExecutorService("stratos.integration.test.git.thread.pool", 5);
@Test(timeOut = DEFAULT_TEST_TIMEOUT)
public void sendRepoNotify() throws Exception {
deployArtifacts();
- final InstanceNotifierEventReceiver instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- instanceNotifierEventReceiver.execute();
- }
- });
+ final InstanceNotifierEventReceiver instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// instanceNotifierEventReceiver.execute();
+// }
+// });
ArtifactUpdateEventListener artifactUpdateEventListener = new ArtifactUpdateEventListener() {
@Override
@@ -86,8 +86,8 @@ public class GitHookTestCase extends StratosIntegrationTest {
Thread.sleep(1000);
}
TopologyHandler.getInstance().assertApplicationActiveStatus(applicationId);
- instanceNotifierEventReceiver.terminate();
- eventListenerExecutorService.shutdownNow();
+ //instanceNotifierEventReceiver.terminate();
+ // eventListenerExecutorService.shutdownNow();
undeployArtifacts();
}
[07/14] stratos git commit: making ClusterStatusEventReceiver
singleton and fixing references in components
Posted by is...@apache.org.
making ClusterStatusEventReceiver singleton and fixing references in components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/27ae4baf
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/27ae4baf
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/27ae4baf
Branch: refs/heads/master
Commit: 27ae4bafc268a59dcd65336ca9ba491eaeae7426
Parents: c627ff1
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 18:30:49 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:04:51 2015 +0530
----------------------------------------------------------------------
.../CloudControllerServiceComponent.java | 4 +-
.../status/ClusterStatusTopicReceiver.java | 59 ++++++++++----------
.../status/ClusterStatusEventReceiver.java | 51 +++++++++++------
.../mapping/DomainMappingEventReceiver.java | 27 ++++-----
4 files changed, 78 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 2368596..74d36e7 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -154,8 +154,8 @@ public class CloudControllerServiceComponent {
}
clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
- clusterStatusTopicReceiver.setExecutorService(executorService);
- clusterStatusTopicReceiver.execute();
+// clusterStatusTopicReceiver.setExecutorService(executorService);
+// clusterStatusTopicReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Cluster status event receiver thread started");
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index daa6bf5..e0b9f62 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -32,28 +32,27 @@ import java.util.concurrent.ExecutorService;
public class ClusterStatusTopicReceiver {
private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
- private ClusterStatusEventReceiver statusEventReceiver;
- private boolean terminated;
- private ExecutorService executorService;
+ private ClusterStatusEventReceiver clusterStatusEventReceiver;
+ //private boolean terminated;
+ //private ExecutorService executorService;
public ClusterStatusTopicReceiver() {
- this.statusEventReceiver = new ClusterStatusEventReceiver();
-
+ this.clusterStatusEventReceiver = ClusterStatusEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- statusEventReceiver.setExecutorService(executorService);
- statusEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller Cluster status thread started");
- }
-
- }
+// public void execute() {
+// clusterStatusEventReceiver.setExecutorService(executorService);
+// clusterStatusEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller Cluster status thread started");
+// }
+//
+// }
private void addEventListeners() {
// Listen to topology events that affect clusters
- statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -64,14 +63,14 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
@Override
protected void onEvent(Event event) {
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -82,7 +81,7 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -93,7 +92,7 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -104,7 +103,7 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -116,15 +115,15 @@ public class ClusterStatusTopicReceiver {
});
}
- public void setTerminated(boolean terminated) {
- this.terminated = terminated;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void setTerminated(boolean terminated) {
+// this.terminated = terminated;
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 119cf49..2b4d557 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -21,8 +21,10 @@ package org.apache.stratos.messaging.message.receiver.cluster.status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -30,26 +32,39 @@ import java.util.concurrent.ExecutorService;
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class ClusterStatusEventReceiver {
+public class ClusterStatusEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class);
private final ClusterStatusEventMessageDelegator messageDelegator;
private final ClusterStatusEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private boolean terminated;
- private ExecutorService executorService;
+ private static volatile ClusterStatusEventReceiver instance;
- public ClusterStatusEventReceiver() {
+ private ClusterStatusEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
+ execute();
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
+ public static ClusterStatusEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (ClusterStatusEventReceiver.class) {
+ if (instance == null) {
+ instance = new ClusterStatusEventReceiver();
+ }
+ }
+ }
- public void execute() {
+ return instance;
+ }
+
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
@@ -77,17 +92,17 @@ public class ClusterStatusEventReceiver {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}
- public void terminate() {
- eventSubscriber.terminate();
- messageDelegator.terminate();
- terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void terminate() {
+// eventSubscriber.terminate();
+// messageDelegator.terminate();
+// terminated = true;
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 3c723a3..6b79873 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -47,6 +47,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new DomainMappingEventMessageListener(messageQueue);
+ execute();
}
public void addEventListener(EventListener eventListener) {
@@ -65,7 +66,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
return instance;
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.DOMAIN_MAPPING_TOPIC.getTopicName(), messageListener);
@@ -91,16 +92,16 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
}
}
- public void terminate() {
- eventSubscriber.terminate();
- messageDelegator.terminate();
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void terminate() {
+// eventSubscriber.terminate();
+// messageDelegator.terminate();
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
[04/14] stratos git commit: making InitializerEventReceiver singleton
and fixing references in components
Posted by is...@apache.org.
making InitializerEventReceiver singleton and fixing references in components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a1ba54c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a1ba54c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a1ba54c8
Branch: refs/heads/master
Commit: a1ba54c8b3764dfc5a22c267858f05ccbf34878f
Parents: 170c27c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 18:01:16 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:04:17 2015 +0530
----------------------------------------------------------------------
.../AutoscalerInitializerTopicReceiver.java | 16 ++++----
.../internal/AutoscalerServiceComponent.java | 10 ++---
.../CloudControllerServiceComponent.java | 4 +-
.../initializer/InitializerTopicReceiver.java | 30 +++++++--------
.../StratosManagerServiceComponent.java | 4 +-
.../StratosManagerInitializerTopicReceiver.java | 32 ++++++++--------
.../initializer/InitializerEventReceiver.java | 40 ++++++++++++++------
7 files changed, 77 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/a1ba54c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
index da6b270..b330211 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
@@ -34,17 +34,17 @@ public class AutoscalerInitializerTopicReceiver {
private ExecutorService executorService;
public AutoscalerInitializerTopicReceiver() {
- this.initializerEventReceiver = new InitializerEventReceiver();
+ this.initializerEventReceiver = InitializerEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- initializerEventReceiver.setExecutorService(executorService);
- initializerEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller initializer topic receiver started");
- }
- }
+// public void execute() {
+// initializerEventReceiver.setExecutorService(executorService);
+// initializerEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller initializer topic receiver started");
+// }
+// }
private void addEventListeners() {
initializerEventReceiver.addEventListener(new CompleteApplicationsRequestEventListener() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/a1ba54c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 4d4c54f..ba7c341 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -189,11 +189,11 @@ public class AutoscalerServiceComponent {
// Start initializer receiver
autoscalerInitializerTopicReceiver = new AutoscalerInitializerTopicReceiver();
- autoscalerInitializerTopicReceiver.setExecutorService(executorService);
- autoscalerInitializerTopicReceiver.execute();
- if (log.isDebugEnabled()) {
- log.debug("Initializer receiver thread started");
- }
+// autoscalerInitializerTopicReceiver.setExecutorService(executorService);
+// autoscalerInitializerTopicReceiver.execute();
+// if (log.isDebugEnabled()) {
+// log.debug("Initializer receiver thread started");
+// }
if (log.isInfoEnabled()) {
log.info("Scheduling tasks to publish applications");
http://git-wip-us.apache.org/repos/asf/stratos/blob/a1ba54c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 3a0b1e3..2368596 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -170,8 +170,8 @@ public class CloudControllerServiceComponent {
}
initializerTopicReceiver = new InitializerTopicReceiver();
- initializerTopicReceiver.setExecutorService(executorService);
- initializerTopicReceiver.execute();
+// initializerTopicReceiver.setExecutorService(executorService);
+// initializerTopicReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Initializer event receiver thread started");
http://git-wip-us.apache.org/repos/asf/stratos/blob/a1ba54c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
index aba0dac..9a2c502 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
@@ -34,17 +34,17 @@ public class InitializerTopicReceiver {
private ExecutorService executorService;
public InitializerTopicReceiver() {
- this.initializerEventReceiver = new InitializerEventReceiver();
+ this.initializerEventReceiver = InitializerEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- initializerEventReceiver.setExecutorService(executorService);
- initializerEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Autoscaler initializer topic receiver started");
- }
- }
+// public void execute() {
+// initializerEventReceiver.setExecutorService(executorService);
+// initializerEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Autoscaler initializer topic receiver started");
+// }
+// }
private void addEventListeners() {
initializerEventReceiver.addEventListener(new CompleteTopologyRequestEventListener() {
@@ -62,11 +62,11 @@ public class InitializerTopicReceiver {
});
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/a1ba54c8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 76d39a7..ed45852 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -193,8 +193,8 @@ public class StratosManagerServiceComponent {
private void initializeInitializerEventReceiver() {
initializerTopicReceiver = new StratosManagerInitializerTopicReceiver();
- initializerTopicReceiver.setExecutorService(executorService);
- initializerTopicReceiver.execute();
+// initializerTopicReceiver.setExecutorService(executorService);
+// initializerTopicReceiver.execute();
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/a1ba54c8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
index 89ace69..c08e8e4 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
@@ -33,22 +33,22 @@ import java.util.concurrent.ExecutorService;
public class StratosManagerInitializerTopicReceiver {
private static final Log log = LogFactory.getLog(StratosManagerInitializerTopicReceiver.class);
private InitializerEventReceiver initializerEventReceiver;
- private ExecutorService executorService;
+ //private ExecutorService executorService;
private ApplicationSignUpHandler applicationSignUpHandler;
public StratosManagerInitializerTopicReceiver() {
- this.initializerEventReceiver = new InitializerEventReceiver();
+ this.initializerEventReceiver = InitializerEventReceiver.getInstance();
applicationSignUpHandler = new ApplicationSignUpHandler();
addEventListeners();
}
- public void execute() {
- initializerEventReceiver.setExecutorService(executorService);
- initializerEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Stratos manager initializer topic receiver started");
- }
- }
+// public void execute() {
+// initializerEventReceiver.setExecutorService(executorService);
+// initializerEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Stratos manager initializer topic receiver started");
+// }
+// }
private void addEventListeners() {
initializerEventReceiver.addEventListener(new CompleteTenantRequestEventListener() {
@@ -81,11 +81,11 @@ public class StratosManagerInitializerTopicReceiver {
});
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/a1ba54c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 90d358c..e6429e2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -20,31 +20,49 @@ package org.apache.stratos.messaging.message.receiver.initializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
-public class InitializerEventReceiver {
+public class InitializerEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);
private InitializerEventMessageDelegator messageDelegator;
private InitializerEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile InitializerEventReceiver instance;
+ //private ExecutorService executorService;
- public InitializerEventReceiver() {
+ private InitializerEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
+ execute();
+ }
+
+ public static InitializerEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (InitializerEventReceiver.class) {
+ if (instance == null) {
+ instance = new InitializerEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INITIALIZER_TOPIC.getTopicName(),
@@ -68,11 +86,11 @@ public class InitializerEventReceiver {
messageDelegator.terminate();
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
[12/14] stratos git commit: deleting some commented out methods
Posted by is...@apache.org.
deleting some commented out methods
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/63f931f4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/63f931f4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/63f931f4
Branch: refs/heads/master
Commit: 63f931f4e3ae7b5d3074cd0c1b47f6c2b2967b75
Parents: 617258b
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Thu Dec 17 21:39:47 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:05:57 2015 +0530
----------------------------------------------------------------------
.../application/ApplicationsEventReceiver.java | 10 ----------
.../signup/ApplicationSignUpEventReceiver.java | 10 ----------
.../cluster/status/ClusterStatusEventReceiver.java | 15 ---------------
.../domain/mapping/DomainMappingEventReceiver.java | 2 --
.../health/stat/HealthStatEventReceiver.java | 16 ----------------
.../initializer/InitializerEventReceiver.java | 8 --------
.../notifier/InstanceNotifierEventReceiver.java | 5 +----
.../status/InstanceStatusEventReceiver.java | 8 --------
.../receiver/tenant/TenantEventReceiver.java | 6 ------
.../receiver/topology/TopologyEventReceiver.java | 2 --
10 files changed, 1 insertion(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index e94bafd..69dba01 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -29,8 +29,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
public class ApplicationsEventReceiver extends StratosEventReceiver{
private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class);
@@ -117,12 +115,4 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
}
});
}
-
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index dde214d..df90cf9 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -30,8 +30,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
/**
* Application signup event receiver.
*/
@@ -119,12 +117,4 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
eventSubscriber.terminate();
messageDelegator.terminate();
}
-
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 2b4d557..e191799 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -27,7 +27,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
/**
* A thread for receiving instance notifier information from message broker.
@@ -91,18 +90,4 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
public boolean isSubscribed() {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}
-
-// public void terminate() {
-// eventSubscriber.terminate();
-// messageDelegator.terminate();
-// terminated = true;
-// }
-//
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index a37941c..4e4c04b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -27,8 +27,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
/**
* Domain mapping event receiver.
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index ede8f17..ba124a7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -27,8 +27,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
/**
* A thread for receiving health stat information from message broker
*/
@@ -84,18 +82,4 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
}
}
}
-
-// public void terminate() {
-// eventSubscriber.terminate();
-// messageDelegator.terminate();
-// terminated = true;
-// }
-//
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index e6429e2..805a8bf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -85,12 +85,4 @@ public class InitializerEventReceiver extends StratosEventReceiver {
eventSubscriber.terminate();
messageDelegator.terminate();
}
-
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 33f5de8..e0b8e9f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -43,10 +43,7 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
- messageListener = new InstanceNotifierEventMessageListener(messageQueue);
- // Start topic subscriber thread
- eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
- messageListener);
+ this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
execute();
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a2a1623..a565ea9 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -93,12 +93,4 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
messageDelegator.terminate();
// terminated = true;
}
-
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index 1c519b9..a52cb20 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -30,8 +30,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
/**
* A thread for receiving tenant information from message broker and
* build tenant information in tenant manager.
@@ -68,10 +66,6 @@ public class TenantEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
-
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/63f931f4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index 8378486..bfa3950 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -30,8 +30,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
/**
* A thread for receiving topology information from message broker and
* build topology in topology manager.
[08/14] stratos git commit: making ApplicationsEventReceiver singleton and fixing references in components
Posted by is...@apache.org.
making ApplicationsEventReceiver singleton and fixing references in components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f302906a
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f302906a
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f302906a
Branch: refs/heads/master
Commit: f302906abd4218673dbe648fc8254fcdf9c2ae3b
Parents: 27ae4ba
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 18:48:14 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:05:04 2015 +0530
----------------------------------------------------------------------
.../CloudControllerServiceComponent.java | 4 +--
.../application/ApplicationEventReceiver.java | 24 ++++++-------
.../extension/api/LoadBalancerExtension.java | 6 ++--
.../StratosManagerServiceComponent.java | 4 +--
.../StratosManagerApplicationEventReceiver.java | 19 +++++-----
.../application/ApplicationsEventReceiver.java | 37 ++++++++++++++------
.../MetadataApplicationEventReceiver.java | 31 ++++++++--------
.../service/registry/MetadataApiRegistry.java | 8 ++---
8 files changed, 75 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 74d36e7..c4c0336 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -146,8 +146,8 @@ public class CloudControllerServiceComponent {
private void executeCoordinatorTasks() {
applicationEventReceiver = new ApplicationEventReceiver();
- applicationEventReceiver.setExecutorService(executorService);
- applicationEventReceiver.execute();
+// applicationEventReceiver.setExecutorService(executorService);
+// applicationEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application event receiver thread started");
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
index bd35e25..8da5575 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
@@ -37,20 +37,20 @@ import java.util.concurrent.ExecutorService;
public class ApplicationEventReceiver {
private static final Log log = LogFactory.getLog(ApplicationEventReceiver.class);
private ApplicationsEventReceiver applicationsEventReceiver;
- private ExecutorService executorService;
+ // private ExecutorService executorService;
public ApplicationEventReceiver() {
- this.applicationsEventReceiver = new ApplicationsEventReceiver();
+ this.applicationsEventReceiver = ApplicationsEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application event receiver thread started");
- }
- applicationsEventReceiver.setExecutorService(executorService);
- applicationsEventReceiver.execute();
- }
+// public void execute() {
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller application event receiver thread started");
+// }
+// applicationsEventReceiver.setExecutorService(executorService);
+// applicationsEventReceiver.execute();
+// }
private void addEventListeners() {
applicationsEventReceiver.addEventListener(new ApplicationInstanceTerminatedEventListener() {
@@ -76,7 +76,7 @@ public class ApplicationEventReceiver {
});
}
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index c2ee077..ec1ddbc 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -150,9 +150,9 @@ public class LoadBalancerExtension {
}
private void startApplicationEventReceiver(ExecutorService executorService) {
- applicationsEventReceiver = new ApplicationsEventReceiver();
- applicationsEventReceiver.setExecutorService(executorService);
- applicationsEventReceiver.execute();
+ applicationsEventReceiver = ApplicationsEventReceiver.getInstance();
+// applicationsEventReceiver.setExecutorService(executorService);
+// applicationsEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application event receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index ed45852..573c19d 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -220,8 +220,8 @@ public class StratosManagerServiceComponent {
*/
private void initializeApplicationEventReceiver() {
applicationEventReceiver = new StratosManagerApplicationEventReceiver();
- applicationEventReceiver.setExecutorService(executorService);
- applicationEventReceiver.execute();
+// applicationEventReceiver.setExecutorService(executorService);
+// applicationEventReceiver.execute();
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerApplicationEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerApplicationEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerApplicationEventReceiver.java
index b6ce035..ff85c83 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerApplicationEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerApplicationEventReceiver.java
@@ -26,23 +26,22 @@ import org.apache.stratos.messaging.message.receiver.application.ApplicationsEve
/**
* Stratos manager application event receiver.
*/
-public class StratosManagerApplicationEventReceiver extends ApplicationsEventReceiver {
+public class StratosManagerApplicationEventReceiver {
private static final Log log = LogFactory.getLog(StratosManagerApplicationEventReceiver.class);
public StratosManagerApplicationEventReceiver() {
addEventListeners();
-
}
- @Override
- public void execute() {
- super.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Stratos manager application event receiver thread started");
- }
- }
+// @Override
+// public void execute() {
+// super.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Stratos manager application event receiver thread started");
+// }
+// }
private void addEventListeners() {
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 9306ad2..e94bafd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -20,27 +20,44 @@ package org.apache.stratos.messaging.message.receiver.application;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.event.initializer.CompleteApplicationsRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
-public class ApplicationsEventReceiver {
+public class ApplicationsEventReceiver extends StratosEventReceiver{
private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class);
private ApplicationsEventMessageDelegator messageDelegator;
private ApplicationsEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile ApplicationsEventReceiver instance;
- public ApplicationsEventReceiver() {
+ private ApplicationsEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100);
ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationsEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static ApplicationsEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (ApplicationsEventReceiver.class) {
+ if (instance == null) {
+ instance = new ApplicationsEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
@@ -101,11 +118,11 @@ public class ApplicationsEventReceiver {
});
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java
index ca00c3c..e1bf929 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java
@@ -40,11 +40,12 @@ import java.util.concurrent.ExecutorService;
public class MetadataApplicationEventReceiver {
private static final Log log = LogFactory.getLog(MetadataApplicationEventReceiver.class);
private ApplicationsEventReceiver applicationsEventReceiver;
- private ExecutorService executorService;
+ //private ExecutorService executorService;
public MetadataApplicationEventReceiver() {
- this.applicationsEventReceiver = new ApplicationsEventReceiver();
- executorService = StratosThreadPool.getExecutorService(Constants.METADATA_SERVICE_THREAD_POOL_ID, 20);
+ this.applicationsEventReceiver = ApplicationsEventReceiver.getInstance();
+ //executorService = StratosThreadPool.getExecutorService(Constants
+ // .METADATA_SERVICE_THREAD_POOL_ID, 20);
addEventListeners();
}
@@ -65,19 +66,19 @@ public class MetadataApplicationEventReceiver {
if (log.isDebugEnabled()) {
log.debug("Metadata service READ WRITE locks initialized on complete applications event.");
}
- terminate();
+ //terminate();
}
});
}
- public void execute() {
- applicationsEventReceiver.setExecutorService(getExecutorService());
- applicationsEventReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Metadata service application receiver started.");
- }
- }
+// public void execute() {
+// applicationsEventReceiver.setExecutorService(getExecutorService());
+// applicationsEventReceiver.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Metadata service application receiver started.");
+// }
+// }
public void terminate() {
applicationsEventReceiver.terminate();
@@ -86,8 +87,8 @@ public class MetadataApplicationEventReceiver {
}
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
+ // public ExecutorService getExecutorService() {
+// return executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f302906a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
index 47fc600..abb21ba 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
@@ -59,7 +59,7 @@ public class MetadataApiRegistry implements DataStore {
// metadataTopologyEventReceiver.execute();
metadataApplicationEventReceiver = new MetadataApplicationEventReceiver();
- metadataApplicationEventReceiver.execute();
+// metadataApplicationEventReceiver.execute();
}
/**
@@ -421,7 +421,7 @@ public class MetadataApiRegistry implements DataStore {
// metadataTopologyEventReceiver.terminate();
// }
- public void stopApplicationReceiver() {
- metadataApplicationEventReceiver.terminate();
- }
+// public void stopApplicationReceiver() {
+// metadataApplicationEventReceiver.terminate();
+// }
}
[11/14] stratos git commit: commenting unused methods after making
EventReceivers singleton
Posted by is...@apache.org.
commenting unused methods after making EventReceivers singleton
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/617258b9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/617258b9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/617258b9
Branch: refs/heads/master
Commit: 617258b9d00dacc7d6c7692dad63c60547cd6784
Parents: 933be1a
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Thu Dec 17 16:04:54 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:05:45 2015 +0530
----------------------------------------------------------------------
.../AutoscalerHealthStatEventReceiver.java | 31 -------------
.../StratosManagerServiceComponent.java | 49 --------------------
.../status/InstanceStatusEventReceiver.java | 4 --
.../topology/TopologyEventReceiver.java | 8 ----
4 files changed, 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/617258b9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 55dc1fa..4201ec1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -41,28 +41,12 @@ public class AutoscalerHealthStatEventReceiver {
private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
private HealthStatEventReceiver healthStatEventReceiver;
-<<<<<<< HEAD
- private ExecutorService executorService;
-=======
->>>>>>> d4b35c0... removing unused methods
public AutoscalerHealthStatEventReceiver() {
this.healthStatEventReceiver = HealthStatEventReceiver.getInstance();
addEventListeners();
}
-<<<<<<< HEAD
-// public void execute() {
-// healthStatEventReceiver.setExecutorService(executorService);
-// healthStatEventReceiver.execute();
-//
-// if (log.isInfoEnabled()) {
-// log.info("Autoscaler health stat event receiver thread started");
-// }
-// }
-
-=======
->>>>>>> d4b35c0... removing unused methods
private void addEventListeners() {
// Listen to health stat events that affect clusters
healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() {
@@ -480,19 +464,4 @@ public class AutoscalerHealthStatEventReceiver {
TopologyManager.releaseReadLock();
}
}
-<<<<<<< HEAD
-
- public void terminate() {
- this.terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
-=======
->>>>>>> d4b35c0... removing unused methods
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/617258b9/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 7c421d9..aa7cc02 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -193,11 +193,6 @@ public class StratosManagerServiceComponent {
private void initializeInitializerEventReceiver() {
initializerTopicReceiver = new StratosManagerInitializerTopicReceiver();
-<<<<<<< HEAD
-// initializerTopicReceiver.setExecutorService(executorService);
-// initializerTopicReceiver.execute();
-=======
->>>>>>> d4b35c0... removing unused methods
}
/**
@@ -205,11 +200,6 @@ public class StratosManagerServiceComponent {
*/
private void initializeInstanceStatusEventReceiver() {
instanceStatusEventReceiver = new StratosManagerInstanceStatusEventReceiver();
-<<<<<<< HEAD
-// instanceStatusEventReceiver.setExecutorService(executorService);
-// instanceStatusEventReceiver.execute();
-=======
->>>>>>> d4b35c0... removing unused methods
}
/**
@@ -217,11 +207,6 @@ public class StratosManagerServiceComponent {
*/
private void initializeTopologyEventReceiver() {
topologyEventReceiver = new StratosManagerTopologyEventReceiver();
-<<<<<<< HEAD
-// topologyEventReceiver.setExecutorService(executorService);
-// topologyEventReceiver.execute();
-=======
->>>>>>> d4b35c0... removing unused methods
}
/**
@@ -229,11 +214,6 @@ public class StratosManagerServiceComponent {
*/
private void initializeApplicationEventReceiver() {
applicationEventReceiver = new StratosManagerApplicationEventReceiver();
-<<<<<<< HEAD
-// applicationEventReceiver.setExecutorService(executorService);
-// applicationEventReceiver.execute();
-=======
->>>>>>> d4b35c0... removing unused methods
}
/**
@@ -348,34 +328,5 @@ public class StratosManagerServiceComponent {
// Close event publisher connections to message broker
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
-<<<<<<< HEAD
-
- shutdownExecutorService(THREAD_POOL_ID);
- shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
- }
-
- private void shutdownExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutorService(executorService);
- }
- }
-
- private void shutdownScheduledExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutorService(executorService);
- }
- }
-
- private void shutdownExecutorService(ExecutorService executorService) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down executor service", e);
- }
- }
-=======
}
->>>>>>> d4b35c0... removing unused methods
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/617258b9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index 5bc27ff..a2a1623 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -39,11 +39,7 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
private InstanceStatusEventReceiver() {
// TODO: make pool size configurable
-<<<<<<< HEAD
this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
-=======
- this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
->>>>>>> d4b35c0... removing unused methods
InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
http://git-wip-us.apache.org/repos/asf/stratos/blob/617258b9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index 50e078a..8378486 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -117,12 +117,4 @@ public class TopologyEventReceiver extends StratosEventReceiver {
}
});
}
-
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
[13/14] stratos git commit: adding activator class for messaging and
calling terminate of event receivers in de-activation of the bundle
Posted by is...@apache.org.
adding activator class for messaging and calling terminate of event receivers in de-activation of the bundle
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/905e140a
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/905e140a
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/905e140a
Branch: refs/heads/master
Commit: 905e140a109d2818df66ee349a878439c59d6885
Parents: 63f931f
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Thu Dec 17 23:17:57 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:06:15 2015 +0530
----------------------------------------------------------------------
...LoadBalancerCommonTopologyEventReceiver.java | 7 --
.../internal/LoadBalancerServiceComponent.java | 82 ++------------------
.../internal/MessagingServiceComponent.java | 66 ++++++++++++++++
.../status/ClusterStatusEventReceiver.java | 5 ++
.../mapping/DomainMappingEventReceiver.java | 5 ++
.../health/stat/HealthStatEventReceiver.java | 5 ++
6 files changed, 89 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 85142e3..93f391f 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -59,13 +59,6 @@ public class LoadBalancerCommonTopologyEventReceiver {
}
}
-// public void execute() {
-// super.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Load balancer topology receiver thread started");
-// }
-// }
-
public void initializeTopology() {
if (initialized) {
return;
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 3786af8..b235208 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -25,7 +25,6 @@ import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.services.DistributedObjectProvider;
-import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonApplicationSignUpEventReceiver;
import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
@@ -38,7 +37,6 @@ import org.apache.stratos.load.balancer.event.receivers.LoadBalancerDomainMappin
import org.apache.stratos.load.balancer.event.receivers.LoadBalancerTopologyEventReceiver;
import org.apache.stratos.load.balancer.exception.TenantAwareLoadBalanceEndpointException;
import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
-import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -63,7 +61,6 @@ import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.io.File;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
/**
* @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true"
@@ -90,7 +87,6 @@ public class LoadBalancerServiceComponent {
private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class);
private boolean activated = false;
- private ExecutorService executorService;
private LoadBalancerTopologyEventReceiver topologyEventReceiver;
private TenantEventReceiver tenantEventReceiver;
private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver;
@@ -124,11 +120,6 @@ public class LoadBalancerServiceComponent {
// Configure topology filters
TopologyFilterConfigurator.configure(configuration);
- int threadPoolSize = Integer.getInteger(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_SIZE_KEY,
- LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE);
- executorService = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID,
- threadPoolSize);
-
TopologyProvider topologyProvider = LoadBalancerConfiguration.getInstance().getTopologyProvider();
if (topologyProvider == null) {
topologyProvider = new TopologyProvider();
@@ -137,18 +128,18 @@ public class LoadBalancerServiceComponent {
if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) {
// Start tenant & application signup event receivers
- startTenantEventReceiver(executorService);
- startApplicationSignUpEventReceiver(executorService, topologyProvider);
+ startTenantEventReceiver();
+ startApplicationSignUpEventReceiver(topologyProvider);
}
if (configuration.isDomainMappingEnabled()) {
// Start domain mapping event receiver
- startDomainMappingEventReceiver(executorService, topologyProvider);
+ startDomainMappingEventReceiver(topologyProvider);
}
if (configuration.isTopologyEventListenerEnabled()) {
// Start topology receiver
- startTopologyEventReceiver(executorService, topologyProvider);
+ startTopologyEventReceiver(topologyProvider);
}
if (configuration.isCepStatsPublisherEnabled()) {
@@ -167,43 +158,28 @@ public class LoadBalancerServiceComponent {
}
}
- private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startDomainMappingEventReceiver(TopologyProvider topologyProvider) {
if (domainMappingEventReceiver != null) {
return;
}
domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider);
-// domainMappingEventReceiver.setExecutorService(executorService);
-// domainMappingEventReceiver.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Domain mapping event receiver thread started");
-// }
}
- private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
if (applicationSignUpEventReceiver != null) {
return;
}
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
-// applicationSignUpEventReceiver.setExecutorService(executorService);
-// applicationSignUpEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Application signup event receiver thread started");
- }
}
- private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startTopologyEventReceiver(TopologyProvider topologyProvider) {
if (topologyEventReceiver != null) {
return;
}
topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
-// topologyEventReceiver.setExecutorService(executorService);
-// topologyEventReceiver.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Topology receiver thread started");
-// }
if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
@@ -223,14 +199,8 @@ public class LoadBalancerServiceComponent {
}
}
- private void startTenantEventReceiver(ExecutorService executorService) {
-
+ private void startTenantEventReceiver() {
tenantEventReceiver = TenantEventReceiver.getInstance();
-// tenantEventReceiver.setExecutorService(executorService);
-// tenantEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Tenant event receiver thread started");
- }
}
private void startStatisticsNotifier(TopologyProvider topologyProvider) {
@@ -256,33 +226,6 @@ public class LoadBalancerServiceComponent {
log.warn("An error occurred while removing endpoint deployer", e);
}
- // Terminate topology receiver
-// if (topologyEventReceiver != null) {
-// try {
-// topologyEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating topology event receiver", e);
-// }
-// }
-
- // Terminate application signup event receiver
-// if (applicationSignUpEventReceiver != null) {
-// try {
-// applicationSignUpEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating application signup event receiver", e);
-// }
-// }
-
- // Terminate domain mapping event receiver
-// if (domainMappingEventReceiver != null) {
-// try {
-// domainMappingEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating domain mapping event receiver", e);
-// }
-// }
-
// Terminate statistics notifier
if (statisticsNotifier != null) {
try {
@@ -291,15 +234,6 @@ public class LoadBalancerServiceComponent {
log.warn("An error occurred while terminating health statistics notifier", e);
}
}
-
- // Shutdown executor service
- if (executorService != null) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down load balancer executor service", e);
- }
- }
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
new file mode 100644
index 0000000..c97125b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
+import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
+import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappingEventReceiver;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.osgi.service.component.ComponentContext;
+
+/**
+ * @scr.component name="org.apache.stratos.messaging.internal.MessagingServiceComponent"
+ * immediate="true"
+ */
+public class MessagingServiceComponent {
+
+ private static final Log log = LogFactory.getLog(MessagingServiceComponent.class);
+
+ protected void activate(ComponentContext context) {
+ try {
+ log.info("Messaging Service bundle activated");
+ } catch (Exception e) {
+ log.error("Could not activate Messaging Service component", e);
+ }
+ }
+
+ protected void deactivate(ComponentContext context) {
+ // deactivate all message receivers
+ try {
+ ApplicationSignUpEventReceiver.getInstance().terminate();
+ ApplicationsEventReceiver.getInstance().terminate();
+ ClusterStatusEventReceiver.getInstance().terminate();
+ DomainMappingEventReceiver.getInstance().terminate();
+ HealthStatEventReceiver.getInstance().terminate();
+ InitializerEventReceiver.getInstance().terminate();
+ TenantEventReceiver.getInstance().terminate();
+ TopologyEventReceiver.getInstance().terminate();
+ log.info("Messaging Service component is deactivated");
+ } catch (Exception e) {
+ log.error("Could not de-activate Messaging Service component", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index e191799..be42b43 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -87,6 +87,11 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
}
}
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
+
public boolean isSubscribed() {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 4e4c04b..6de99c0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -64,6 +64,11 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
return instance;
}
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/905e140a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index ba124a7..a9d2602 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -82,4 +82,9 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
}
}
}
+
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
}
[14/14] stratos git commit: adding comments for StratosEventReceiver
abstraction,
starting the event receivers from messaging activator and adding shutdown for
tenant, application and signup synchronizers
Posted by is...@apache.org.
adding comments for StratosEventReceiver abstraction, starting the event receivers from messaging activator and adding shutdown for tenant, application and signup synchronizers
Conflicts:
components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c90eb9a7
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c90eb9a7
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c90eb9a7
Branch: refs/heads/master
Commit: c90eb9a728ca6b9f2ef33dbd6ceece635ddb77ab
Parents: 905e140
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Thu Dec 24 16:55:07 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:12:43 2015 +0530
----------------------------------------------------------------------
.../CloudControllerServiceComponent.java | 3 +
.../common/threading/StratosThreadPool.java | 35 +++++++++--
.../StratosManagerServiceComponent.java | 3 +
.../internal/MessagingServiceComponent.java | 21 ++++++-
.../message/receiver/StratosEventReceiver.java | 61 +++++++++++++++++++-
.../application/ApplicationsEventReceiver.java | 4 +-
.../ApplicationSignUpEventMessageDelegator.java | 4 ++
.../signup/ApplicationSignUpEventReceiver.java | 6 +-
.../ClusterStatusEventMessageDelegator.java | 4 ++
.../status/ClusterStatusEventReceiver.java | 6 +-
.../DomainMappingEventMessageDelegator.java | 4 ++
.../mapping/DomainMappingEventReceiver.java | 6 +-
.../stat/HealthStatEventMessageDelegator.java | 4 ++
.../health/stat/HealthStatEventReceiver.java | 5 +-
.../InitializerEventMessageDelegator.java | 4 ++
.../initializer/InitializerEventReceiver.java | 7 ++-
.../InstanceNotifierEventMessageDelegator.java | 4 ++
.../notifier/InstanceNotifierEventReceiver.java | 8 +--
.../InstanceStatusEventMessageDelegator.java | 4 ++
.../status/InstanceStatusEventReceiver.java | 6 +-
.../tenant/TenantEventMessageDelegator.java | 4 ++
.../receiver/tenant/TenantEventReceiver.java | 6 +-
.../topology/TopologyEventMessageDelegator.java | 4 ++
.../topology/TopologyEventReceiver.java | 6 +-
24 files changed, 186 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index c30fc63..5b01330 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -229,5 +229,8 @@ public class CloudControllerServiceComponent {
} catch (Exception e) {
log.warn("An error occurred while closing cloud controller topology event publisher", e);
}
+
+ // shutdown TopologyEventSync task
+ StratosThreadPool.shutdown(THREAD_POOL_ID);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 687cec2..8037ce3 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -24,10 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.*;
/**
* Utility class for Stratos thread pool
@@ -37,7 +34,7 @@ public class StratosThreadPool {
private static final Log log = LogFactory.getLog(StratosThreadPool.class);
private static volatile Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<>();
- private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+ private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<>();
private static Object executorServiceMapLock = new Object();
private static Object scheduledServiceMapLock = new Object();
@@ -84,4 +81,32 @@ public class StratosThreadPool {
}
return scheduledExecutorService;
}
+
+ public static void shutdown (String identifier) {
+
+ ExecutorService executorService = executorServiceMap.get(identifier);
+ if (executorService == null) {
+ log.warn("No executor service found for id " + identifier + ", unable to shut down");
+ return;
+ }
+
+ // try to shut down gracefully
+ executorService.shutdown();
+ // wait 10 secs till terminated
+ try {
+ if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " +
+ "timeout, forcefully shutting down");
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ // interrupted, shutdown now
+ executorService.shutdownNow();
+ }
+
+ // remove from the map
+ executorServiceMap.remove(identifier);
+
+ log.info("Successfully shutdown thread pool associated with id: " + identifier);
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index aa7cc02..5bd3f76 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -328,5 +328,8 @@ public class StratosManagerServiceComponent {
// Close event publisher connections to message broker
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
+
+ // shut down the scheduled thread pool
+ StratosThreadPool.shutdown(THREAD_POOL_ID);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
index c97125b..b582d56 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -21,6 +21,8 @@ package org.apache.stratos.messaging.internal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
@@ -40,8 +42,20 @@ public class MessagingServiceComponent {
private static final Log log = LogFactory.getLog(MessagingServiceComponent.class);
protected void activate(ComponentContext context) {
+ // activate all message receivers
try {
- log.info("Messaging Service bundle activated");
+ ApplicationSignUpEventReceiver.getInstance();
+ ApplicationsEventReceiver.getInstance();
+ ClusterStatusEventReceiver.getInstance();
+ DomainMappingEventReceiver.getInstance();
+ HealthStatEventReceiver.getInstance();
+ InitializerEventReceiver.getInstance();
+ TenantEventReceiver.getInstance();
+ TopologyEventReceiver.getInstance();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Messaging Service bundle activated");
+ }
} catch (Exception e) {
log.error("Could not activate Messaging Service component", e);
}
@@ -58,7 +72,10 @@ public class MessagingServiceComponent {
InitializerEventReceiver.getInstance().terminate();
TenantEventReceiver.getInstance().terminate();
TopologyEventReceiver.getInstance().terminate();
- log.info("Messaging Service component is deactivated");
+ StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID);
+ if (log.isDebugEnabled()) {
+ log.debug("Messaging Service component is deactivated");
+ }
} catch (Exception e) {
log.error("Could not de-activate Messaging Service component", e);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index 5ac89e6..8c29816 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -19,12 +19,71 @@
package org.apache.stratos.messaging.message.receiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.listener.EventListener;
+
import java.util.concurrent.ExecutorService;
-public class StratosEventReceiver {
+/**
+ * Abstraction for Event Receivers used in Stratos
+ */
+public abstract class StratosEventReceiver {
+
+ private static final Log log = LogFactory.getLog(StratosEventReceiver.class);
+
+ /**
+ * Thread pool information for all StratosEventReceiver implementations
+ */
+
+ public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID = "stratos-event-receiver-pool";
+ private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE = "stratos.event.receiver.pool.size";
+ // thread pool id
+ protected String threadPoolId;
+ // executor service used
protected ExecutorService executorService;
+ // pool size
+ protected static int threadPoolSize = 15;
+
+ static {
+ // check if the thread pool size is given as a system parameter
+ String poolSize = System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE);
+ if (poolSize != null) {
+ try {
+ threadPoolSize = Integer.parseInt(poolSize);
+ } catch (NumberFormatException e) {
+ log.error("Invalid configuration found for StratosEventReceiver thread pool size", e);
+ threadPoolSize = 15;
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Number of threads used in pool " + STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize);
+ }
+ }
public StratosEventReceiver () {
+ this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID;
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize);
}
+
+ /**
+ * Adds an EventListener to this StratosEventReceiver instance
+ *
+ * @param eventListener EventListener instance to add
+ */
+ public abstract void addEventListener(EventListener eventListener);
+
+ /**
+ * Removed an EventListener from this StratosEventReceiver instance
+ *
+ * @param eventListener EventListener instance to remove
+ */
+ public abstract void removeEventListener(EventListener eventListener);
+
+ /**
+ * Terminates this StratosEventReceiver instance
+ */
+ public abstract void terminate();
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 69dba01..89dd73e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -38,8 +38,6 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
private static volatile ApplicationsEventReceiver instance;
private ApplicationsEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100);
ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationsEventMessageListener(messageQueue);
@@ -66,7 +64,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
messageDelegator.removeEventListener(eventListener);
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(),
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
index adf805d..59374bb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ApplicationSignUpEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index df90cf9..5ad6070 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -43,8 +43,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
private static volatile ApplicationSignUpEventReceiver instance;
private ApplicationSignUpEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
@@ -67,6 +65,10 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
index 5c9c502..954d9be 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ClusterStatusEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index be42b43..9de351b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -39,8 +39,6 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
private static volatile ClusterStatusEventReceiver instance;
private ClusterStatusEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
@@ -51,6 +49,10 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
public static ClusterStatusEventReceiver getInstance () {
if (instance == null) {
synchronized (ClusterStatusEventReceiver.class) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
index fa783a9..03154f2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
@@ -46,6 +46,10 @@ class DomainMappingEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 6de99c0..6c88f73 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -40,8 +40,6 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
private static volatile DomainMappingEventReceiver instance;
private DomainMappingEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new DomainMappingEventMessageListener(messageQueue);
@@ -52,6 +50,10 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
public static DomainMappingEventReceiver getInstance () {
if (instance == null) {
synchronized (DomainMappingEventReceiver.class) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index 2cde2a9..29fb47b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -48,6 +48,10 @@ class HealthStatEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index a9d2602..442bdb6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -39,8 +39,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
private static volatile HealthStatEventReceiver instance;
private HealthStatEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
this.messageListener = new HealthStatEventMessageListener(messageQueue);
@@ -63,6 +61,9 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
private void execute() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
index ffd2ae4..baca350 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
@@ -41,6 +41,10 @@ public class InitializerEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 805a8bf..c7e5daf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -26,7 +26,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
public class InitializerEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);
@@ -38,8 +37,6 @@ public class InitializerEventReceiver extends StratosEventReceiver {
//private ExecutorService executorService;
private InitializerEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
@@ -62,6 +59,10 @@ public class InitializerEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
index 73ef9fe..b695db7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceNotifierEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index e0b8e9f..5bcd75a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -39,8 +39,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
//private boolean terminated;
private InstanceNotifierEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
@@ -63,6 +61,10 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
@@ -94,7 +96,5 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
public synchronized void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- //terminated = true;
- log.info("InstanceNotifierEventReceiver terminated");
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
index 9f754b0..e5df65e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceStatusEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a565ea9..3d9f793 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -38,8 +38,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
private static volatile InstanceStatusEventReceiver instance;
private InstanceStatusEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
@@ -62,6 +60,9 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
private void execute() {
try {
@@ -91,6 +92,5 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- // terminated = true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
index c735d9b..cd8724c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -48,6 +48,10 @@ class TenantEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index a52cb20..e30d3ab 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -42,8 +42,6 @@ public class TenantEventReceiver extends StratosEventReceiver {
private static volatile TenantEventReceiver instance;
private TenantEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
this.messageListener = new TenantEventMessageListener(messageQueue);
@@ -66,6 +64,10 @@ public class TenantEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 8508d91..d2664f4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -47,6 +47,10 @@ class TopologyEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index bfa3950..4f1f254 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -44,8 +44,6 @@ public class TopologyEventReceiver extends StratosEventReceiver {
private static volatile TopologyEventReceiver instance;
private TopologyEventReceiver() {
- // TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
this.messageListener = new TopologyEventMessageListener(messageQueue);
@@ -68,6 +66,10 @@ public class TopologyEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
[06/14] stratos git commit: making DomainMappingEventReceiver
singleton and fixing references in components
Posted by is...@apache.org.
making DomainMappingEventReceiver singleton and fixing references in components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c627ff18
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c627ff18
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c627ff18
Branch: refs/heads/master
Commit: c627ff18f6c4d84c3cd42762e580e55128d80e07
Parents: 6ac3b87
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 18:22:37 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:04:39 2015 +0530
----------------------------------------------------------------------
...alancerCommonDomainMappingEventReceiver.java | 8 ++++---
.../extension/api/LoadBalancerExtension.java | 9 ++++----
.../internal/LoadBalancerServiceComponent.java | 24 ++++++++++----------
.../mapping/DomainMappingEventReceiver.java | 21 ++++++++++++++---
4 files changed, 40 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/c627ff18/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonDomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonDomainMappingEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonDomainMappingEventReceiver.java
index a51ed22..c0d9f81 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonDomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonDomainMappingEventReceiver.java
@@ -34,13 +34,15 @@ import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappin
* Load balancer common domain mapping event receiver updates the topology in the given topology provider
* with the domains found in domain mapping events.
*/
-public class LoadBalancerCommonDomainMappingEventReceiver extends DomainMappingEventReceiver {
+public class LoadBalancerCommonDomainMappingEventReceiver {
private static final Log log = LogFactory.getLog(LoadBalancerCommonDomainMappingEventReceiver.class);
private TopologyProvider topologyProvider;
+ private DomainMappingEventReceiver domainMappingEventReceiver;
public LoadBalancerCommonDomainMappingEventReceiver(TopologyProvider topologyProvider) {
+ this.domainMappingEventReceiver = DomainMappingEventReceiver.getInstance();
this.topologyProvider = topologyProvider;
addEventListeners();
}
@@ -57,7 +59,7 @@ public class LoadBalancerCommonDomainMappingEventReceiver extends DomainMappingE
* domain mapping events.
*/
public void addEventListeners() {
- addEventListener(new DomainMappingAddedEventListener() {
+ domainMappingEventReceiver.addEventListener(new DomainMappingAddedEventListener() {
@Override
protected void onEvent(Event event) {
DomainMappingAddedEvent domainMappingAddedEvent = (DomainMappingAddedEvent) event;
@@ -75,7 +77,7 @@ public class LoadBalancerCommonDomainMappingEventReceiver extends DomainMappingE
}
});
- addEventListener(new DomainMappingRemovedEventListener() {
+ domainMappingEventReceiver.addEventListener(new DomainMappingRemovedEventListener() {
@Override
protected void onEvent(Event event) {
DomainMappingRemovedEvent domainMappingRemovedEvent = (DomainMappingRemovedEvent) event;
http://git-wip-us.apache.org/repos/asf/stratos/blob/c627ff18/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index d2a8cb3..c2ee077 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -39,6 +39,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappingEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import java.util.concurrent.ExecutorService;
@@ -170,8 +171,8 @@ public class LoadBalancerExtension {
addDomainMappingsEventListeners(domainMappingEventReceiver);
// Add default domain mapping event listeners
domainMappingEventReceiver.addEventListeners();
- domainMappingEventReceiver.setExecutorService(executorService);
- domainMappingEventReceiver.execute();
+// domainMappingEventReceiver.setExecutorService(executorService);
+// domainMappingEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Domain mapping event receiver thread started");
}
@@ -179,14 +180,14 @@ public class LoadBalancerExtension {
private void addDomainMappingsEventListeners(final LoadBalancerCommonDomainMappingEventReceiver
domainMappingEventReceiver) {
- domainMappingEventReceiver.addEventListener(new DomainMappingAddedEventListener() {
+ DomainMappingEventReceiver.getInstance().addEventListener(new DomainMappingAddedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- domainMappingEventReceiver.addEventListener(new DomainMappingRemovedEventListener() {
+ DomainMappingEventReceiver.getInstance().addEventListener(new DomainMappingRemovedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
http://git-wip-us.apache.org/repos/asf/stratos/blob/c627ff18/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index e772322..3786af8 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -173,11 +173,11 @@ public class LoadBalancerServiceComponent {
}
domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider);
- domainMappingEventReceiver.setExecutorService(executorService);
- domainMappingEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Domain mapping event receiver thread started");
- }
+// domainMappingEventReceiver.setExecutorService(executorService);
+// domainMappingEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Domain mapping event receiver thread started");
+// }
}
private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
@@ -275,13 +275,13 @@ public class LoadBalancerServiceComponent {
// }
// Terminate domain mapping event receiver
- if (domainMappingEventReceiver != null) {
- try {
- domainMappingEventReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating domain mapping event receiver", e);
- }
- }
+// if (domainMappingEventReceiver != null) {
+// try {
+// domainMappingEventReceiver.terminate();
+// } catch (Exception e) {
+// log.warn("An error occurred while terminating domain mapping event receiver", e);
+// }
+// }
// Terminate statistics notifier
if (statisticsNotifier != null) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c627ff18/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 1d582b5..3c723a3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -21,8 +21,10 @@ package org.apache.stratos.messaging.message.receiver.domain.mapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -30,16 +32,18 @@ import java.util.concurrent.ExecutorService;
/**
* Domain mapping event receiver.
*/
-public class DomainMappingEventReceiver {
+public class DomainMappingEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(DomainMappingEventReceiver.class);
private DomainMappingEventMessageDelegator messageDelegator;
private DomainMappingEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile DomainMappingEventReceiver instance;
- public DomainMappingEventReceiver() {
+ private DomainMappingEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new DomainMappingEventMessageListener(messageQueue);
@@ -49,6 +53,17 @@ public class DomainMappingEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public static DomainMappingEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (DomainMappingEventReceiver.class) {
+ if (instance == null) {
+ instance = new DomainMappingEventReceiver();
+ }
+ }
+ }
+
+ return instance;
+ }
public void execute() {
try {
[03/14] stratos git commit: making TenantEventReceiver a singleton and fixing the references in the components
Posted by is...@apache.org.
making TenantEventReceiver a singleton and fixing the references in the components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/170c27cc
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/170c27cc
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/170c27cc
Branch: refs/heads/master
Commit: 170c27cc61aaddcb2a28239e98ad11b438b421bb
Parents: b3dc546
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 17:49:15 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:04:05 2015 +0530
----------------------------------------------------------------------
.../agent/CartridgeAgentEventListeners.java | 4 +--
.../internal/LoadBalancerServiceComponent.java | 6 ++--
.../receiver/tenant/TenantEventReceiver.java | 31 +++++++++++++++-----
3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/170c27cc/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index 1d64ff0..ffa3750 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -79,8 +79,8 @@ public class CartridgeAgentEventListeners {
this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
- this.tenantEventReceiver = new TenantEventReceiver();
- this.tenantEventReceiver.setExecutorService(eventListenerExecutorService);
+ this.tenantEventReceiver = TenantEventReceiver.getInstance();
+// this.tenantEventReceiver.setExecutorService(eventListenerExecutorService);
extensionHandler = new DefaultExtensionHandler();
http://git-wip-us.apache.org/repos/asf/stratos/blob/170c27cc/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 442686a..e772322 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -225,9 +225,9 @@ public class LoadBalancerServiceComponent {
private void startTenantEventReceiver(ExecutorService executorService) {
- tenantEventReceiver = new TenantEventReceiver();
- tenantEventReceiver.setExecutorService(executorService);
- tenantEventReceiver.execute();
+ tenantEventReceiver = TenantEventReceiver.getInstance();
+// tenantEventReceiver.setExecutorService(executorService);
+// tenantEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Tenant event receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/170c27cc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index 988a2ce..1c519b9 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.tenant;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -34,28 +36,43 @@ import java.util.concurrent.ExecutorService;
* A thread for receiving tenant information from message broker and
* build tenant information in tenant manager.
*/
-public class TenantEventReceiver {
+public class TenantEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(TenantEventReceiver.class);
private TenantEventMessageDelegator messageDelegator;
private TenantEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile TenantEventReceiver instance;
- public TenantEventReceiver() {
+ private TenantEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
this.messageListener = new TenantEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static TenantEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (TenantEventReceiver.class) {
+ if (instance == null) {
+ instance = new TenantEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TENANT_TOPIC.getTopicName(), messageListener);
[10/14] stratos git commit: removing unused methods
Posted by is...@apache.org.
removing unused methods
Conflicts:
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/933be1a1
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/933be1a1
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/933be1a1
Branch: refs/heads/master
Commit: 933be1a145946d81f59de58a865758c3c3155a35
Parents: 412cb2c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Tue Dec 8 07:32:06 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:05:30 2015 +0530
----------------------------------------------------------------------
.../AutoscalerHealthStatEventReceiver.java | 10 ++-
.../AutoscalerInitializerTopicReceiver.java | 17 -----
.../AutoscalerTopologyEventReceiver.java | 17 -----
.../internal/AutoscalerServiceComponent.java | 67 +-------------------
.../CloudControllerServiceComponent.java | 59 -----------------
.../application/ApplicationEventReceiver.java | 13 ----
.../StratosManagerServiceComponent.java | 16 +++++
.../mapping/DomainMappingEventReceiver.java | 13 ----
.../notifier/InstanceNotifierEventReceiver.java | 36 -----------
.../status/InstanceStatusEventReceiver.java | 4 ++
.../tests/PythonAgentIntegrationTest.java | 7 +-
.../integration/common/TopologyHandler.java | 12 +---
12 files changed, 37 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 0b13500..55dc1fa 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -39,16 +39,19 @@ import java.util.concurrent.ExecutorService;
public class AutoscalerHealthStatEventReceiver {
private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
- private boolean terminated = false;
private HealthStatEventReceiver healthStatEventReceiver;
+<<<<<<< HEAD
private ExecutorService executorService;
+=======
+>>>>>>> d4b35c0... removing unused methods
public AutoscalerHealthStatEventReceiver() {
this.healthStatEventReceiver = HealthStatEventReceiver.getInstance();
addEventListeners();
}
+<<<<<<< HEAD
// public void execute() {
// healthStatEventReceiver.setExecutorService(executorService);
// healthStatEventReceiver.execute();
@@ -58,6 +61,8 @@ public class AutoscalerHealthStatEventReceiver {
// }
// }
+=======
+>>>>>>> d4b35c0... removing unused methods
private void addEventListeners() {
// Listen to health stat events that affect clusters
healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() {
@@ -475,6 +480,7 @@ public class AutoscalerHealthStatEventReceiver {
TopologyManager.releaseReadLock();
}
}
+<<<<<<< HEAD
public void terminate() {
this.terminated = true;
@@ -487,4 +493,6 @@ public class AutoscalerHealthStatEventReceiver {
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
+=======
+>>>>>>> d4b35c0... removing unused methods
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
index b330211..33b93ed 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
@@ -31,21 +31,12 @@ import java.util.concurrent.ExecutorService;
public class AutoscalerInitializerTopicReceiver {
private static final Log log = LogFactory.getLog(AutoscalerInitializerTopicReceiver.class);
private InitializerEventReceiver initializerEventReceiver;
- private ExecutorService executorService;
public AutoscalerInitializerTopicReceiver() {
this.initializerEventReceiver = InitializerEventReceiver.getInstance();
addEventListeners();
}
-// public void execute() {
-// initializerEventReceiver.setExecutorService(executorService);
-// initializerEventReceiver.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Cloud controller initializer topic receiver started");
-// }
-// }
-
private void addEventListeners() {
initializerEventReceiver.addEventListener(new CompleteApplicationsRequestEventListener() {
@Override
@@ -61,12 +52,4 @@ public class AutoscalerInitializerTopicReceiver {
}
});
}
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index daa70ae..e2b417e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -55,7 +55,6 @@ import java.util.concurrent.ExecutorService;
public class AutoscalerTopologyEventReceiver {
private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
private TopologyEventReceiver topologyEventReceiver;
- private boolean terminated;
private boolean topologyInitialized;
private ExecutorService executorService;
@@ -506,20 +505,4 @@ public class AutoscalerTopologyEventReceiver {
}
});
}
-
- /**
- * Terminate load balancer topology receiver thread.
- */
-// public void terminate() {
-// topologyEventReceiver.terminate();
-// terminated = true;
-// }
-//
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index bb28577..881c3ab 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -173,27 +173,10 @@ public class AutoscalerServiceComponent {
// Start topology receiver
asTopologyReceiver = new AutoscalerTopologyEventReceiver();
-// asTopologyReceiver.setExecutorService(executorService);
- //asTopologyReceiver.execute();
- if (log.isDebugEnabled()) {
- log.debug("Topology receiver executor service started");
- }
-
// Start health stat receiver
autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
-// autoscalerHealthStatEventReceiver.setExecutorService(executorService);
-// autoscalerHealthStatEventReceiver.execute();
- if (log.isDebugEnabled()) {
- log.debug("Health statistics receiver thread started");
- }
-
// Start initializer receiver
autoscalerInitializerTopicReceiver = new AutoscalerInitializerTopicReceiver();
-// autoscalerInitializerTopicReceiver.setExecutorService(executorService);
-// autoscalerInitializerTopicReceiver.execute();
-// if (log.isDebugEnabled()) {
-// log.debug("Initializer receiver thread started");
-// }
if (log.isInfoEnabled()) {
log.info("Scheduling tasks to publish applications");
@@ -245,54 +228,8 @@ public class AutoscalerServiceComponent {
}
protected void deactivate(ComponentContext context) {
-// if (asTopologyReceiver != null) {
-// try {
-// asTopologyReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating autoscaler topology event receiver", e);
-// }
-// }
-
- if (autoscalerHealthStatEventReceiver != null) {
- try {
- autoscalerHealthStatEventReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating autoscaler health statistics event receiver", e);
- }
- }
-
- // Shutdown executor service
- shutdownExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID);
-
- // Shutdown scheduler
- shutdownScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID);
-
- // Shutdown application monitor executor service
- shutdownExecutorService(AutoscalerConstants.MONITOR_THREAD_POOL_ID);
-
- // Shutdown cluster monitor scheduler executor service
- shutdownScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID);
- }
-
- private void shutdownExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutorService(executorService);
- }
- }
-
- private void shutdownScheduledExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutorService(executorService);
- }
- }
-
- private void shutdownExecutorService(ExecutorService executorService) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down executor service", e);
+ if (log.isDebugEnabled()) {
+ log.debug("Autoscaler Component de-activated");
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 267d5a8..c30fc63 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -146,40 +146,9 @@ public class CloudControllerServiceComponent {
private void executeCoordinatorTasks() {
applicationEventReceiver = new ApplicationEventReceiver();
-// applicationEventReceiver.setExecutorService(executorService);
-// applicationEventReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Application event receiver thread started");
- }
-
clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
-// clusterStatusTopicReceiver.setExecutorService(executorService);
-// clusterStatusTopicReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Cluster status event receiver thread started");
- }
-
instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
-// instanceStatusTopicReceiver.setExecutorService(executorService);
-// instanceStatusTopicReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Instance status event receiver thread started");
- }
-
initializerTopicReceiver = new InitializerTopicReceiver();
-// initializerTopicReceiver.setExecutorService(executorService);
-// initializerTopicReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Initializer event receiver thread started");
- }
-
- if (log.isInfoEnabled()) {
- log.info("Scheduling topology synchronizer task");
- }
Runnable topologySynchronizer = new TopologyEventSynchronizer();
scheduler.scheduleAtFixedRate(topologySynchronizer, 0, 1, TimeUnit.MINUTES);
}
@@ -260,33 +229,5 @@ public class CloudControllerServiceComponent {
} catch (Exception e) {
log.warn("An error occurred while closing cloud controller topology event publisher", e);
}
-
- // Shutdown executor service
- shutdownExecutorService(THREAD_POOL_ID);
-
- // Shutdown scheduler
- shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
- }
-
- private void shutdownExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutorService(executorService);
- }
- }
-
- private void shutdownScheduledExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutorService(executorService);
- }
- }
-
- private void shutdownExecutorService(ExecutorService executorService) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down executor service", e);
- }
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
index 8da5575..65bef23 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
@@ -37,21 +37,12 @@ import java.util.concurrent.ExecutorService;
public class ApplicationEventReceiver {
private static final Log log = LogFactory.getLog(ApplicationEventReceiver.class);
private ApplicationsEventReceiver applicationsEventReceiver;
- // private ExecutorService executorService;
public ApplicationEventReceiver() {
this.applicationsEventReceiver = ApplicationsEventReceiver.getInstance();
addEventListeners();
}
-// public void execute() {
-// if (log.isInfoEnabled()) {
-// log.info("Cloud controller application event receiver thread started");
-// }
-// applicationsEventReceiver.setExecutorService(executorService);
-// applicationsEventReceiver.execute();
-// }
-
private void addEventListeners() {
applicationsEventReceiver.addEventListener(new ApplicationInstanceTerminatedEventListener() {
@Override
@@ -75,8 +66,4 @@ public class ApplicationEventReceiver {
}
});
}
-
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index c4d68ae..7c421d9 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -193,8 +193,11 @@ public class StratosManagerServiceComponent {
private void initializeInitializerEventReceiver() {
initializerTopicReceiver = new StratosManagerInitializerTopicReceiver();
+<<<<<<< HEAD
// initializerTopicReceiver.setExecutorService(executorService);
// initializerTopicReceiver.execute();
+=======
+>>>>>>> d4b35c0... removing unused methods
}
/**
@@ -202,8 +205,11 @@ public class StratosManagerServiceComponent {
*/
private void initializeInstanceStatusEventReceiver() {
instanceStatusEventReceiver = new StratosManagerInstanceStatusEventReceiver();
+<<<<<<< HEAD
// instanceStatusEventReceiver.setExecutorService(executorService);
// instanceStatusEventReceiver.execute();
+=======
+>>>>>>> d4b35c0... removing unused methods
}
/**
@@ -211,8 +217,11 @@ public class StratosManagerServiceComponent {
*/
private void initializeTopologyEventReceiver() {
topologyEventReceiver = new StratosManagerTopologyEventReceiver();
+<<<<<<< HEAD
// topologyEventReceiver.setExecutorService(executorService);
// topologyEventReceiver.execute();
+=======
+>>>>>>> d4b35c0... removing unused methods
}
/**
@@ -220,8 +229,11 @@ public class StratosManagerServiceComponent {
*/
private void initializeApplicationEventReceiver() {
applicationEventReceiver = new StratosManagerApplicationEventReceiver();
+<<<<<<< HEAD
// applicationEventReceiver.setExecutorService(executorService);
// applicationEventReceiver.execute();
+=======
+>>>>>>> d4b35c0... removing unused methods
}
/**
@@ -336,6 +348,7 @@ public class StratosManagerServiceComponent {
// Close event publisher connections to message broker
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
+<<<<<<< HEAD
shutdownExecutorService(THREAD_POOL_ID);
shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
@@ -362,4 +375,7 @@ public class StratosManagerServiceComponent {
log.warn("An error occurred while shutting down executor service", e);
}
}
+=======
+ }
+>>>>>>> d4b35c0... removing unused methods
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 6b79873..a37941c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -91,17 +91,4 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
}
}
}
-
-// public void terminate() {
-// eventSubscriber.terminate();
-// messageDelegator.terminate();
-// }
-//
-// public ExecutorService getExecutorService() {
-// return executorService;
-// }
-//
-// public void setExecutorService(ExecutorService executorService) {
-// this.executorService = executorService;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index cfc7f11..33f5de8 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -66,42 +66,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
-// public void execute() {
-// synchronized (this) {
-// if (terminated) {
-// log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created.");
-// return;
-// }
-// try {
-// Thread subscriberThread = new Thread(eventSubscriber);
-// subscriberThread.start();
-// if (log.isDebugEnabled()) {
-// log.debug("InstanceNotifier event message receiver thread started");
-// }
-//
-// // Start instance notifier event message delegator thread
-// Thread receiverThread = new Thread(messageDelegator);
-// receiverThread.start();
-// if (log.isDebugEnabled()) {
-// log.debug("InstanceNotifier event message delegator thread started");
-// }
-// } catch (Exception e) {
-// if (log.isErrorEnabled()) {
-// log.error("InstanceNotifier receiver failed", e);
-// }
-// }
-// }
-// log.info("InstanceNotifierEventReceiver started");
-//
-// // Keep the thread live until terminated
-// while (!terminated) {
-// try {
-// Thread.sleep(2000);
-// } catch (InterruptedException ignore) {
-// }
-// }
-// }
-
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a2a1623..5bc27ff 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -39,7 +39,11 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
private InstanceStatusEventReceiver() {
// TODO: make pool size configurable
+<<<<<<< HEAD
this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
+=======
+ this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
+>>>>>>> d4b35c0... removing unused methods
InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 4a86e40..aa0a802 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -124,7 +124,8 @@ public abstract class PythonAgentIntegrationTest {
startActiveMQInstance(Integer.parseInt(amqpBindPorts[i]), Integer.parseInt(mqttBindPorts[i]), true);
}
- ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize);
+ //ExecutorService executorService = StratosThreadPool.getExecutorService
+ // ("TEST_THREAD_POOL");
topologyEventReceiver = TopologyEventReceiver.getInstance();
// topologyEventReceiver.setExecutorService(executorService);
// topologyEventReceiver.execute();
@@ -149,9 +150,7 @@ public abstract class PythonAgentIntegrationTest {
}
});
- initializerEventReceiver = new InitializerEventReceiver();
- initializerEventReceiver.setExecutorService(executorService);
- initializerEventReceiver.execute();
+ initializerEventReceiver = InitializerEventReceiver.getInstance();
this.eventReceiverInitialized = true;
http://git-wip-us.apache.org/repos/asf/stratos/blob/933be1a1/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
index e506ef7..d115970 100644
--- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
+++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
@@ -114,17 +114,14 @@ public class TopologyHandler {
}
private void initializeTenantEventReceiver() {
- tenantEventReceiver = new TenantEventReceiver();
- tenantEventReceiver.setExecutorService(executorService);
- tenantEventReceiver.execute();
+ tenantEventReceiver = TenantEventReceiver.getInstance();
}
/**
* Initialize application event receiver
*/
private void initializeHealthStatsEventReceiver() {
- healthStatEventReceiver = new HealthStatEventReceiver();
- healthStatEventReceiver.setExecutorService(executorService);
+ healthStatEventReceiver = HealthStatEventReceiver.getInstance();
healthStatEventReceiver.addEventListener(new MemberFaultEventListener() {
@Override
protected void onEvent(Event event) {
@@ -133,15 +130,13 @@ public class TopologyHandler {
memberFaultEvent.getMemberId()));
}
});
- healthStatEventReceiver.execute();
}
/**
* Initialize application event receiver
*/
private void initializeApplicationEventReceiver() {
- applicationsEventReceiver = new ApplicationsEventReceiver();
- applicationsEventReceiver.setExecutorService(executorService);
+ applicationsEventReceiver = ApplicationsEventReceiver.getInstance();
applicationsEventReceiver.addEventListener(new ApplicationInstanceActivatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -164,7 +159,6 @@ public class TopologyHandler {
appInstanceInactivatedEvent.getInstanceId()));
}
});
- applicationsEventReceiver.execute();
}
/**