You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 15:56:10 UTC
[07/14] stratos git commit: making ClusterStatusEventReceiver singleton and fixing references in components
making ClusterStatusEventReceiver singleton and fixing references in components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/27ae4baf
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/27ae4baf
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/27ae4baf
Branch: refs/heads/master
Commit: 27ae4bafc268a59dcd65336ca9ba491eaeae7426
Parents: c627ff1
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 18:30:49 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:04:51 2015 +0530
----------------------------------------------------------------------
.../CloudControllerServiceComponent.java | 4 +-
.../status/ClusterStatusTopicReceiver.java | 59 ++++++++++----------
.../status/ClusterStatusEventReceiver.java | 51 +++++++++++------
.../mapping/DomainMappingEventReceiver.java | 27 ++++-----
4 files changed, 78 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 2368596..74d36e7 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -154,8 +154,8 @@ public class CloudControllerServiceComponent {
}
clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
- clusterStatusTopicReceiver.setExecutorService(executorService);
- clusterStatusTopicReceiver.execute();
+// clusterStatusTopicReceiver.setExecutorService(executorService);
+// clusterStatusTopicReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Cluster status event receiver thread started");
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index daa6bf5..e0b9f62 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -32,28 +32,27 @@ import java.util.concurrent.ExecutorService;
public class ClusterStatusTopicReceiver {
private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
- private ClusterStatusEventReceiver statusEventReceiver;
- private boolean terminated;
- private ExecutorService executorService;
+ private ClusterStatusEventReceiver clusterStatusEventReceiver;
+ //private boolean terminated;
+ //private ExecutorService executorService;
public ClusterStatusTopicReceiver() {
- this.statusEventReceiver = new ClusterStatusEventReceiver();
-
+ this.clusterStatusEventReceiver = ClusterStatusEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- statusEventReceiver.setExecutorService(executorService);
- statusEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller Cluster status thread started");
- }
-
- }
+// public void execute() {
+// clusterStatusEventReceiver.setExecutorService(executorService);
+// clusterStatusEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller Cluster status thread started");
+// }
+//
+// }
private void addEventListeners() {
// Listen to topology events that affect clusters
- statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -64,14 +63,14 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
@Override
protected void onEvent(Event event) {
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -82,7 +81,7 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -93,7 +92,7 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -104,7 +103,7 @@ public class ClusterStatusTopicReceiver {
}
});
- statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
+ clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -116,15 +115,15 @@ public class ClusterStatusTopicReceiver {
});
}
- public void setTerminated(boolean terminated) {
- this.terminated = terminated;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void setTerminated(boolean terminated) {
+// this.terminated = terminated;
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 119cf49..2b4d557 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -21,8 +21,10 @@ package org.apache.stratos.messaging.message.receiver.cluster.status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -30,26 +32,39 @@ import java.util.concurrent.ExecutorService;
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class ClusterStatusEventReceiver {
+public class ClusterStatusEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class);
private final ClusterStatusEventMessageDelegator messageDelegator;
private final ClusterStatusEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private boolean terminated;
- private ExecutorService executorService;
+ private static volatile ClusterStatusEventReceiver instance;
- public ClusterStatusEventReceiver() {
+ private ClusterStatusEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
+ execute();
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
+ public static ClusterStatusEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (ClusterStatusEventReceiver.class) {
+ if (instance == null) {
+ instance = new ClusterStatusEventReceiver();
+ }
+ }
+ }
- public void execute() {
+ return instance;
+ }
+
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
@@ -77,17 +92,17 @@ public class ClusterStatusEventReceiver {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}
- public void terminate() {
- eventSubscriber.terminate();
- messageDelegator.terminate();
- terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void terminate() {
+// eventSubscriber.terminate();
+// messageDelegator.terminate();
+// terminated = true;
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 3c723a3..6b79873 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -47,6 +47,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new DomainMappingEventMessageListener(messageQueue);
+ execute();
}
public void addEventListener(EventListener eventListener) {
@@ -65,7 +66,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
return instance;
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.DOMAIN_MAPPING_TOPIC.getTopicName(), messageListener);
@@ -91,16 +92,16 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
}
}
- public void terminate() {
- eventSubscriber.terminate();
- messageDelegator.terminate();
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public void terminate() {
+// eventSubscriber.terminate();
+// messageDelegator.terminate();
+// }
+//
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}