You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/11/23 16:04:15 UTC

[1/6] stratos git commit: Fixing JIRA issue STRATOS-1629 - DAS publishers are created each time an event is published from Stratos to DAS

Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 6496b05a3 -> 509bd6bdc


Fixing JIRA issue STRATOS-1629 - DAS publishers are created each time an event is published from Stratos to DAS


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/65aee8a8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/65aee8a8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/65aee8a8

Branch: refs/heads/stratos-4.1.x
Commit: 65aee8a81ffec254486e88b5c53eefa58f06dea8
Parents: af13aeb
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 11:27:12 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 11:28:38 2015 +0530

----------------------------------------------------------------------
 .../monitor/cluster/ClusterMonitor.java         | 21 +++++++++---
 .../messaging/topology/TopologyBuilder.java     | 34 +++-----------------
 .../src/main/conf/drools/dependent-scaling.drl  |  8 ++---
 .../src/main/conf/drools/mincheck.drl           |  9 ++----
 .../src/main/conf/drools/scaling.drl            |  8 ++---
 .../src/test/resources/common/scaling.drl       |  8 ++---
 6 files changed, 31 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index 28c2d95..5976279 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -41,6 +41,8 @@ import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
 import org.apache.stratos.autoscaler.monitor.events.ScalingUpBeyondMaxEvent;
 import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
 import org.apache.stratos.autoscaler.rule.RuleTasksDelegator;
+import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
+import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
 import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
 import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
 import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
@@ -51,6 +53,7 @@ import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.common.Properties;
 import org.apache.stratos.common.client.CloudControllerServiceClient;
 import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.domain.application.ApplicationStatus;
 import org.apache.stratos.messaging.domain.application.GroupStatus;
