You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 14:04:24 UTC

[07/19] stratos git commit: making ClusterStatusEventReceiver singleton and fixing references in components

Making ClusterStatusEventReceiver a singleton and fixing references to it in components


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e4e72d94
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e4e72d94
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e4e72d94

Branch: refs/heads/stratos-4.1.x
Commit: e4e72d94e3bf2896dc7f8806faa085c7680b848e
Parents: bd37f9c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 18:30:49 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 17 11:34:50 2015 +0530

----------------------------------------------------------------------
 .../CloudControllerServiceComponent.java        |  4 +-
 .../status/ClusterStatusTopicReceiver.java      | 59 ++++++++++----------
 .../status/ClusterStatusEventReceiver.java      | 51 +++++++++++------
 .../mapping/DomainMappingEventReceiver.java     | 27 ++++-----
 4 files changed, 78 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/e4e72d94/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 2368596..74d36e7 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -154,8 +154,8 @@ public class CloudControllerServiceComponent {
         }
 
         clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
-        clusterStatusTopicReceiver.setExecutorService(executorService);
-        clusterStatusTopicReceiver.execute();
+//        clusterStatusTopicReceiver.setExecutorService(executorService);
+//        clusterStatusTopicReceiver.execute();
 
         if (log.isInfoEnabled()) {
             log.info("Cluster status event receiver thread started");

http://git-wip-us.apache.org/repos/asf/stratos/blob/e4e72d94/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index daa6bf5..e0b9f62 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -32,28 +32,27 @@ import java.util.concurrent.ExecutorService;
 public class ClusterStatusTopicReceiver {
     private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
 
-    private ClusterStatusEventReceiver statusEventReceiver;
-    private boolean terminated;
-    private ExecutorService executorService;
+    private ClusterStatusEventReceiver clusterStatusEventReceiver;
+    //private boolean terminated;
+    //private ExecutorService executorService;
 
     public ClusterStatusTopicReceiver() {
-        this.statusEventReceiver = new ClusterStatusEventReceiver();
-
+        this.clusterStatusEventReceiver = ClusterStatusEventReceiver.getInstance();
         addEventListeners();
     }
 
-    public void execute() {
-        statusEventReceiver.setExecutorService(executorService);
-        statusEventReceiver.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller Cluster status thread started");
-        }
-
-    }
+//    public void execute() {
+//        clusterStatusEventReceiver.setExecutorService(executorService);
+//        clusterStatusEventReceiver.execute();
+//        if (log.isInfoEnabled()) {
+//            log.info("Cloud controller Cluster status thread started");
+//        }
+//
+//    }
 
     private void addEventListeners() {
         // Listen to topology events that affect clusters
-        statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
+        clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -64,14 +63,14 @@ public class ClusterStatusTopicReceiver {
             }
         });
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
+        clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 //TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
             }
         });
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
+        clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -82,7 +81,7 @@ public class ClusterStatusTopicReceiver {
             }
         });
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
+        clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -93,7 +92,7 @@ public class ClusterStatusTopicReceiver {
             }
         });
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
+        clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -104,7 +103,7 @@ public class ClusterStatusTopicReceiver {
             }
         });
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
+        clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -116,15 +115,15 @@ public class ClusterStatusTopicReceiver {
         });
     }
 
-    public void setTerminated(boolean terminated) {
-        this.terminated = terminated;
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public void setTerminated(boolean terminated) {
+//        this.terminated = terminated;
+//    }
+//
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
+//
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e4e72d94/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 119cf49..2b4d557 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -21,8 +21,10 @@ package org.apache.stratos.messaging.message.receiver.cluster.status;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
 import java.util.concurrent.ExecutorService;
@@ -30,26 +32,39 @@ import java.util.concurrent.ExecutorService;
 /**
  * A thread for receiving instance notifier information from message broker.
  */
-public class ClusterStatusEventReceiver {
+public class ClusterStatusEventReceiver extends StratosEventReceiver {
     private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class);
     private final ClusterStatusEventMessageDelegator messageDelegator;
     private final ClusterStatusEventMessageListener messageListener;
     private EventSubscriber eventSubscriber;
-    private boolean terminated;
-    private ExecutorService executorService;
+    private static volatile ClusterStatusEventReceiver instance;
 
-    public ClusterStatusEventReceiver() {
+    private ClusterStatusEventReceiver() {
+        // TODO: make pool size configurable
+        this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
         ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
         this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
         this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
+        execute();
     }
 
     public void addEventListener(EventListener eventListener) {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public static ClusterStatusEventReceiver getInstance () {
+        if (instance == null) {
+            synchronized (ClusterStatusEventReceiver.class) {
+                if (instance == null) {
+                    instance = new ClusterStatusEventReceiver();
+                }
+            }
+        }
 
-    public void execute() {
+        return instance;
+    }
+
+    private void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new EventSubscriber(MessagingUtil.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
@@ -77,17 +92,17 @@ public class ClusterStatusEventReceiver {
         return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
     }
 
-    public void terminate() {
-        eventSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public void terminate() {
+//        eventSubscriber.terminate();
+//        messageDelegator.terminate();
+//        terminated = true;
+//    }
+//
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
+//
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e4e72d94/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 3c723a3..6b79873 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -47,6 +47,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
         DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
         this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
         this.messageListener = new DomainMappingEventMessageListener(messageQueue);
+        execute();
     }
 
     public void addEventListener(EventListener eventListener) {
@@ -65,7 +66,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
         return instance;
     }
 
-    public void execute() {
+    private void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new EventSubscriber(MessagingUtil.Topics.DOMAIN_MAPPING_TOPIC.getTopicName(), messageListener);
@@ -91,16 +92,16 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
         }
     }
 
-    public void terminate() {
-        eventSubscriber.terminate();
-        messageDelegator.terminate();
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public void terminate() {
+//        eventSubscriber.terminate();
+//        messageDelegator.terminate();
+//    }
+//
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
+//
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }