You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 14:04:19 UTC
[02/19] stratos git commit: making TopologyEventReceiver a singleton and fixing references
making TopologyEventReceiver a singleton and fixing references
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/20560136
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/20560136
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/20560136
Branch: refs/heads/stratos-4.1.x
Commit: 20560136a3ae5ca85959c4da9d544485837be4e5
Parents: 5ce932c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 17:28:00 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 17 11:33:49 2015 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 24 +++----
.../internal/AutoscalerServiceComponent.java | 18 ++---
.../stratos/cartridge/agent/CartridgeAgent.java | 28 ++++----
.../agent/CartridgeAgentEventListeners.java | 72 ++++++++++----------
.../extension/api/LoadBalancerExtension.java | 33 ++++-----
.../internal/LoadBalancerServiceComponent.java | 24 +++----
.../service/MetadataTopologyEventReceiver.java | 35 +++++-----
.../service/registry/MetadataApiRegistry.java | 8 +--
.../cep/extension/CEPTopologyEventReceiver.java | 20 +++---
.../extension/FaultHandlingWindowProcessor.java | 10 +--
.../cep/extension/CEPTopologyEventReceiver.java | 20 +++---
.../extension/FaultHandlingWindowProcessor.java | 10 +--
.../tests/PythonAgentIntegrationTest.java | 6 +-
.../integration/common/TopologyHandler.java | 12 ++--
14 files changed, 163 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 6fd64a7..daa70ae 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -510,16 +510,16 @@ public class AutoscalerTopologyEventReceiver {
/**
* Terminate load balancer topology receiver thread.
*/
- public void terminate() {
- topologyEventReceiver.terminate();
- terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void terminate() {
+// topologyEventReceiver.terminate();
+// terminated = true;
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 76844a0..4d4c54f 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -173,8 +173,8 @@ public class AutoscalerServiceComponent {
// Start topology receiver
asTopologyReceiver = new AutoscalerTopologyEventReceiver();
- asTopologyReceiver.setExecutorService(executorService);
- asTopologyReceiver.execute();
+// asTopologyReceiver.setExecutorService(executorService);
+ //asTopologyReceiver.execute();
if (log.isDebugEnabled()) {
log.debug("Topology receiver executor service started");
}
@@ -245,13 +245,13 @@ public class AutoscalerServiceComponent {
}
protected void deactivate(ComponentContext context) {
- if (asTopologyReceiver != null) {
- try {
- asTopologyReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating autoscaler topology event receiver", e);
- }
- }
+// if (asTopologyReceiver != null) {
+// try {
+// asTopologyReceiver.terminate();
+// } catch (Exception e) {
+// log.warn("An error occurred while terminating autoscaler topology event receiver", e);
+// }
+// }
if (autoscalerHealthStatEventReceiver != null) {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 18e6e0a..b0bf326 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -60,10 +60,10 @@ public class CartridgeAgent implements Runnable {
}
// Start topology event receiver thread
- registerTopologyEventListeners();
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent registerTopologyEventListeners done");
- }
+// registerTopologyEventListeners();
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent registerTopologyEventListeners done");
+// }
if (log.isInfoEnabled()) {
log.info("Waiting for CompleteTopologyEvent..");
@@ -186,16 +186,16 @@ public class CartridgeAgent implements Runnable {
}
}
- protected void registerTopologyEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("registerTopologyEventListeners before");
- }
- eventListenerns.startTopologyEventReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("registerTopologyEventListeners after");
- }
- }
+// protected void registerTopologyEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("registerTopologyEventListeners before");
+// }
+// eventListenerns.startTopologyEventReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("registerTopologyEventListeners after");
+// }
+// }
// protected void registerTenantEventListeners() {
// if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index 103d2c7..1d64ff0 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -94,24 +94,24 @@ public class CartridgeAgentEventListeners {
}
}
- public void startTopologyEventReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent topology event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- topologyEventReceiver.execute();
- }
- });
-
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
- }
-
- }
+// public void startTopologyEventReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent topology event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// topologyEventReceiver.execute();
+// }
+// });
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
+// }
+//
+// }
public void startInstanceNotifierReceiver() {
@@ -131,24 +131,24 @@ public class CartridgeAgentEventListeners {
}
}
- public void startTenantEventReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent tenant event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- topologyEventReceiver.execute();
- }
- });
-
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
- }
-
- }
+// public void startTenantEventReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent tenant event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// topologyEventReceiver.execute();
+// }
+// });
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
+// }
+//
+// }
// public void startApplicationsEventReceiver() {
//
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e7a2071..d2a8cb3 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -39,6 +39,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import java.util.concurrent.ExecutorService;
@@ -123,8 +124,8 @@ public class LoadBalancerExtension {
addTopologyEventListeners(topologyEventReceiver);
// Add default topology provider event listeners
topologyEventReceiver.addEventListeners();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Topology receiver thread started");
}
@@ -212,12 +213,12 @@ public class LoadBalancerExtension {
* @param topologyEventReceiver topology event receiver instance
*/
private void addTopologyEventListeners(final LoadBalancerCommonTopologyEventReceiver topologyEventReceiver) {
- topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
try {
- if (!loadBalancerStarted) {
+ if (!loadBalancerStarted) {
configureAndStart();
}
} catch (Exception e) {
@@ -228,37 +229,37 @@ public class LoadBalancerExtension {
}
}
});
- topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberSuspendedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
- topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
+ TopologyEventReceiver.getInstance().addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
@@ -338,12 +339,12 @@ public class LoadBalancerExtension {
* Stop load balancer instance.
*/
public void stop() {
- try {
- if (topologyEventReceiver != null) {
- topologyEventReceiver.terminate();
- }
- } catch (Exception ignore) {
- }
+// try {
+// if (topologyEventReceiver != null) {
+// topologyEventReceiver.terminate();
+// }
+// } catch (Exception ignore) {
+// }
try {
if (statisticsNotifier != null) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index cb74984..442686a 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -199,11 +199,11 @@ public class LoadBalancerServiceComponent {
}
topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Topology receiver thread started");
- }
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Topology receiver thread started");
+// }
if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
@@ -257,13 +257,13 @@ public class LoadBalancerServiceComponent {
}
// Terminate topology receiver
- if (topologyEventReceiver != null) {
- try {
- topologyEventReceiver.terminate();
- } catch (Exception e) {
- log.warn("An error occurred while terminating topology event receiver", e);
- }
- }
+// if (topologyEventReceiver != null) {
+// try {
+// topologyEventReceiver.terminate();
+// } catch (Exception e) {
+// log.warn("An error occurred while terminating topology event receiver", e);
+// }
+// }
// Terminate application signup event receiver
// if (applicationSignUpEventReceiver != null) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
index e516271..f16282d 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
@@ -41,8 +41,9 @@ public class MetadataTopologyEventReceiver {
private ExecutorService executorService;
public MetadataTopologyEventReceiver() {
- this.topologyEventReceiver = new TopologyEventReceiver();
- executorService = StratosThreadPool.getExecutorService(Constants.METADATA_SERVICE_THREAD_POOL_ID, 20);
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
+// //executorService = StratosThreadPool.getExecutorService(Constants
+// .METADATA_SERVICE_THREAD_POOL_ID, 20);
addEventListeners();
}
@@ -67,21 +68,21 @@ public class MetadataTopologyEventReceiver {
});
}
- public void execute() {
- topologyEventReceiver.setExecutorService(getExecutorService());
- topologyEventReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Metadata service topology receiver started.");
- }
- }
-
- public void terminate() {
- topologyEventReceiver.terminate();
- if (log.isInfoEnabled()) {
- log.info("Metadata service topology receiver stopped.");
- }
- }
+// public void execute() {
+// topologyEventReceiver.setExecutorService(getExecutorService());
+// topologyEventReceiver.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Metadata service topology receiver started.");
+// }
+// }
+//
+// public void terminate() {
+// topologyEventReceiver.terminate();
+// if (log.isInfoEnabled()) {
+// log.info("Metadata service topology receiver stopped.");
+// }
+// }
public ExecutorService getExecutorService() {
return executorService;
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
index 75ddbc7..47fc600 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
@@ -56,7 +56,7 @@ public class MetadataApiRegistry implements DataStore {
public MetadataApiRegistry() {
metadataTopologyEventReceiver = new MetadataTopologyEventReceiver();
- metadataTopologyEventReceiver.execute();
+// metadataTopologyEventReceiver.execute();
metadataApplicationEventReceiver = new MetadataApplicationEventReceiver();
metadataApplicationEventReceiver.execute();
@@ -417,9 +417,9 @@ public class MetadataApiRegistry implements DataStore {
return applicationIdToReadWriteLockMap;
}
- public void stopTopologyReceiver() {
- metadataTopologyEventReceiver.terminate();
- }
+// public void stopTopologyReceiver() {
+// metadataTopologyEventReceiver.terminate();
+// }
public void stopApplicationReceiver() {
metadataApplicationEventReceiver.terminate();
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 59c70c5..2696271 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
/**
* CEP Topology Receiver for Fault Handling Window Processor.
*/
-public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+public class CEPTopologyEventReceiver {
private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
private FaultHandlingWindowProcessor faultHandler;
+ private TopologyEventReceiver topologyEventReceiver;
public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
this.faultHandler = faultHandler;
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
- @Override
- public void execute() {
- super.execute();
- log.info("CEP topology event receiver thread started");
- }
+// @Override
+// public void execute() {
+// super.execute();
+// log.info("CEP topology event receiver thread started");
+// }
private void addEventListeners() {
// Load member time stamp map from the topology as a one time task
- addEventListener(new CompleteTopologyEventListener() {
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
private boolean initialized;
@Override
@@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Remove member from the time stamp map when MemberTerminated event is received.
- addEventListener(new MemberTerminatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
@@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Add member to time stamp map whenever member is activated
- addEventListener(new MemberActivatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 0526f6a..9e98288 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -286,10 +286,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
MemberFaultEventMap
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
- executorService = StratosThreadPool
- .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
- cepTopologyEventReceiver.setExecutorService(executorService);
- cepTopologyEventReceiver.execute();
+// executorService = StratosThreadPool
+// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+// cepTopologyEventReceiver.setExecutorService(executorService);
+// cepTopologyEventReceiver.execute();
//Ordinary scheduling
window.schedule();
@@ -329,7 +329,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
public void destroy() {
// terminate topology listener thread
- cepTopologyEventReceiver.terminate();
+// cepTopologyEventReceiver.terminate();
window = null;
// Shutdown executor service
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 59c70c5..2696271 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
/**
* CEP Topology Receiver for Fault Handling Window Processor.
*/
-public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+public class CEPTopologyEventReceiver {
private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
private FaultHandlingWindowProcessor faultHandler;
+ private TopologyEventReceiver topologyEventReceiver;
public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
this.faultHandler = faultHandler;
+ this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}
- @Override
- public void execute() {
- super.execute();
- log.info("CEP topology event receiver thread started");
- }
+// @Override
+// public void execute() {
+// super.execute();
+// log.info("CEP topology event receiver thread started");
+// }
private void addEventListeners() {
// Load member time stamp map from the topology as a one time task
- addEventListener(new CompleteTopologyEventListener() {
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
private boolean initialized;
@Override
@@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Remove member from the time stamp map when MemberTerminated event is received.
- addEventListener(new MemberTerminatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
@@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
});
// Add member to time stamp map whenever member is activated
- addEventListener(new MemberActivatedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 2fdce19..c3951e4 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -279,10 +279,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
MemberFaultEventMap
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
- executorService = StratosThreadPool
- .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
- cepTopologyEventReceiver.setExecutorService(executorService);
- cepTopologyEventReceiver.execute();
+// executorService = StratosThreadPool
+// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+// cepTopologyEventReceiver.setExecutorService(executorService);
+// cepTopologyEventReceiver.execute();
//Ordinary scheduling
window.schedule();
@@ -322,7 +322,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
public void destroy() {
// terminate topology listener thread
- cepTopologyEventReceiver.terminate();
+// cepTopologyEventReceiver.terminate();
window = null;
// Shutdown executor service
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 649430f..f31583c 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -124,9 +124,9 @@ public abstract class PythonAgentIntegrationTest {
}
ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize);
- topologyEventReceiver = new TopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
+// topologyEventReceiver.setExecutorService(executorService);
+// topologyEventReceiver.execute();
instanceStatusEventReceiver = new InstanceStatusEventReceiver();
instanceStatusEventReceiver.setExecutorService(executorService);
http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
index a0cc928..5a199c7 100644
--- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
+++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
@@ -108,9 +108,9 @@ public class TopologyHandler {
}
private void initializeApplicationSignUpEventReceiver() {
- applicationSignUpEventReceiver = new ApplicationSignUpEventReceiver();
- applicationSignUpEventReceiver.setExecutorService(executorService);
- applicationSignUpEventReceiver.execute();
+ applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance();
+// applicationSignUpEventReceiver.setExecutorService(executorService);
+// applicationSignUpEventReceiver.execute();
}
private void initializeTenantEventReceiver() {
@@ -171,8 +171,8 @@ public class TopologyHandler {
* Initialize Topology event receiver
*/
private void initializeTopologyEventReceiver() {
- topologyEventReceiver = new TopologyEventReceiver();
- topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver = TopologyEventReceiver.getInstance();
+// topologyEventReceiver.setExecutorService(executorService);
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -206,7 +206,7 @@ public class TopologyHandler {
clusterInstanceInactivateEvent.getClusterId()));
}
});
- topologyEventReceiver.execute();
+ //topologyEventReceiver.execute();
}
/**