You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/11/23 16:04:18 UTC

[4/6] stratos git commit: Changing publisher classes hierarchy

Changing publisher classes hierarchy


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/609bc9d9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/609bc9d9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/609bc9d9

Branch: refs/heads/stratos-4.1.x
Commit: 609bc9d9013fc3fb13d5b4a467528eb70540dccf
Parents: f3a809b
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 18:24:30 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 18:24:30 2015 +0530

----------------------------------------------------------------------
 .../publisher/DASScalingDecisionPublisher.java  |  5 ++--
 .../publisher/ScalingDecisionPublisher.java     | 24 ++++++++++++--------
 .../DASMemberInformationPublisher.java          |  8 +++----
 .../publisher/DASMemberStatusPublisher.java     |  8 +++----
 .../publisher/MemberInformationPublisher.java   | 12 +++++++---
 .../publisher/MemberStatusPublisher.java        | 16 +++++++++----
 ...InvalidStatisticsPublisherTypeException.java |  2 +-
 .../publisher/HealthStatisticsPublisher.java    | 12 +++++++---
 .../publisher/InFlightRequestPublisher.java     | 11 +++++++--
 .../publisher/ThriftStatisticsPublisher.java    |  4 +---
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |  5 ++--
 .../cep/WSO2CEPInFlightRequestPublisher.java    | 19 ++++++++--------
 12 files changed, 76 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index 097c568..52857d4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.autoscaler.statistics.publisher;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
  */
-public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher {
+public class DASScalingDecisionPublisher extends ScalingDecisionPublisher {
 
     private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class);
     private static volatile DASScalingDecisionPublisher dasScalingDecisionPublisher;
@@ -165,7 +164,7 @@ public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher imple
                 payload.add(activeInstanceCount);
                 payload.add(additionalInstanceCount);
                 payload.add(scalingReason);
-                DASScalingDecisionPublisher.super.publish(payload.toArray());
+                publish(payload.toArray());
             }
 
         };

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
index f7b0087..fe791f9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
@@ -19,12 +19,18 @@
 
 package org.apache.stratos.autoscaler.statistics.publisher;
 
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Scaling Decision Publisher interface.
  */
-public interface ScalingDecisionPublisher extends StatisticsPublisher {
+public abstract class ScalingDecisionPublisher extends ThriftStatisticsPublisher {
+
+    public ScalingDecisionPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing scaling decision to DAS.
      *
@@ -47,11 +53,11 @@ public interface ScalingDecisionPublisher extends StatisticsPublisher {
      * @param additionalInstanceCount Additional Instance Needed
      * @param scalingReason           Scaling Reason
      */
-    public void publish(Long timestamp, String scalingDecisionId, String clusterId,
-                        int minInstanceCount, int maxInstanceCount,
-                        int rifPredicted, int rifThreshold, int rifRequiredInstances,
-                        int mcPredicted, int mcThreshold, int mcRequiredInstances,
-                        int laPredicted, int laThreshold, int laRequiredInstance,
-                        int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
-                        String scalingReason);
+    public abstract void publish(Long timestamp, String scalingDecisionId, String clusterId,
+                                 int minInstanceCount, int maxInstanceCount,
+                                 int rifPredicted, int rifThreshold, int rifRequiredInstances,
+                                 int mcPredicted, int mcThreshold, int mcRequiredInstances,
+                                 int laPredicted, int laThreshold, int laRequiredInstance,
+                                 int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
+                                 String scalingReason);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index 4ab65e1..621f9e2 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -27,7 +27,6 @@ import org.apache.stratos.cloud.controller.domain.IaasProvider;
 import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
 import org.apache.stratos.cloud.controller.domain.MemberContext;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -41,7 +40,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
  */
-public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher {
+public class DASMemberInformationPublisher extends MemberInformationPublisher {
 
     private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class);
     private static volatile DASMemberInformationPublisher dasMemberInformationPublisher;
@@ -54,7 +53,8 @@ public class DASMemberInformationPublisher extends ThriftStatisticsPublisher imp
 
     private DASMemberInformationPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberInformationPublisher getInstance() {
@@ -158,7 +158,7 @@ public class DASMemberInformationPublisher extends ThriftStatisticsPublisher imp
                                 metadata.getOperatingSystemName(), metadata.getOperatingSystemVersion(),
                                 metadata.getOperatingSystemArchitecture(), metadata.isOperatingSystem64bit()));
                     }
-                    DASMemberInformationPublisher.super.publish(payload.toArray());
+                    publish(payload.toArray());
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 332bbba..7a291ab 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.cloud.controller.statistics.publisher;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * Publishing member status to DAS.
  */
-public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher {
+public class DASMemberStatusPublisher extends MemberStatusPublisher {
 
     private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class);
     private static volatile DASMemberStatusPublisher dasMemberStatusPublisher;
@@ -47,7 +46,8 @@ public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implemen
 
     private DASMemberStatusPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberStatusPublisher getInstance() {
@@ -131,7 +131,7 @@ public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implemen
                 payload.add(partitionId);
                 payload.add(memberId);
                 payload.add(status);
-                DASMemberStatusPublisher.super.publish(payload.toArray());
+                publish(payload.toArray());
             }
         };
         executorService.execute(publisher);

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
index ffe0380..fda1b41 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
@@ -20,12 +20,18 @@
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
 import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Member Information Publisher interface.
  */
-public interface MemberInformationPublisher extends StatisticsPublisher {
+public abstract class MemberInformationPublisher extends ThriftStatisticsPublisher {
+
+    public MemberInformationPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing member information.
      *
@@ -33,6 +39,6 @@ public interface MemberInformationPublisher extends StatisticsPublisher {
      * @param scalingDecisionId Scaling Decision Id
      * @param metadata          InstanceMetadata
      */
-    public void publish(String memberId, String scalingDecisionId, InstanceMetadata metadata);
+    public abstract void publish(String memberId, String scalingDecisionId, InstanceMetadata metadata);
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
index fad1006..4fa23b1 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
@@ -19,12 +19,18 @@
 
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Member Status Publisher Interface.
  */
-public interface MemberStatusPublisher extends StatisticsPublisher {
+public abstract class MemberStatusPublisher extends ThriftStatisticsPublisher {
+
+    public MemberStatusPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing member status.
      *
@@ -39,7 +45,7 @@ public interface MemberStatusPublisher extends StatisticsPublisher {
      * @param memberId           Member Id
      * @param status             Member Status
      */
-    void publish(Long timestamp, String applicationId, String clusterId,
-                 String clusterAlias, String clusterInstanceId, String serviceName,
-                 String networkPartitionId, String partitionId, String memberId, String status);
+    public abstract void publish(Long timestamp, String applicationId, String clusterId,
+                                 String clusterAlias, String clusterInstanceId, String serviceName,
+                                 String networkPartitionId, String partitionId, String memberId, String status);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
index 09efa1e..4609c9f 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
@@ -22,7 +22,7 @@ package org.apache.stratos.common.exception;
 /**
  * This exception will be thrown when trying to create a publisher with invalid statistics publisher type.
  */
-public class InvalidStatisticsPublisherTypeException extends Exception {
+public class InvalidStatisticsPublisherTypeException extends RuntimeException {
 
     public InvalidStatisticsPublisherTypeException(String message) {
         super(message);

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index dd7ddd4..20f0ffe 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -19,10 +19,16 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
 /**
  * Health statistics publisher interface.
  */
-public interface HealthStatisticsPublisher extends StatisticsPublisher {
+public abstract class HealthStatisticsPublisher extends ThriftStatisticsPublisher {
+
+    public HealthStatisticsPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
 
     /**
      * Publish health statistics to complex event processor.
@@ -35,6 +41,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
      * @param health             Health type: memory_consumption | load_average
      * @param value              Health type value
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
-                 String memberId, String partitionId, String health, double value);
+    public abstract void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+                                 String memberId, String partitionId, String health, double value);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index 289be8b..af46ed1 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -19,10 +19,16 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
 /**
  * In-flight request publisher interface.
  */
-public interface InFlightRequestPublisher extends StatisticsPublisher {
+public abstract class InFlightRequestPublisher extends ThriftStatisticsPublisher {
+
+    public InFlightRequestPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
 
     /**
      * Publish in-flight request count.
@@ -32,5 +38,6 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
      * @param networkPartitionId   Network partition id of the cluster
      * @param inFlightRequestCount In-flight request count of the cluster
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount);
+    public abstract void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+                                 int inFlightRequestCount);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 4552f92..95c0478 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -77,9 +77,7 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
         loadBalancingDataPublisher = new LoadBalancingDataPublisher(getReceiverGroups());
 
         //adding stream definition
-        if (!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
-            loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
-        }
+        loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
     }
 
     private ArrayList<ReceiverGroup> getReceiverGroups() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index d025c33..03222ec 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -33,7 +32,7 @@ import java.util.List;
 /**
  * Health statistics publisher for publishing statistics to WSO2 CEP.
  */
-public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
+public class WSO2CEPHealthStatisticsPublisher extends HealthStatisticsPublisher {
 
     private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
     private static volatile WSO2CEPHealthStatisticsPublisher wso2CEPHealthStatisticsPublisher;
@@ -109,6 +108,6 @@ public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher
         payload.add(health);
         payload.add(value);
 
-        super.publish(payload.toArray());
+        publish(payload.toArray());
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 8c9189b..862a49d 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -36,7 +35,7 @@ import java.util.List;
  * In-flight request count:
  * Number of requests being served at a given moment could be identified as in-flight request count.
  */
-public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
+public class WSO2CEPInFlightRequestPublisher extends InFlightRequestPublisher {
     private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
     private static volatile WSO2CEPInFlightRequestPublisher wso2CEPInFlightRequestPublisher;
     private static final String DATA_STREAM_NAME = "in_flight_requests";
@@ -47,11 +46,11 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
         super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
     }
 
-    public static  WSO2CEPInFlightRequestPublisher getInstance() {
+    public static WSO2CEPInFlightRequestPublisher getInstance() {
         if (wso2CEPInFlightRequestPublisher == null) {
-            synchronized ( WSO2CEPInFlightRequestPublisher.class) {
+            synchronized (WSO2CEPInFlightRequestPublisher.class) {
                 if (wso2CEPInFlightRequestPublisher == null) {
-                    wso2CEPInFlightRequestPublisher = new  WSO2CEPInFlightRequestPublisher();
+                    wso2CEPInFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher();
                 }
             }
         }
@@ -81,10 +80,10 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
     /**
      * Publish in-flight request count of a cluster.
      *
-     * @param clusterId
-     * @param clusterInstanceId
-     * @param networkPartitionId
-     * @param inFlightRequestCount
+     * @param clusterId             Cluster id
+     * @param clusterInstanceId     Cluster instance id
+     * @param networkPartitionId    Cluster's network partition id
+     * @param inFlightRequestCount  Cluster's in-flight-request count
      */
     @Override
     public void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
@@ -102,6 +101,6 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
         payload.add(networkPartitionId);
         payload.add((double) inFlightRequestCount);
 
-        super.publish(payload.toArray());
+        publish(payload.toArray());
     }
 }