You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2016/01/05 16:47:11 UTC
[02/50] [abbrv] stratos git commit: making TopologyEventReceiver and
ApplicationSignupEventReceiver a singleton
making TopologyEventReceiver and ApplicationSignupEventReceiver a singleton
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/033ab1f6
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/033ab1f6
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/033ab1f6
Branch: refs/heads/stratos-4.1.x
Commit: 033ab1f6c355c1847168da54393c73e22ad5c2ac
Parents: bb22134
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 15:33:21 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Mon Dec 7 18:48:09 2015 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 18 +++----
.../stratos/cartridge/agent/CartridgeAgent.java | 54 ++++++++++----------
.../agent/CartridgeAgentEventListeners.java | 43 ++++++++--------
.../agent/test/JavaCartridgeAgentTest.java | 6 +--
...cerCommonApplicationSignUpEventReceiver.java | 11 ++--
...LoadBalancerCommonTopologyEventReceiver.java | 37 +++++++-------
.../extension/api/LoadBalancerExtension.java | 2 -
.../internal/LoadBalancerServiceComponent.java | 18 +++----
.../StratosManagerServiceComponent.java | 4 +-
.../StratosManagerTopologyEventReceiver.java | 18 +++----
.../message/receiver/StratosEventReceiver.java | 30 +++++++++++
.../signup/ApplicationSignUpEventReceiver.java | 37 ++++++++++----
.../topology/TopologyEventReceiver.java | 39 ++++++++++----
.../mock/iaas/services/impl/MockInstance.java | 6 +--
14 files changed, 193 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 8336f86..6fd64a7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -60,18 +60,18 @@ public class AutoscalerTopologyEventReceiver {
private ExecutorService executorService;
public AutoscalerTopologyEventReceiver() {
- this.topologyEventReceiver = new TopologyEventReceiver();
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- //FIXME this activated before autoscaler deployer activated.
- topologyEventReceiver.setExecutorService(getExecutorService());
- topologyEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Autoscaler topology receiver thread started");
- }
- }
+// public void execute() {
+// //FIXME this activated before autoscaler deployer activated.
+// // topologyEventReceiver.setExecutorService(getExecutorService());
+// //topologyEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Autoscaler topology receiver thread started");
+// }
+// }
private void addEventListeners() {
// Listen to topology events that affect clusters
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 91f596e..18e6e0a 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -103,10 +103,10 @@ public class CartridgeAgent implements Runnable {
} */
// Start application event receiver thread
- registerApplicationEventListeners();
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent registering all event listeners ... done");
- }
+ //registerApplicationEventListeners();
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent registering all event listeners ... done");
+// }
// Execute instance started shell script
extensionHandler.onInstanceStartedEvent();
@@ -197,29 +197,29 @@ public class CartridgeAgent implements Runnable {
}
}
- protected void registerTenantEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("registerTenantEventListeners before");
- }
-
- eventListenerns.startTenantEventReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("registerTenantEventListeners after");
- }
- }
-
- protected void registerApplicationEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("registerApplicationListeners before");
- }
-
- eventListenerns.startApplicationsEventReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("registerApplicationEventListeners after");
- }
- }
+// protected void registerTenantEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("registerTenantEventListeners before");
+// }
+//
+// eventListenerns.startTenantEventReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("registerTenantEventListeners after");
+// }
+// }
+
+// protected void registerApplicationEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("registerApplicationListeners before");
+// }
+//
+// eventListenerns.startApplicationsEventReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("registerApplicationEventListeners after");
+// }
+// }
protected void validateRequiredSystemProperties() {
String jndiPropertiesDir = System.getProperty(CartridgeAgentConstants.JNDI_PROPERTIES_DIR);
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index e6bb41b..103d2c7 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -72,11 +72,10 @@ public class CartridgeAgentEventListeners {
if (log.isDebugEnabled()) {
log.debug("Creating cartridge agent event listeners...");
}
- this.applicationsEventReceiver = new ApplicationSignUpEventReceiver();
- this.applicationsEventReceiver.setExecutorService(eventListenerExecutorService);
+ this.applicationsEventReceiver = ApplicationSignUpEventReceiver.getInstance();
- this.topologyEventReceiver = new TopologyEventReceiver();
- this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
+ //this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
@@ -151,24 +150,24 @@ public class CartridgeAgentEventListeners {
}
- public void startApplicationsEventReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent application event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- applicationsEventReceiver.execute();
- }
- });
-
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent application receiver thread started, waiting for event messages ...");
- }
-
- }
+// public void startApplicationsEventReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent application event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// applicationsEventReceiver.execute();
+// }
+// });
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent application receiver thread started, waiting for event messages ...");
+// }
+//
+// }
private void addInstanceNotifierEventListeners() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
index 0427eb6..a501507 100644
--- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
+++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
@@ -122,9 +122,9 @@ public class JavaCartridgeAgentTest {
String agentHome = setupJavaAgent();
ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5);
- topologyEventReceiver = new TopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
+ //topologyEventReceiver.setExecutorService(executorService);
+ //topologyEventReceiver.execute();
instanceStatusEventReceiver = new InstanceStatusEventReceiver();
instanceStatusEventReceiver.setExecutorService(executorService);
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
index d5819dc..95c4867 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
@@ -41,19 +41,20 @@ import org.apache.stratos.messaging.message.receiver.application.signup.Applicat
* Load balancer common application signup event receiver updates the topology in the given topology provider
* with the hostnames found in application signup events.
*/
-public class LoadBalancerCommonApplicationSignUpEventReceiver extends ApplicationSignUpEventReceiver {
+public class LoadBalancerCommonApplicationSignUpEventReceiver {
private static final Log log = LogFactory.getLog(LoadBalancerCommonApplicationSignUpEventReceiver.class);
-
+ private ApplicationSignUpEventReceiver applicationSignUpEventReceiver;
private TopologyProvider topologyProvider;
public LoadBalancerCommonApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
+ this.applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance();
this.topologyProvider = topologyProvider;
addEventListeners();
}
private void addEventListeners() {
- addEventListener(new CompleteApplicationSignUpsEventListener() {
+ applicationSignUpEventReceiver.addEventListener(new CompleteApplicationSignUpsEventListener() {
private boolean initialized = false;
@Override
@@ -96,7 +97,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio
}
});
- addEventListener(new ApplicationSignUpAddedEventListener() {
+ applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpAddedEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -110,7 +111,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio
}
});
- addEventListener(new ApplicationSignUpRemovedEventListener() {
+ applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpRemovedEventListener() {
@Override
protected void onEvent(Event event) {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 4fb45a9..85142e3 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -38,15 +38,17 @@ import java.util.Properties;
* Load balancer common topology receiver updates the topology in the given topology provider
* according to topology events.
*/
-public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiver {
+public class LoadBalancerCommonTopologyEventReceiver {
private static final Log log = LogFactory.getLog(LoadBalancerCommonTopologyEventReceiver.class);
private TopologyProvider topologyProvider;
private boolean initialized;
+ private TopologyEventReceiver topologyEventReceiver;
public LoadBalancerCommonTopologyEventReceiver(TopologyProvider topologyProvider) {
this.topologyProvider = topologyProvider;
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
@@ -57,12 +59,12 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
}
- public void execute() {
- super.execute();
- if (log.isInfoEnabled()) {
- log.info("Load balancer topology receiver thread started");
- }
- }
+// public void execute() {
+// super.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Load balancer topology receiver thread started");
+// }
+// }
public void initializeTopology() {
if (initialized) {
@@ -115,7 +117,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
* Add default event listeners for updating the topology on topology events
*/
public void addEventListeners() {
- addEventListener(new CompleteTopologyEventListener() {
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
if (!initialized) {
@@ -124,7 +126,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberActivatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -142,11 +144,10 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
if (networkPartitionIdFilter != null && !networkPartitionIdFilter.equals("")) {
if (memberActivatedEvent.getNetworkPartitionId().equals(networkPartitionIdFilter)) {
addMember(serviceName, clusterId, memberId);
- }
- else{
+ } else {
log.debug(String.format("Member exists in a different network partition." +
- "[member id] %s [member network partition] %s [filter network partition] %s ",
- memberId,memberActivatedEvent.getNetworkPartitionId(),networkPartitionIdFilter));
+ "[member id] %s [member network partition] %s [filter network partition] %s ",
+ memberId, memberActivatedEvent.getNetworkPartitionId(), networkPartitionIdFilter));
}
} else {
addMember(serviceName, clusterId, memberId);
@@ -159,7 +160,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberMaintenanceListener() {
+ topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
@@ -181,7 +182,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberSuspendedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -203,7 +204,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new MemberTerminatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -224,7 +225,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new ClusterRemovedEventListener() {
+ topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -253,7 +254,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
});
- addEventListener(new ServiceRemovedEventListener() {
+ topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index ae2b6dd..e7a2071 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -201,8 +201,6 @@ public class LoadBalancerExtension {
*/
private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
- applicationSignUpEventReceiver.setExecutorService(executorService);
- applicationSignUpEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application signup event receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index a7761cd..cb74984 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -186,8 +186,8 @@ public class LoadBalancerServiceComponent {
}
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
- applicationSignUpEventReceiver.setExecutorService(executorService);
- applicationSignUpEventReceiver.execute();
+// applicationSignUpEventReceiver.setExecutorService(executorService);
+// applicationSignUpEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application signup event receiver thread started");
}
@@ -266,13 +266,13 @@ public class LoadBalancerServiceComponent {
}
// Terminate application signup event receiver
- if (applicationSignUpEventReceiver != null) {
- try {
- applicationSignUpEventReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating application signup event receiver", e);
- }
- }
+// if (applicationSignUpEventReceiver != null) {
+// try {
+// applicationSignUpEventReceiver.terminate();
+// } catch (Exception e) {
+// log.warn("An error occurred while terminating application signup event receiver", e);
+// }
+// }
// Terminate domain mapping event receiver
if (domainMappingEventReceiver != null) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 47f401a..76d39a7 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -211,8 +211,8 @@ public class StratosManagerServiceComponent {
*/
private void initializeTopologyEventReceiver() {
topologyEventReceiver = new StratosManagerTopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
index 08ca3d6..51b21ac 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
@@ -23,19 +23,19 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
-public class StratosManagerTopologyEventReceiver extends TopologyEventReceiver {
+public class StratosManagerTopologyEventReceiver {
private static final Log log = LogFactory.getLog(StratosManagerTopologyEventReceiver.class);
public StratosManagerTopologyEventReceiver() {
}
- @Override
- public void execute() {
- super.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Stratos manager topology event receiver thread started");
- }
- }
+// @Override
+// public void execute() {
+// super.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Stratos manager topology event receiver thread started");
+// }
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
new file mode 100644
index 0000000..5ac89e6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver;
+
+import java.util.concurrent.ExecutorService;
+
+public class StratosEventReceiver {
+
+ protected ExecutorService executorService;
+
+ public StratosEventReceiver () {
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index 55e3fd1..dde214d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.application.signup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -33,26 +35,41 @@ import java.util.concurrent.ExecutorService;
/**
* Application signup event receiver.
*/
-public class ApplicationSignUpEventReceiver {
+public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(ApplicationSignUpEventReceiver.class);
private ApplicationSignUpEventMessageDelegator messageDelegator;
private ApplicationSignUpEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile ApplicationSignUpEventReceiver instance;
- public ApplicationSignUpEventReceiver() {
+ private ApplicationSignUpEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static ApplicationSignUpEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (ApplicationSignUpEventReceiver.class) {
+ if (instance == null) {
+ instance = new ApplicationSignUpEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(),
@@ -103,11 +120,11 @@ public class ApplicationSignUpEventReceiver {
messageDelegator.terminate();
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index b841d0a..50e078a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -34,26 +36,41 @@ import java.util.concurrent.ExecutorService;
* A thread for receiving topology information from message broker and
* build topology in topology manager.
*/
-public class TopologyEventReceiver {
+public class TopologyEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(TopologyEventReceiver.class);
private TopologyEventMessageDelegator messageDelegator;
private TopologyEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private ExecutorService executorService;
+ private static volatile TopologyEventReceiver instance;
- public TopologyEventReceiver() {
+ private TopologyEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
this.messageListener = new TopologyEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static TopologyEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (TopologyEventReceiver.class) {
+ if (instance == null) {
+ instance = new TopologyEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener);
@@ -101,11 +118,11 @@ public class TopologyEventReceiver {
});
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index 7b31861..c752f9e 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -108,7 +108,7 @@ public class MockInstance implements Serializable {
}
private void startTopologyEventReceiver() {
- topologyEventReceiver = new TopologyEventReceiver();
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
topologyEventReceiver.addEventListener(new MemberInitializedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -151,8 +151,8 @@ public class MockInstance implements Serializable {
}
}
});
- topologyEventReceiver.setExecutorService(eventListenerExecutorService);
- topologyEventReceiver.execute();
+// topologyEventReceiver.setExecutorService(eventListenerExecutorService);
+// topologyEventReceiver.execute();
if (log.isDebugEnabled()) {
log.debug(String.format(
"Mock instance topology event message receiver started for mock member [member-id] %s",