@@ -98,7 +101,8 @@ public class ClusterMonitor extends Monitor {
     private boolean hasScalingDependents;
     private boolean groupScalingEnabledSubtree;
     private String deploymentPolicyId;
-
+    private ScalingDecisionPublisher scalingDecisionPublisher =
+            AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
 
     public ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree,
                           String deploymentPolicyId) {
@@ -337,6 +341,9 @@ public class ClusterMonitor extends Monitor {
                                 instanceContext.getMinCheckKnowledgeSession().setGlobal("algorithmName",
                                         paritionAlgo);
 
+                                instanceContext.getMinCheckKnowledgeSession().setGlobal("scalingDecisionPublisher",
+                                        scalingDecisionPublisher);
+
                                 if (log.isDebugEnabled()) {
                                     log.debug(String.format("Running minimum check for [cluster instance] %s, " +
                                                     "[cluster id] %s",
@@ -350,7 +357,7 @@ public class ClusterMonitor extends Monitor {
 
                                 if (log.isDebugEnabled()) {
                                     log.debug(String.format("Running maximum check for [cluster instance] %s, " +
-                                                    "[cluster id] %s", instanceContext.getId(), clusterId));
+                                            "[cluster id] %s", instanceContext.getId(), clusterId));
                                 }
                                 instanceContext.setMaxCheckFactHandle(evaluate(instanceContext.
                                                 getMaxCheckKnowledgeSession(),
@@ -376,6 +383,8 @@ public class ClusterMonitor extends Monitor {
                                             clusterContext.getAutoscalePolicy());
                                     instanceContext.getScaleCheckKnowledgeSession().setGlobal("arspiReset",
                                             averageRequestServedPerInstanceReset);
+                                    instanceContext.getScaleCheckKnowledgeSession().setGlobal("scalingDecisionPublisher",
+                                            scalingDecisionPublisher);
                                     if (log.isDebugEnabled()) {
                                         log.debug("Running scale check, [Is rif Reset] " + rifReset + ", " +
                                                 "[Is memoryConsumption Reset] " + memoryConsumptionReset + ", " +
@@ -552,8 +561,12 @@ public class ClusterMonitor extends Monitor {
         clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount);
 
         clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-        clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", roundedRequiredInstanceCount);
-        clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", clusterInstanceContext.getPartitionAlgorithm());
+        clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount",
+                roundedRequiredInstanceCount);
+        clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName",
+                clusterInstanceContext.getPartitionAlgorithm());
+        clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("scalingDecisionPublisher",
+                scalingDecisionPublisher);
 
         if (log.isDebugEnabled()) {
             log.debug(String.format("Running dependent scale check for [cluster instance] %s, " +

http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index da38337..90fa8bd 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -57,6 +57,11 @@ import java.util.*;
  */
 public class TopologyBuilder {
     private static final Log log = LogFactory.getLog(TopologyBuilder.class);
+    private static MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory.
+            createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS);
+
+    private static MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+            createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
 
     public static void handleServiceCreated(List<Cartridge> cartridgeList) throws RegistryException {
         Service service;
@@ -352,9 +357,6 @@ public class TopologyBuilder {
             //member created time
             Long timestamp = System.currentTimeMillis();
             //publishing member status to DAS
-            MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
-                    createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
-
             if (memStatusPublisher.isEnabled()) {
                 if (log.isDebugEnabled()) {
                     log.debug("Publishing Member Status to DAS");
@@ -363,8 +365,6 @@ public class TopologyBuilder {
                         memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
                         memberContext.getNetworkPartitionId(), memberContext.getPartition().getId(),
                         memberContext.getMemberId(), MemberStatus.Created.toString());
-            } else {
-                log.warn("Member Status Publisher is not enabled");
             }
 
         } finally {
@@ -435,11 +435,6 @@ public class TopologyBuilder {
                 Long timestamp = System.currentTimeMillis();
                 TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
                 //publishing member information and status to DAS
-                MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory.
-                        createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS);
-
-                MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
-                        createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
 
                 if (memInfoPublisher.isEnabled()) {
                     if (log.isInfoEnabled()) {
@@ -460,8 +455,6 @@ public class TopologyBuilder {
                             memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
                             memberContext.getNetworkPartitionId(), memberContext.getPartition().getId(),
                             memberContext.getMemberId(), MemberStatus.Initialized.toString());
-                } else {
-                    log.warn("Member status publisher is not enabled");
                 }
             }
         } finally {
@@ -519,9 +512,6 @@ public class TopologyBuilder {
                     //memberStartedEvent.
                     TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
                     //publishing member status to DAS
-                    MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
-                            createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
-
                     if (memStatusPublisher.isEnabled()) {
                         if (log.isDebugEnabled()) {
                             log.debug("Publishing Member Status to DAS");
@@ -533,8 +523,6 @@ public class TopologyBuilder {
                                         instanceStartedEvent.getNetworkPartitionId(),
                                         instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId(),
                                         MemberStatus.Starting.toString());
-                    } else {
-                        log.warn("Member Status Publisher is not enabled");
                     }
                 }
             } finally {
@@ -632,8 +620,6 @@ public class TopologyBuilder {
                 TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
 
                 //publishing member status to DAS
-                MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
-                        createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
                 if (memStatusPublisher.isEnabled()) {
                     if (log.isDebugEnabled()) {
                         log.debug("Publishing Member Status to DAS");
@@ -643,8 +629,6 @@ public class TopologyBuilder {
                                     memberActivatedEvent.getClusterInstanceId(), memberActivatedEvent.getServiceName(),
                                     memberActivatedEvent.getNetworkPartitionId(), memberActivatedEvent.getPartitionId(),
                                     memberActivatedEvent.getMemberId(), MemberStatus.Active.toString());
-                } else {
-                    log.warn("Member Status Publisher is not enabled");
                 }
             }
         } finally {
@@ -700,8 +684,6 @@ public class TopologyBuilder {
         }
         TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
         //publishing member status to DAS.
-        MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
-                createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
         if (memStatusPublisher.isEnabled()) {
             if (log.isDebugEnabled()) {
                 log.debug("Publishing Member Status to DAS");
@@ -713,8 +695,6 @@ public class TopologyBuilder {
                             instanceReadyToShutdownEvent.getNetworkPartitionId(),
                             instanceReadyToShutdownEvent.getPartitionId(), instanceReadyToShutdownEvent.getMemberId(),
                             MemberStatus.ReadyToShutDown.toString());
-        } else {
-            log.warn("Member Status Publisher is not enabled");
         }
         //termination of particular instance will be handled by autoscaler
     }
@@ -814,8 +794,6 @@ public class TopologyBuilder {
                         partitionId, properties, groupAlias);
 
         //publishing member status to DAS.
-        MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
-                createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
         if (memStatusPublisher.isEnabled()) {
             if (log.isDebugEnabled()) {
                 log.debug("Publishing Member Status to DAS");
@@ -823,8 +801,6 @@ public class TopologyBuilder {
             memStatusPublisher.publish(timestamp, applicationId, member.getClusterId(), clusterAlias,
                     member.getClusterInstanceId(), member.getServiceName(), member.getNetworkPartitionId(),
                     member.getPartitionId(), member.getMemberId(), MemberStatus.Terminated.toString());
-        } else {
-            log.warn("Member Status Publisher is not enabled");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
index c4a1141..72c5536 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
@@ -28,15 +28,13 @@ import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadThresholds;
 import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption;
 import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage;
 import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 
 global org.apache.stratos.autoscaler.rule.RuleLog log;
 global java.lang.String clusterId;
 global Integer roundedRequiredInstanceCount;
 global org.apache.stratos.autoscaler.rule.RuleTasksDelegator delegator;
 global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
 
 rule "Dependent Scaling Rule"
 dialect "mvel"
@@ -82,7 +80,7 @@ dialect "mvel"
                 String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
                 long scalingTime = System.currentTimeMillis();
                 String scalingReason = "DEPENDENCY";
-                ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
                 if (scalingDecisionPublisher.isEnabled()) {
                     log.debug("Publishing scaling decision to DAS");
                     scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -90,8 +88,6 @@ dialect "mvel"
                                              0, 0, 0, 0, 0, 0,0, 0, 0,
                                              additionalInstances + nonTerminatedMembers,
                                              0, additionalInstances, scalingReason);
-                } else {
-                    log.warn("Scaling decision publisher is not enabled");
                 }
 
                 while(count != additionalInstances  && partitionsAvailable) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
index 5ce6d21..250676d 100755
--- a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
@@ -38,9 +38,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
 import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 
 global org.apache.stratos.autoscaler.rule.RuleLog log;
 global org.apache.stratos.autoscaler.pojo.policy.PolicyManager manager;
@@ -49,6 +46,7 @@ global org.apache.stratos.autoscaler.rule.RuleTasksDelegator delegator;
 global java.util.Map partitionCtxts;
 global java.lang.String clusterId;
 global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
 
 rule "Minimum Rule"
 dialect "mvel"
@@ -74,7 +72,7 @@ dialect "mvel"
         String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
         long scalingTime = System.currentTimeMillis();
         String scalingReason = "MIN";
-        ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
         if (scalingDecisionPublisher.isEnabled()) {
         log.debug("Publishing scaling decision to DAS");
         scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -83,9 +81,8 @@ dialect "mvel"
                                  0, 0, 0, 0, 0, 0,0, 0, 0,
                                  clusterInstanceContext.getMinInstanceCount(), 0,
                                  additionalInstances, scalingReason);
-        } else {
-            log.warn("Scaling decision publisher is not enabled");
         }
+
         while(count != additionalInstances && partitionsAvailable){
 
             ClusterLevelPartitionContext partitionContext =  (ClusterLevelPartitionContext)partitionAlgorithm.getNextScaleUpPartitionContext(clusterInstanceContext.getPartitionCtxtsAsAnArray());

http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
index 912685c..f367652 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
@@ -40,9 +40,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
 import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage
 import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption
 
@@ -56,6 +53,7 @@ global java.lang.Boolean mcReset;
 global java.lang.Boolean laReset;
 global java.lang.Boolean arspiReset;
 global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
 
 rule "Scaling Rule"
 dialect "mvel"
@@ -168,7 +166,7 @@ dialect "mvel"
 
                     String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
                     long scalingTime = System.currentTimeMillis();
-                    ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
                     if (scalingDecisionPublisher.isEnabled()) {
                         log.debug("Publishing scaling decision to DAS");
                         scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -178,8 +176,6 @@ dialect "mvel"
                                             laPredictedValue, laThreshold, numberOfInstancesReuquiredBasedOnLoadAverage,
                                             numberOfRequiredInstances, activeInstancesCount,
                                             additionalInstances, scalingReason);
-                    } else {
-                        log.warn("Scaling decision publisher is not enabled");
                     }
 
                     while(count != additionalInstances && partitionsAvailable){

http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl b/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
index 4fc73fd..c4af71c 100644
--- a/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
+++ b/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
@@ -40,9 +40,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
 import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage
 import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption
 
@@ -56,6 +53,7 @@ global java.lang.Boolean mcReset;
 global java.lang.Boolean laReset;
 global java.lang.Boolean arspiReset;
 global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
 
 rule "Scaling Rule"
 dialect "mvel"
@@ -169,7 +167,7 @@ dialect "mvel"
 
                     String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
                     long scalingTime = System.currentTimeMillis();
-                    ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
                     if (scalingDecisionPublisher.isEnabled()) {
                         log.debug("Publishing scaling decision to DAS");
                         scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -179,8 +177,6 @@ dialect "mvel"
                                             laPredictedValue, laThreshold, numberOfInstancesReuquiredBasedOnLoadAverage,
                                             numberOfRequiredInstances, activeInstancesCount,
                                             additionalInstances, scalingReason);
-                    } else {
-                        log.warn("Scaling decision publisher is not enabled");
                     }
 
                     while(count != additionalInstances && partitionsAvailable){


[3/6] stratos git commit: Simplified isPublisherEnabled() logic

Posted by ga...@apache.org.
Simplified isPublisherEnabled() logic


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f3a809b7
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f3a809b7
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f3a809b7

Branch: refs/heads/stratos-4.1.x
Commit: f3a809b768cb4275e30d0223b2cd5c374a76f02e
Parents: 65aee8a
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 12:31:03 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 12:31:03 2015 +0530

----------------------------------------------------------------------
 .../statistics/publisher/ThriftStatisticsPublisher.java      | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/f3a809b7/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 16dba16..4552f92 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -63,14 +63,12 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
     }
 
     private boolean isPublisherEnabled() {
-        boolean publisherEnabled = false;
         for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
-            publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
-            if (publisherEnabled) {
-                break;
+            if (thriftClientInfo.isStatsPublisherEnabled()) {
+                return true;
             }
         }
-        return publisherEnabled;
+        return false;
     }
 
     private void init() {


[5/6] stratos git commit: Moving STATS_PUBLISHER_THREAD_POOL_SIZE to CloudControllerConstants to have common config for member status and info publishers

Posted by ga...@apache.org.
Moving STATS_PUBLISHER_THREAD_POOL_SIZE to CloudControllerConstants to have common config for member status and info publishers


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/05d1c187
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/05d1c187
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/05d1c187

Branch: refs/heads/stratos-4.1.x
Commit: 05d1c187e1aeb3efeef4c1d4f6282ea41de51d63
Parents: 609bc9d
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 19:11:33 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 19:11:33 2015 +0530

----------------------------------------------------------------------
 .../statistics/publisher/DASMemberInformationPublisher.java       | 3 +--
 .../controller/statistics/publisher/DASMemberStatusPublisher.java | 3 +--
 .../stratos/cloud/controller/util/CloudControllerConstants.java   | 1 +
 3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/05d1c187/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index 621f9e2..8107e1b 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -47,14 +47,13 @@ public class DASMemberInformationPublisher extends MemberInformationPublisher {
     private static final String DATA_STREAM_NAME = "member_info";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
-    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private static final String VALUE_NOT_FOUND = "Value Not Found";
     private ExecutorService executorService;
 
     private DASMemberInformationPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
         executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
-                STATS_PUBLISHER_THREAD_POOL_SIZE);
+                CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberInformationPublisher getInstance() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/05d1c187/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 7a291ab..6bb6251 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -41,13 +41,12 @@ public class DASMemberStatusPublisher extends MemberStatusPublisher {
     private static final String DATA_STREAM_NAME = "member_lifecycle";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
-    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private ExecutorService executorService;
 
     private DASMemberStatusPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
         executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
-                STATS_PUBLISHER_THREAD_POOL_SIZE);
+                CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberStatusPublisher getInstance() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/05d1c187/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index ccd8d34..29facf0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -163,6 +163,7 @@ public final class CloudControllerConstants {
     public static final String SCALING_DECISION_ID_COL = "scaling_decision_id";
 
     public static final String STATS_PUBLISHER_THREAD_POOL_ID = "cloud.controller.stats.publisher.thread.pool";
+    public static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
 
     /**
      * Properties


[4/6] stratos git commit: Changing publisher classes hierarchy

Posted by ga...@apache.org.
Changing publisher classes hierarchy


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/609bc9d9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/609bc9d9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/609bc9d9

Branch: refs/heads/stratos-4.1.x
Commit: 609bc9d9013fc3fb13d5b4a467528eb70540dccf
Parents: f3a809b
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 18:24:30 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 18:24:30 2015 +0530

----------------------------------------------------------------------
 .../publisher/DASScalingDecisionPublisher.java  |  5 ++--
 .../publisher/ScalingDecisionPublisher.java     | 24 ++++++++++++--------
 .../DASMemberInformationPublisher.java          |  8 +++----
 .../publisher/DASMemberStatusPublisher.java     |  8 +++----
 .../publisher/MemberInformationPublisher.java   | 12 +++++++---
 .../publisher/MemberStatusPublisher.java        | 16 +++++++++----
 ...InvalidStatisticsPublisherTypeException.java |  2 +-
 .../publisher/HealthStatisticsPublisher.java    | 12 +++++++---
 .../publisher/InFlightRequestPublisher.java     | 11 +++++++--
 .../publisher/ThriftStatisticsPublisher.java    |  4 +---
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |  5 ++--
 .../cep/WSO2CEPInFlightRequestPublisher.java    | 19 ++++++++--------
 12 files changed, 76 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index 097c568..52857d4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.autoscaler.statistics.publisher;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
  */
-public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher {
+public class DASScalingDecisionPublisher extends ScalingDecisionPublisher {
 
     private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class);
     private static volatile DASScalingDecisionPublisher dasScalingDecisionPublisher;
@@ -165,7 +164,7 @@ public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher imple
                 payload.add(activeInstanceCount);
                 payload.add(additionalInstanceCount);
                 payload.add(scalingReason);
-                DASScalingDecisionPublisher.super.publish(payload.toArray());
+                publish(payload.toArray());
             }
 
         };

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
index f7b0087..fe791f9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
@@ -19,12 +19,18 @@
 
 package org.apache.stratos.autoscaler.statistics.publisher;
 
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Scaling Decision Publisher interface.
  */
-public interface ScalingDecisionPublisher extends StatisticsPublisher {
+public abstract class ScalingDecisionPublisher extends ThriftStatisticsPublisher {
+
+    public ScalingDecisionPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing scaling decision to DAS.
      *
@@ -47,11 +53,11 @@ public interface ScalingDecisionPublisher extends StatisticsPublisher {
      * @param additionalInstanceCount Additional Instance Needed
      * @param scalingReason           Scaling Reason
      */
-    public void publish(Long timestamp, String scalingDecisionId, String clusterId,
-                        int minInstanceCount, int maxInstanceCount,
-                        int rifPredicted, int rifThreshold, int rifRequiredInstances,
-                        int mcPredicted, int mcThreshold, int mcRequiredInstances,
-                        int laPredicted, int laThreshold, int laRequiredInstance,
-                        int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
-                        String scalingReason);
+    public abstract void publish(Long timestamp, String scalingDecisionId, String clusterId,
+                                 int minInstanceCount, int maxInstanceCount,
+                                 int rifPredicted, int rifThreshold, int rifRequiredInstances,
+                                 int mcPredicted, int mcThreshold, int mcRequiredInstances,
+                                 int laPredicted, int laThreshold, int laRequiredInstance,
+                                 int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
+                                 String scalingReason);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index 4ab65e1..621f9e2 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -27,7 +27,6 @@ import org.apache.stratos.cloud.controller.domain.IaasProvider;
 import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
 import org.apache.stratos.cloud.controller.domain.MemberContext;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -41,7 +40,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
  */
-public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher {
+public class DASMemberInformationPublisher extends MemberInformationPublisher {
 
     private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class);
     private static volatile DASMemberInformationPublisher dasMemberInformationPublisher;
@@ -54,7 +53,8 @@ public class DASMemberInformationPublisher extends ThriftStatisticsPublisher imp
 
     private DASMemberInformationPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberInformationPublisher getInstance() {
@@ -158,7 +158,7 @@ public class DASMemberInformationPublisher extends ThriftStatisticsPublisher imp
                                 metadata.getOperatingSystemName(), metadata.getOperatingSystemVersion(),
                                 metadata.getOperatingSystemArchitecture(), metadata.isOperatingSystem64bit()));
                     }
-                    DASMemberInformationPublisher.super.publish(payload.toArray());
+                    publish(payload.toArray());
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 332bbba..7a291ab 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.cloud.controller.statistics.publisher;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * Publishing member status to DAS.
  */
-public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher {
+public class DASMemberStatusPublisher extends MemberStatusPublisher {
 
     private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class);
     private static volatile DASMemberStatusPublisher dasMemberStatusPublisher;
@@ -47,7 +46,8 @@ public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implemen
 
     private DASMemberStatusPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberStatusPublisher getInstance() {
@@ -131,7 +131,7 @@ public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implemen
                 payload.add(partitionId);
                 payload.add(memberId);
                 payload.add(status);
-                DASMemberStatusPublisher.super.publish(payload.toArray());
+                publish(payload.toArray());
             }
         };
         executorService.execute(publisher);

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
index ffe0380..fda1b41 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
@@ -20,12 +20,18 @@
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
 import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Member Information Publisher interface.
  */
-public interface MemberInformationPublisher extends StatisticsPublisher {
+public abstract class MemberInformationPublisher extends ThriftStatisticsPublisher {
+
+    public MemberInformationPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing member information.
      *
@@ -33,6 +39,6 @@ public interface MemberInformationPublisher extends StatisticsPublisher {
      * @param scalingDecisionId Scaling Decision Id
      * @param metadata          InstanceMetadata
      */
-    public void publish(String memberId, String scalingDecisionId, InstanceMetadata metadata);
+    public abstract void publish(String memberId, String scalingDecisionId, InstanceMetadata metadata);
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
index fad1006..4fa23b1 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
@@ -19,12 +19,18 @@
 
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Member Status Publisher Interface.
  */
-public interface MemberStatusPublisher extends StatisticsPublisher {
+public abstract class MemberStatusPublisher extends ThriftStatisticsPublisher {
+
+    public MemberStatusPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing member status.
      *
@@ -39,7 +45,7 @@ public interface MemberStatusPublisher extends StatisticsPublisher {
      * @param memberId           Member Id
      * @param status             Member Status
      */
-    void publish(Long timestamp, String applicationId, String clusterId,
-                 String clusterAlias, String clusterInstanceId, String serviceName,
-                 String networkPartitionId, String partitionId, String memberId, String status);
+    public abstract void publish(Long timestamp, String applicationId, String clusterId,
+                                 String clusterAlias, String clusterInstanceId, String serviceName,
+                                 String networkPartitionId, String partitionId, String memberId, String status);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
index 09efa1e..4609c9f 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
@@ -22,7 +22,7 @@ package org.apache.stratos.common.exception;
 /**
  * This exception will be thrown when trying to create a publisher with invalid statistics publisher type.
  */
-public class InvalidStatisticsPublisherTypeException extends Exception {
+public class InvalidStatisticsPublisherTypeException extends RuntimeException {
 
     public InvalidStatisticsPublisherTypeException(String message) {
         super(message);

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index dd7ddd4..20f0ffe 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -19,10 +19,16 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
 /**
  * Health statistics publisher interface.
  */
-public interface HealthStatisticsPublisher extends StatisticsPublisher {
+public abstract class HealthStatisticsPublisher extends ThriftStatisticsPublisher {
+
+    public HealthStatisticsPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
 
     /**
      * Publish health statistics to complex event processor.
@@ -35,6 +41,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
      * @param health             Health type: memory_consumption | load_average
      * @param value              Health type value
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
-                 String memberId, String partitionId, String health, double value);
+    public abstract void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+                                 String memberId, String partitionId, String health, double value);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index 289be8b..af46ed1 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -19,10 +19,16 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
 /**
  * In-flight request publisher interface.
  */
-public interface InFlightRequestPublisher extends StatisticsPublisher {
+public abstract class InFlightRequestPublisher extends ThriftStatisticsPublisher {
+
+    public InFlightRequestPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
 
     /**
      * Publish in-flight request count.
@@ -32,5 +38,6 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
      * @param networkPartitionId   Network partition id of the cluster
      * @param inFlightRequestCount In-flight request count of the cluster
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount);
+    public abstract void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+                                 int inFlightRequestCount);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 4552f92..95c0478 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -77,9 +77,7 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
         loadBalancingDataPublisher = new LoadBalancingDataPublisher(getReceiverGroups());
 
         //adding stream definition
-        if (!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
-            loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
-        }
+        loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
     }
 
     private ArrayList<ReceiverGroup> getReceiverGroups() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index d025c33..03222ec 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -33,7 +32,7 @@ import java.util.List;
 /**
  * Health statistics publisher for publishing statistics to WSO2 CEP.
  */
-public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
+public class WSO2CEPHealthStatisticsPublisher extends HealthStatisticsPublisher {
 
     private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
     private static volatile WSO2CEPHealthStatisticsPublisher wso2CEPHealthStatisticsPublisher;
@@ -109,6 +108,6 @@ public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher
         payload.add(health);
         payload.add(value);
 
-        super.publish(payload.toArray());
+        publish(payload.toArray());
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 8c9189b..862a49d 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -36,7 +35,7 @@ import java.util.List;
  * In-flight request count:
  * Number of requests being served at a given moment could be identified as in-flight request count.
  */
-public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
+public class WSO2CEPInFlightRequestPublisher extends InFlightRequestPublisher {
     private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
     private static volatile WSO2CEPInFlightRequestPublisher wso2CEPInFlightRequestPublisher;
     private static final String DATA_STREAM_NAME = "in_flight_requests";
@@ -47,11 +46,11 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
         super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
     }
 
-    public static  WSO2CEPInFlightRequestPublisher getInstance() {
+    public static WSO2CEPInFlightRequestPublisher getInstance() {
         if (wso2CEPInFlightRequestPublisher == null) {
-            synchronized ( WSO2CEPInFlightRequestPublisher.class) {
+            synchronized (WSO2CEPInFlightRequestPublisher.class) {
                 if (wso2CEPInFlightRequestPublisher == null) {
-                    wso2CEPInFlightRequestPublisher = new  WSO2CEPInFlightRequestPublisher();
+                    wso2CEPInFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher();
                 }
             }
         }
@@ -81,10 +80,10 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
     /**
      * Publish in-flight request count of a cluster.
      *
-     * @param clusterId
-     * @param clusterInstanceId
-     * @param networkPartitionId
-     * @param inFlightRequestCount
+     * @param clusterId             Cluster id
+     * @param clusterInstanceId     Cluster instance id
+     * @param networkPartitionId    Cluster's network partition id
+     * @param inFlightRequestCount  Cluster's in-flight-request count
      */
     @Override
     public void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
@@ -102,6 +101,6 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
         payload.add(networkPartitionId);
         payload.add((double) inFlightRequestCount);
 
-        super.publish(payload.toArray());
+        publish(payload.toArray());
     }
 }


[6/6] stratos git commit: Merge branch 'stratos-4.1.x' of https://github.com/Thanu/stratos into stratos-4.1.x

Posted by ga...@apache.org.
Merge branch 'stratos-4.1.x' of https://github.com/Thanu/stratos into stratos-4.1.x


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/509bd6bd
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/509bd6bd
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/509bd6bd

Branch: refs/heads/stratos-4.1.x
Commit: 509bd6bdc4b7e6fd7c8a0e0fc3ec3b637e68d514
Parents: 6496b05 05d1c18
Author: gayangunarathne <ga...@wso2.com>
Authored: Mon Nov 23 20:08:45 2015 +0530
Committer: gayangunarathne <ga...@wso2.com>
Committed: Mon Nov 23 20:08:45 2015 +0530

----------------------------------------------------------------------
 .../monitor/cluster/ClusterMonitor.java         | 21 ++++-
 .../publisher/AutoscalerPublisherFactory.java   |  5 +-
 .../publisher/DASScalingDecisionPublisher.java  | 25 ++++--
 .../publisher/ScalingDecisionPublisher.java     | 24 ++++--
 .../autoscaler/util/AutoscalerConstants.java    |  1 +
 .../messaging/topology/TopologyBuilder.java     | 34 ++------
 .../CloudControllerPublisherFactory.java        |  9 +-
 .../DASMemberInformationPublisher.java          | 23 ++++--
 .../publisher/DASMemberStatusPublisher.java     | 22 +++--
 .../publisher/MemberInformationPublisher.java   | 12 ++-
 .../publisher/MemberStatusPublisher.java        | 16 ++--
 .../util/CloudControllerConstants.java          |  2 +
 ...InvalidStatisticsPublisherTypeException.java | 30 +++++++
 .../publisher/HealthStatisticsPublisher.java    | 12 ++-
 .../HealthStatisticsPublisherFactory.java       |  5 +-
 .../publisher/InFlightRequestPublisher.java     | 11 ++-
 .../InFlightRequestPublisherFactory.java        |  5 +-
 .../publisher/ThriftStatisticsPublisher.java    | 86 ++++++++++----------
 .../cep/WSO2CEPHealthStatisticsPublisher.java   | 20 +++--
 .../cep/WSO2CEPInFlightRequestPublisher.java    | 28 +++++--
 .../src/main/conf/drools/dependent-scaling.drl  |  8 +-
 .../src/main/conf/drools/mincheck.drl           |  9 +-
 .../src/main/conf/drools/scaling.drl            |  8 +-
 .../src/test/resources/common/scaling.drl       |  8 +-
 24 files changed, 259 insertions(+), 165 deletions(-)
----------------------------------------------------------------------



[2/6] stratos git commit: Refactoring thrift publisher classes

Posted by ga...@apache.org.
Refactoring thrift publisher classes


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/af13aeba
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/af13aeba
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/af13aeba

Branch: refs/heads/stratos-4.1.x
Commit: af13aebae4b78c9ba4df0286eb081d48794fc427
Parents: 962ce94
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 11:23:15 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 11:28:38 2015 +0530

----------------------------------------------------------------------
 .../publisher/AutoscalerPublisherFactory.java   |  5 +-
 .../publisher/DASScalingDecisionPublisher.java  | 20 ++++-
 .../autoscaler/util/AutoscalerConstants.java    |  1 +
 .../CloudControllerPublisherFactory.java        |  9 +-
 .../DASMemberInformationPublisher.java          | 18 +++-
 .../publisher/DASMemberStatusPublisher.java     | 17 +++-
 .../util/CloudControllerConstants.java          |  1 +
 ...InvalidStatisticsPublisherTypeException.java | 30 +++++++
 .../HealthStatisticsPublisherFactory.java       |  5 +-
 .../InFlightRequestPublisherFactory.java        |  5 +-
 .../publisher/ThriftStatisticsPublisher.java    | 92 ++++++++++----------
 .../cep/WSO2CEPHealthStatisticsPublisher.java   | 15 +++-
 .../cep/WSO2CEPInFlightRequestPublisher.java    | 15 +++-
 13 files changed, 165 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
index d057108..8c688ba 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.autoscaler.statistics.publisher;
 
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 
 /**
@@ -29,9 +30,9 @@ public class AutoscalerPublisherFactory {
     public static ScalingDecisionPublisher createScalingDecisionPublisher(StatisticsPublisherType type) {
 
         if (type == StatisticsPublisherType.WSO2DAS) {
-            return new DASScalingDecisionPublisher();
+            return DASScalingDecisionPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index a907043..097c568 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -38,22 +38,36 @@ import java.util.concurrent.ExecutorService;
 public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher {
 
     private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class);
+    private static volatile DASScalingDecisionPublisher dasScalingDecisionPublisher;
     private static final String DATA_STREAM_NAME = "scaling_decision";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private ExecutorService executorService;
 
     public DASScalingDecisionPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = StratosThreadPool.getExecutorService("autoscaler.stats.publisher.thread.pool", 10);
+        executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
+    }
+
+    public static DASScalingDecisionPublisher getInstance() {
+        if (dasScalingDecisionPublisher == null) {
+            synchronized (DASScalingDecisionPublisher.class) {
+                if (dasScalingDecisionPublisher == null) {
+                    dasScalingDecisionPublisher = new DASScalingDecisionPublisher();
+                }
+            }
+        }
+        return dasScalingDecisionPublisher;
     }
 
     private static StreamDefinition createStreamDefinition() {
         try {
             // Create stream definition
             StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
-            streamDefinition.setNickName("Member Information");
-            streamDefinition.setDescription("Member Information");
+            streamDefinition.setNickName("Scaling Decision");
+            streamDefinition.setDescription("Scaling Decision");
             List<Attribute> payloadData = new ArrayList<Attribute>();
 
             // Set payload definition

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index 997ab0c..ef12983 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -72,6 +72,7 @@ public final class AutoscalerConstants {
     public static final String PAYLOAD_DEPLOYMENT = "default";
 
     public static final String MONITOR_THREAD_POOL_ID = "monitor.thread.pool";
+    public static final String STATS_PUBLISHER_THREAD_POOL_ID = "autoscaler.stats.publisher.thread.pool";
     public static final String MONITOR_THREAD_POOL_SIZE = "monitor.thread.pool.size";
     public static final String CLUSTER_MONITOR_SCHEDULER_ID = "cluster.monitor.scheduler";
     public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
index db68396..87d7ab9 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 
 /**
@@ -33,9 +34,9 @@ public class CloudControllerPublisherFactory {
      */
     public static MemberInformationPublisher createMemberInformationPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2DAS) {
-            return new DASMemberInformationPublisher();
+            return DASMemberInformationPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
         }
     }
 
@@ -47,9 +48,9 @@ public class CloudControllerPublisherFactory {
      */
     public static MemberStatusPublisher createMemberStatusPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2DAS) {
-            return new DASMemberStatusPublisher();
+            return DASMemberStatusPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index d0dcc49..4ab65e1 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -44,16 +44,28 @@ import java.util.concurrent.ExecutorService;
 public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher {
 
     private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class);
-
+    private static volatile DASMemberInformationPublisher dasMemberInformationPublisher;
     private static final String DATA_STREAM_NAME = "member_info";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private static final String VALUE_NOT_FOUND = "Value Not Found";
     private ExecutorService executorService;
 
-    public DASMemberInformationPublisher() {
+    private DASMemberInformationPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+    }
+
+    public static DASMemberInformationPublisher getInstance() {
+        if (dasMemberInformationPublisher == null) {
+            synchronized (DASMemberInformationPublisher.class) {
+                if (dasMemberInformationPublisher == null) {
+                    dasMemberInformationPublisher = new DASMemberInformationPublisher();
+                }
+            }
+        }
+        return dasMemberInformationPublisher;
     }
 
     private static StreamDefinition createStreamDefinition() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 877256d..332bbba 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -38,14 +38,27 @@ import java.util.concurrent.ExecutorService;
 public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher {
 
     private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class);
+    private static volatile DASMemberStatusPublisher dasMemberStatusPublisher;
     private static final String DATA_STREAM_NAME = "member_lifecycle";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private ExecutorService executorService;
 
-    public DASMemberStatusPublisher() {
+    private DASMemberStatusPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+    }
+
+    public static DASMemberStatusPublisher getInstance() {
+        if (dasMemberStatusPublisher == null) {
+            synchronized (DASMemberStatusPublisher.class) {
+                if (dasMemberStatusPublisher == null) {
+                    dasMemberStatusPublisher = new DASMemberStatusPublisher();
+                }
+            }
+        }
+        return dasMemberStatusPublisher;
     }
 
     private static StreamDefinition createStreamDefinition() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index c025bb4..ccd8d34 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -162,6 +162,7 @@ public final class CloudControllerConstants {
     public static final String TIMESTAMP_COL = "timestamp";
     public static final String SCALING_DECISION_ID_COL = "scaling_decision_id";
 
+    public static final String STATS_PUBLISHER_THREAD_POOL_ID = "cloud.controller.stats.publisher.thread.pool";
 
     /**
      * Properties

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
new file mode 100644
index 0000000..09efa1e
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.exception;
+
+/**
+ * Unchecked exception (the publisher factories throw it without a throws clause) raised when a publisher is requested with an invalid statistics publisher type.
+ */
+public class InvalidStatisticsPublisherTypeException extends RuntimeException {
+    /** @param message detail message describing why the requested publisher type was rejected */
+    public InvalidStatisticsPublisherTypeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
index e4047ab..bf67c1b 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPHealthStatisticsPublisher;
 
 /**
@@ -28,9 +29,9 @@ public class HealthStatisticsPublisherFactory {
 
     public static HealthStatisticsPublisher createHealthStatisticsPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2CEP) {
-            return new WSO2CEPHealthStatisticsPublisher();
+            return WSO2CEPHealthStatisticsPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
index c942bce..a4b9db8 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPInFlightRequestPublisher;
 
 /**
@@ -28,9 +29,9 @@ public class InFlightRequestPublisherFactory {
 
     public static InFlightRequestPublisher createInFlightRequestPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2CEP) {
-            return new WSO2CEPInFlightRequestPublisher();
+            return WSO2CEPInFlightRequestPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 7d4aa6e..16dba16 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -21,14 +21,10 @@ package org.apache.stratos.common.statistics.publisher;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
 import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
 import org.wso2.carbon.databridge.agent.thrift.lb.DataPublisherHolder;
 import org.wso2.carbon.databridge.agent.thrift.lb.LoadBalancingDataPublisher;
 import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverGroup;
-import org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil;
 import org.wso2.carbon.databridge.commons.Event;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
 
@@ -52,8 +48,8 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
      * Credential information stored inside thrift-client-config.xml file
      * is parsed and assigned into ip,port,username and password fields
      *
-     * @param streamDefinition      Thrift Event Stream Definition
-     * @param thriftClientName      Thrift Client Name
+     * @param streamDefinition Thrift Event Stream Definition
+     * @param thriftClientName Thrift Client Name
      */
     public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String thriftClientName) {
         ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
@@ -61,54 +57,58 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
         this.streamDefinition = streamDefinition;
 
         if (isPublisherEnabled()) {
-        	this.enabled = true;
+            this.enabled = true;
             init();
         }
     }
 
     private boolean isPublisherEnabled() {
-    	boolean publisherEnabled = false;
-    	for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
-    		publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
-    		if(publisherEnabled){
-    			break;
-    		}
-		}    	
-		return publisherEnabled;
-	}
-
-	private void init() {
-        
+        boolean publisherEnabled = false;
+        for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
+            publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
+            if (publisherEnabled) {
+                break;
+            }
+        }
+        return publisherEnabled;
+    }
+
+    private void init() {
+
         // Initialize load balancing data publisher       
         loadBalancingDataPublisher = new LoadBalancingDataPublisher(getReceiverGroups());
-        loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
+
+        //adding stream definition
+        if (!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
+            loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
+        }
     }
 
     private ArrayList<ReceiverGroup> getReceiverGroups() {
-    	
-        ArrayList<ReceiverGroup> receiverGroups = new ArrayList<ReceiverGroup>();    
-        
+
+        ArrayList<ReceiverGroup> receiverGroups = new ArrayList<ReceiverGroup>();
+
         for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
-        	ArrayList<DataPublisherHolder> dataPublisherHolders = new ArrayList<DataPublisherHolder>();
-			DataPublisherHolder aNode = new DataPublisherHolder(null, buildUrl(thriftClientInfo), thriftClientInfo.getUsername(), thriftClientInfo.getPassword());
-			dataPublisherHolders.add(aNode);
-			ReceiverGroup group = new ReceiverGroup(dataPublisherHolders);
-			receiverGroups.add(group);
-		} 
-		return receiverGroups; 
-        
-	}
-
-	private String buildUrl(ThriftClientInfo thriftClientInfo) {
-		String url = new StringBuilder()
-						.append("tcp://")
-						.append(thriftClientInfo.getIp())
-						.append(":")
-						.append(thriftClientInfo.getPort()).toString();				
-		return url;
-	}
-
-	@Override
+            ArrayList<DataPublisherHolder> dataPublisherHolders = new ArrayList<DataPublisherHolder>();
+            DataPublisherHolder aNode = new DataPublisherHolder(null, buildUrl(thriftClientInfo), thriftClientInfo.getUsername(), thriftClientInfo.getPassword());
+            dataPublisherHolders.add(aNode);
+            ReceiverGroup group = new ReceiverGroup(dataPublisherHolders);
+            receiverGroups.add(group);
+        }
+        return receiverGroups;
+
+    }
+
+    private String buildUrl(ThriftClientInfo thriftClientInfo) {
+        String url = new StringBuilder()
+                .append("tcp://")
+                .append(thriftClientInfo.getIp())
+                .append(":")
+                .append(thriftClientInfo.getPort()).toString();
+        return url;
+    }
+
+    @Override
     public void setEnabled(boolean enabled) {
         this.enabled = enabled;
         if (this.enabled) {
@@ -138,11 +138,11 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
             }
             loadBalancingDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
             if (log.isDebugEnabled()) {
-                log.debug(String.format("Successfully Published ********  thrift event: [stream] %s [version] %s",
+                log.debug(String.format("Successfully Published thrift event: [stream] %s [version] %s",
                         streamDefinition.getName(), streamDefinition.getVersion()));
             }
-            
-            
+
+
         } catch (AgentException e) {
             if (log.isErrorEnabled()) {
                 log.error(String.format("Could not publish thrift event: [stream] %s [version] %s",

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index ea1c401..d025c33 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -36,15 +36,26 @@ import java.util.List;
 public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
 
     private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
-
+    private static volatile WSO2CEPHealthStatisticsPublisher wso2CEPHealthStatisticsPublisher;
     private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats";
     private static final String VERSION = "1.0.0";
     private static final String CEP_THRIFT_CLIENT_NAME = "cep";
 
-    public WSO2CEPHealthStatisticsPublisher() {
+    private WSO2CEPHealthStatisticsPublisher() {
         super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
     }
 
+    public static WSO2CEPHealthStatisticsPublisher getInstance() {
+        if (wso2CEPHealthStatisticsPublisher == null) {
+            synchronized (WSO2CEPHealthStatisticsPublisher.class) {
+                if (wso2CEPHealthStatisticsPublisher == null) {
+                    wso2CEPHealthStatisticsPublisher = new WSO2CEPHealthStatisticsPublisher();
+                }
+            }
+        }
+        return wso2CEPHealthStatisticsPublisher;
+    }
+
     private static StreamDefinition createStreamDefinition() {
         try {
             // Create stream definition

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 24465c7..8c9189b 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -38,15 +38,26 @@ import java.util.List;
  */
 public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
     private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
-
+    private static volatile WSO2CEPInFlightRequestPublisher wso2CEPInFlightRequestPublisher;
     private static final String DATA_STREAM_NAME = "in_flight_requests";
     private static final String VERSION = "1.0.0";
     private static final String CEP_THRIFT_CLIENT_NAME = "cep";
 
-    public WSO2CEPInFlightRequestPublisher() {
+    private WSO2CEPInFlightRequestPublisher() {
         super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
     }
 
+    public static  WSO2CEPInFlightRequestPublisher getInstance() {
+        if (wso2CEPInFlightRequestPublisher == null) {
+            synchronized ( WSO2CEPInFlightRequestPublisher.class) {
+                if (wso2CEPInFlightRequestPublisher == null) {
+                    wso2CEPInFlightRequestPublisher = new  WSO2CEPInFlightRequestPublisher();
+                }
+            }
+        }
+        return wso2CEPInFlightRequestPublisher;
+    }
+
     private static StreamDefinition createStreamDefinition() {
         try {
             // Create stream definition