You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2015/08/14 07:19:58 UTC

stratos git commit: Refactoring Monitoring Service Classes

Repository: stratos
Updated Branches:
  refs/heads/data-publisher-integration f704fa3da -> b530e9de3


Refactoring Monitoring Service Classes


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b530e9de
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b530e9de
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b530e9de

Branch: refs/heads/data-publisher-integration
Commit: b530e9de31e16075f8c3505d35e95c2ec4d50429
Parents: f704fa3
Author: Thanuja <th...@wso2.com>
Authored: Wed Aug 12 18:16:54 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Wed Aug 12 18:16:54 2015 +0530

----------------------------------------------------------------------
 .../publisher/HealthStatisticsPublisher.java    |   4 +-
 .../publisher/InFlightRequestPublisher.java     |   4 +-
 .../publisher/StatisticsPublisherType.java      |   2 +-
 .../publisher/ThriftClientConfig.java           |  81 +++++++++++
 .../publisher/ThriftClientConfigParser.java     | 139 +++++++++++++++++++
 .../statistics/publisher/ThriftClientInfo.java  |  63 +++++++++
 .../publisher/ThriftStatisticsPublisher.java    | 115 +++++++++++++++
 .../publisher/wso2/cep/ThriftClientConfig.java  |  81 -----------
 .../wso2/cep/ThriftClientConfigParser.java      | 139 -------------------
 .../publisher/wso2/cep/ThriftClientInfo.java    |  63 ---------
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |  26 ++--
 .../cep/WSO2CEPInFlightRequestPublisher.java    |  28 ++--
 .../wso2/cep/WSO2CEPStatisticsPublisher.java    | 114 ---------------
 .../test/ThriftClientConfigParserTest.java      |  13 +-
 .../cartridge.agent/healthstats.py              |   4 +-
 .../streamdefinitions/stream-manager-config.xml |   4 +-
 16 files changed, 447 insertions(+), 433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index 95b04ff..6af1317 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -27,7 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
     /**
      * Publish health statistics to complex event processor.
      *
-     * @param timeStamp          time
+     * @param timestamp          Time
      * @param clusterId          Cluster id of the member
      * @param clusterInstanceId  Cluster instance id of the member
      * @param networkPartitionId Network partition id of the member
@@ -36,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
      * @param health             Health type: memory_consumption | load_average
      * @param value              Health type value
      */
-    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                  String memberId, String partitionId, String health, double value);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index af9c8e9..e4e65c0 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -27,12 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
     /**
      * Publish in-flight request count.
      *
-     * @param timeStamp            time
+     * @param timestamp            Time
      * @param clusterId            Cluster id
      * @param clusterInstanceId    Cluster instance id
      * @param networkPartitionId   Network partition id of the cluster
      * @param inFlightRequestCount In-flight request count of the cluster
      */
-    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                  int inFlightRequestCount);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
index 77c5b78..d4b9a87 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
@@ -23,5 +23,5 @@ package org.apache.stratos.common.statistics.publisher;
  * Statistics publisher type enumneration.
  */
 public enum StatisticsPublisherType {
-    WSO2CEP
+    WSO2CEP, WSO2DAS
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
new file mode 100644
index 0000000..25ee897
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Thrift client configuration; getInstance() parses it once from the file named by a system property.
+ */
+public class ThriftClientConfig {
+
+    public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path";
+
+    private static volatile ThriftClientConfig instance;
+    private ThriftClientInfo thriftClientInfo;
+
+    /*
+    * Package-private rather than private: ThriftClientConfigParser (same package)
+    * creates the instance; code in other packages has to go through getInstance().
+    */
+    ThriftClientConfig() {
+    }
+    /** Returns the shared configuration, parsing the configured file on first use. */
+    public static ThriftClientConfig getInstance() {
+        if (instance == null) { // already parsed: skip the lock
+            synchronized (ThriftClientConfig.class) {
+                if (instance == null) { // re-check while holding the class lock
+                    String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH);
+                    if (StringUtils.isBlank(configFilePath)) {
+                        throw new RuntimeException(String.format("Thrift client configuration file path system " +
+                                "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH));
+                    }
+                    instance = ThriftClientConfigParser.parse(configFilePath);
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Returns the ThriftClientInfo object that stores the credential information.
+     * The thrift client credential information is kept in the thrift-client-config.xml file;
+     * it is parsed by ThriftClientConfigParser and assigned to the ThriftClientInfo
+     * object.
+     * <p/>
+     * This method returns whatever was last passed to setThriftClientInfo.
+     *
+     * @return ThriftClientInfo object which consists of username, password, ip and port values
+     */
+    public ThriftClientInfo getThriftClientInfo() {
+        return thriftClientInfo;
+    }
+
+    /**
+     * Stores the ThriftClientInfo object holding the parsed values. ThriftClientConfigParser
+     * calls this with the object it fills from the thrift-client-config.xml file.
+     *
+     * @param thriftClientInfo Object of the ThriftClientInfo
+     */
+    public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) {
+        this.thriftClientInfo = thriftClientInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
new file mode 100644
index 0000000..aa05d6f
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.util.AxiomXpathParserUtil;
+import org.wso2.securevault.SecretResolver;
+import org.wso2.securevault.SecretResolverFactory;
+
+import java.io.File;
+import java.util.Iterator;
+
+/**
+ * Thrift client config parser: builds a ThriftClientConfig from the thrift-client-config.xml file.
+ */
+public class ThriftClientConfigParser {
+
+    private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class);
+
+    /**
+     * Local names of the child elements read from the thrift-client-config.xml file
+     */
+    private static final String USERNAME_ELEMENT = "username";
+    private static final String PASSWORD_ELEMENT = "password";
+    private static final String IP_ELEMENT = "ip";
+    private static final String PORT_ELEMENT = "port";
+
+    /**
+     * Reads the thrift-client-config.xml file and copies the username, password, ip and port
+     * child elements into a new ThriftClientInfo held by a new ThriftClientConfig.
+     * The password is resolved through secure vault when its alias is token protected.
+     * <p/>
+     * Any failure, including a missing file or a missing element, surfaces as a RuntimeException.
+     *
+     * @param filePath the path to thrift-client-config.xml file
+     * @return ThriftClientConfig object
+     */
+    public static ThriftClientConfig parse(String filePath) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Parsing thrift client config file: %s", filePath));
+            }
+
+            ThriftClientConfig thriftClientIConfig = new ThriftClientConfig();
+            ThriftClientInfo thriftClientInfo = new ThriftClientInfo();
+            thriftClientIConfig.setThriftClientInfo(thriftClientInfo);
+
+            File configFile = new File(filePath);
+            if (!configFile.exists()) {
+                throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath));
+            }
+            OMElement document = AxiomXpathParserUtil.parse(configFile);
+            Iterator thriftClientIterator = document.getChildElements();
+
+            // Initialize the SecretResolver from the root configuration element.
+            SecretResolver secretResolver = SecretResolverFactory.create(document, false);
+
+            String userNameValuesStr = null;
+            String passwordValueStr = null;
+            String ipValuesStr = null;
+            String portValueStr = null;
+
+            // Same entry as used in cipher-text.properties and cipher-tool.properties.
+            String secretAlias = "thrift.client.configuration.password";
+
+            // Walk the direct children of the root element and pick up the credential
+            // information that ThriftStatisticsPublisher needs; other elements are ignored.
+            while (thriftClientIterator.hasNext()) {
+                OMElement thriftClientElement = (OMElement) thriftClientIterator.next();
+
+                if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    userNameValuesStr = thriftClientElement.getText();
+                    thriftClientInfo.setUsername(userNameValuesStr);
+                }
+                // Password may be protected by secure vault; otherwise the element text is used as is.
+                if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    if ((secretResolver != null) && (secretResolver.isInitialized())) {
+                        if (secretResolver.isTokenProtected(secretAlias)) {
+                            passwordValueStr = secretResolver.resolve(secretAlias);
+                        } else {
+                            passwordValueStr = thriftClientElement.getText();
+                        }
+                    } else {
+                        passwordValueStr = thriftClientElement.getText();
+                    }
+                    thriftClientInfo.setPassword(passwordValueStr);
+                }
+
+                if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    ipValuesStr = thriftClientElement.getText();
+                    thriftClientInfo.setIp(ipValuesStr);
+                }
+
+                if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    portValueStr = thriftClientElement.getText();
+                    thriftClientInfo.setPort(portValueStr);
+                }
+            }
+
+            if (userNameValuesStr == null) { // all four elements are mandatory
+                throw new RuntimeException("Username value not found in thrift client configuration");
+            }
+            if (passwordValueStr == null) {
+                throw new RuntimeException("Password not found in thrift client configuration ");
+            }
+
+            if (ipValuesStr == null) {
+                throw new RuntimeException("Ip values not found in thrift client configuration ");
+            }
+
+            if (portValueStr == null) {
+                throw new RuntimeException("Port not found in thrift client configuration ");
+            }
+
+            return thriftClientIConfig;
+        } catch (Exception e) { // also catches the RuntimeExceptions thrown above; they become the cause
+            throw new RuntimeException("Could not parse thrift client configuration", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
new file mode 100644
index 0000000..514d907
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+/**
+ * Thrift client info: mutable holder for the username, password, ip and port of the thrift endpoint.
+ */
+public class ThriftClientInfo {
+    private String username;
+    private String password;
+    private String ip;
+    private String port; // kept as text, exactly as read from the configuration
+
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+
+    public String getPort() {
+        return port;
+    }
+
+    public void setPort(String port) {
+        this.port = port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
new file mode 100644
index 0000000..6a9e955
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.HashMap;
+
+/**
+ * Thrift statistics publisher: sends payloads as events on one stream through an AsyncDataPublisher.
+ */
+public class ThriftStatisticsPublisher implements StatisticsPublisher {
+
+    private static final Log log = LogFactory.getLog(ThriftStatisticsPublisher.class);
+
+    private StreamDefinition streamDefinition;
+    private AsyncDataPublisher asyncDataPublisher;
+    private String ip;
+    private String port;
+    private String username;
+    private String password;
+    private boolean enabled = false;
+
+    /**
+     * Copies ip, port, username and password from the parsed thrift-client-config.xml file
+     * and initializes the publisher when the named system property is "true".
+     * @param streamDefinition      Thrift Event Stream Definition
+     * @param statsPublisherEnabled Name of the boolean system property that enables publishing
+     */
+    public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String statsPublisherEnabled) {
+        ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
+        ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo();
+
+        this.streamDefinition = streamDefinition;
+        this.ip = thriftClientInfo.getIp();
+        this.port = thriftClientInfo.getPort();
+        this.username = thriftClientInfo.getUsername();
+        this.password = thriftClientInfo.getPassword();
+
+        enabled = Boolean.getBoolean(statsPublisherEnabled); // looks up the system property with this name
+        if (enabled) {
+            init();
+        }
+    }
+    /** Creates the agent and the asynchronous publisher and registers the stream definition. */
+    private void init() {
+        AgentConfiguration agentConfiguration = new AgentConfiguration();
+        Agent agent = new Agent(agentConfiguration);
+
+        // Initialize asynchronous data publisher
+        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
+        asyncDataPublisher.addStreamDefinition(streamDefinition);
+    }
+    /** Sets the enabled flag and (re)initializes the publisher when enabling. */
+    @Override
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+        if (this.enabled) {
+            init(); // NOTE(review): builds a new Agent/AsyncDataPublisher on every call - confirm the old one needs no stop
+        }
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return enabled;
+    }
+    /** Publishes the payload as one event; throws RuntimeException when the publisher is not enabled. */
+    @Override
+    public void publish(Object[] payload) {
+        if (!isEnabled()) {
+            throw new RuntimeException("Statistics publisher is not enabled");
+        }
+
+        Event event = new Event();
+        event.setPayloadData(payload);
+        event.setArbitraryDataMap(new HashMap<String, String>());
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Publishing thrift event: [stream] %s [version] %s",
+                        streamDefinition.getName(), streamDefinition.getVersion()));
+            }
+            asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
+        } catch (AgentException e) { // logged only; the failure is not propagated to the caller
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not publish thrift event: [stream] %s [version] %s",
+                        streamDefinition.getName(), streamDefinition.getVersion()), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
deleted file mode 100644
index 178c5a6..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * Thrift Client configuration.
- */
-public class ThriftClientConfig {
-
-    public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path";
-
-    private static volatile ThriftClientConfig instance;
-    private ThriftClientInfo thriftClientInfo;
-
-    /*
-    * A private Constructor prevents any other
-    * class from instantiating.
-    */
-    ThriftClientConfig() {
-    }
-
-    public static ThriftClientConfig getInstance() {
-        if (instance == null) {
-            synchronized (ThriftClientConfig.class) {
-                if (instance == null) {
-                    String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH);
-                    if (StringUtils.isBlank(configFilePath)) {
-                        throw new RuntimeException(String.format("Thrift client configuration file path system " +
-                                "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH));
-                    }
-                    instance = ThriftClientConfigParser.parse(configFilePath);
-                }
-            }
-        }
-        return instance;
-    }
-
-    /**
-     * Returns an ThriftClientInfo Object that stores the credential information.
-     * CEP credential information can be found under thrift-client-config.xml file
-     * These credential information then get parsed and assigned into ThriftClientInfo
-     * Object.
-     * <p/>
-     * This method is used to return the assigned values in ThriftClientInfo Object
-     *
-     * @return ThriftClientInfo object which consists of username,password,ip and port values
-     */
-    public ThriftClientInfo getThriftClientInfo() {
-        return thriftClientInfo;
-    }
-
-    /**
-     * Parsed values will be assigned to ThriftClientInfo object. Required fields will be taken
-     * from thrift-client-config.xml file.
-     *
-     * @param thriftClientInfo Object of the ThriftClientInfo
-     */
-    public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) {
-        this.thriftClientInfo = thriftClientInfo;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
deleted file mode 100644
index ae3c692..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.util.AxiomXpathParserUtil;
-import org.wso2.securevault.SecretResolver;
-import org.wso2.securevault.SecretResolverFactory;
-
-import java.io.File;
-import java.util.Iterator;
-
-/**
- * Thrift client config parser.
- */
-public class ThriftClientConfigParser {
-
-    private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class);
-
-    /**
-     * Fields to be read from the thrift-client-config.xml file
-     */
-    private static final String USERNAME_ELEMENT = "username";
-    private static final String PASSWORD_ELEMENT = "password";
-    private static final String IP_ELEMENT = "ip";
-    private static final String PORT_ELEMENT = "port";
-
-    /**
-     * This method reads the thrift-client-config.xml file and assigns the necessary credential
-     * values to a ThriftClientInfo object.  A singleton design has been implemented
-     * with the use of the ThriftClientConfig class.
-     * <p/>
-     * The filePath argument is the path to thrift-client-config.xml file
-     *
-     * @param filePath the path to thrift-client-config.xml file
-     * @return ThriftClientConfig object
-     */
-    public static ThriftClientConfig parse(String filePath) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Parsing thrift client config file: %s", filePath));
-            }
-
-            ThriftClientConfig thriftClientIConfig = new ThriftClientConfig();
-            ThriftClientInfo thriftClientInfo = new ThriftClientInfo();
-            thriftClientIConfig.setThriftClientInfo(thriftClientInfo);
-
-            File configFile = new File(filePath);
-            if (!configFile.exists()) {
-                throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath));
-            }
-            OMElement document = AxiomXpathParserUtil.parse(configFile);
-            Iterator thriftClientIterator = document.getChildElements();
-
-            //Initialize the SecretResolver providing the configuration element.
-            SecretResolver secretResolver = SecretResolverFactory.create(document, false);
-
-            String userNameValuesStr = null;
-            String passwordValueStr = null;
-            String ipValuesStr = null;
-            String portValueStr = null;
-
-            //same entry used in cipher-text.properties and cipher-tool.properties.
-            String secretAlias = "thrift.client.configuration.password";
-
-            // Iterate over the child elements of the thrift-client-config.xml file, which
-            // hold the credential information necessary for WSO2CEPStatisticsPublisher
-            while (thriftClientIterator.hasNext()) {
-                OMElement thriftClientElement = (OMElement) thriftClientIterator.next();
-
-                if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    userNameValuesStr = thriftClientElement.getText();
-                    thriftClientInfo.setUsername(userNameValuesStr);
-                }
-                //password field protected using Secure vault
-                if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    if ((secretResolver != null) && (secretResolver.isInitialized())) {
-                        if (secretResolver.isTokenProtected(secretAlias)) {
-                            passwordValueStr = secretResolver.resolve(secretAlias);
-                        } else {
-                            passwordValueStr = thriftClientElement.getText();
-                        }
-                    } else {
-                        passwordValueStr = thriftClientElement.getText();
-                    }
-                    thriftClientInfo.setPassword(passwordValueStr);
-                }
-
-                if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    ipValuesStr = thriftClientElement.getText();
-                    thriftClientInfo.setIp(ipValuesStr);
-                }
-
-                if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    portValueStr = thriftClientElement.getText();
-                    thriftClientInfo.setPort(portValueStr);
-                }
-            }
-
-            if (userNameValuesStr == null) {
-                throw new RuntimeException("Username value not found in thrift client configuration");
-            }
-            if (passwordValueStr == null) {
-                throw new RuntimeException("Password not found in thrift client configuration ");
-            }
-
-            if (ipValuesStr == null) {
-                throw new RuntimeException("Ip values not found in thrift client configuration ");
-            }
-
-            if (portValueStr == null) {
-                throw new RuntimeException("Port not found in thrift client configuration ");
-            }
-
-            return thriftClientIConfig;
-        } catch (Exception e) {
-            throw new RuntimeException("Could not parse thrift client configuration", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
deleted file mode 100644
index 1a9ba81..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-/**
- * Thrift Client Info
- */
-public class ThriftClientInfo {
-    private String username;
-    private String password;
-    private String ip;
-    private String port;
-
-
-    public String getUsername() {
-        return username;
-    }
-
-    public void setUsername(String username) {
-        this.username = username;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public String getIp() {
-        return ip;
-    }
-
-    public void setIp(String ip) {
-        this.ip = ip;
-    }
-
-    public String getPort() {
-        return port;
-    }
-
-    public void setPort(String port) {
-        this.port = port;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index d5c9265..33cf0b5 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -22,6 +22,7 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -32,15 +33,16 @@ import java.util.List;
 /**
  * Health statistics publisher for publishing statistics to WSO2 CEP.
  */
-public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher implements HealthStatisticsPublisher {
+public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
 
     private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
 
+    private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
     private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats";
     private static final String VERSION = "1.0.0";
 
     public WSO2CEPHealthStatisticsPublisher() {
-        super(createStreamDefinition());
+        super(createStreamDefinition(), STATS_PUBLISHER_ENABLED);
     }
 
     private static StreamDefinition createStreamDefinition() {
@@ -71,17 +73,17 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish health statistics to cep.
      *
-     * @param timeStamp
-     * @param clusterId
-     * @param clusterInstanceId
-     * @param networkPartitionId
-     * @param memberId
-     * @param partitionId
-     * @param health
-     * @param value
+     * @param timestamp          Time
+     * @param clusterId          Cluster id of the member
+     * @param clusterInstanceId  Cluster instance id of the member
+     * @param networkPartitionId Network partition id of the member
+     * @param memberId           Member id
+     * @param partitionId        Partition id of the member
+     * @param health             Health type: memory_consumption | load_average
+     * @param value              Health type value
      */
     @Override
-    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                         String memberId, String partitionId, String health, double value) {
         if (log.isDebugEnabled()) {
             log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " +
@@ -90,7 +92,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
         }
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
-        payload.add(timeStamp);
+        payload.add(timestamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index f51eb91..f853f23 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -19,7 +19,10 @@
 
 package org.apache.stratos.common.statistics.publisher.wso2.cep;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -33,13 +36,15 @@ import java.util.List;
  * In-flight request count:
  * Number of requests being served at a given moment could be identified as in-flight request count.
  */
-public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher implements InFlightRequestPublisher {
+public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
+    private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
 
+    private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
     private static final String DATA_STREAM_NAME = "in_flight_requests";
     private static final String VERSION = "1.0.0";
 
     public WSO2CEPInFlightRequestPublisher() {
-        super(createStreamDefinition());
+        super(createStreamDefinition(), STATS_PUBLISHER_ENABLED);
     }
 
     private static StreamDefinition createStreamDefinition() {
@@ -66,18 +71,23 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish in-flight request count of a cluster.
      *
-     * @param timeStamp
-     * @param clusterId
-     * @param clusterInstanceId
-     * @param networkPartitionId
-     * @param inFlightRequestCount
+     * @param timestamp            Time
+     * @param clusterId            Cluster id
+     * @param clusterInstanceId    Cluster instance id
+     * @param networkPartitionId   Network partition id of the cluster
+     * @param inFlightRequestCount In-flight request count of the cluster
      */
     @Override
-    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                         int inFlightRequestCount) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Publishing health statistics: [timestamp] %d [cluster] %s " +
+                            "[cluster-instance] %s [network-partition] %s [in-flight-request-count] %d",
+                    timestamp, clusterId, clusterInstanceId, networkPartitionId, inFlightRequestCount));
+        }
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
-        payload.add(timeStamp);
+        payload.add(timestamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
deleted file mode 100644
index 653288d..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.HashMap;
-
-/**
- * WSO2 CEP statistics publisher.
- */
-public class WSO2CEPStatisticsPublisher implements StatisticsPublisher {
-
-    private static final Log log = LogFactory.getLog(WSO2CEPStatisticsPublisher.class);
-
-    private StreamDefinition streamDefinition;
-    private AsyncDataPublisher asyncDataPublisher;
-    private String ip;
-    private String port;
-    private String username;
-    private String password;
-    private boolean enabled = false;
-
-    /**
-     * Credential information stored inside thrift-client-config.xml file
-     * is parsed and assigned into ip,port,username and password fields
-     *
-     * @param streamDefinition
-     */
-    public WSO2CEPStatisticsPublisher(StreamDefinition streamDefinition) {
-        ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
-        thriftClientConfig.getThriftClientInfo();
-
-        this.streamDefinition = streamDefinition;
-        this.ip = thriftClientConfig.getThriftClientInfo().getIp();
-        this.port = thriftClientConfig.getThriftClientInfo().getPort();
-        this.username = thriftClientConfig.getThriftClientInfo().getUsername();
-        this.password = thriftClientConfig.getThriftClientInfo().getPassword();
-
-        enabled = Boolean.getBoolean("cep.stats.publisher.enabled");
-        if (enabled) {
-            init();
-        }
-    }
-
-    private void init() {
-        AgentConfiguration agentConfiguration = new AgentConfiguration();
-        Agent agent = new Agent(agentConfiguration);
-
-        // Initialize asynchronous data publisher
-        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
-        asyncDataPublisher.addStreamDefinition(streamDefinition);
-    }
-
-    @Override
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-        if (this.enabled) {
-            init();
-        }
-    }
-
-    @Override
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    @Override
-    public void publish(Object[] payload) {
-        if (!isEnabled()) {
-            throw new RuntimeException("Statistics publisher is not enabled");
-        }
-
-        Event event = new Event();
-        event.setPayloadData(payload);
-        event.setArbitraryDataMap(new HashMap<String, String>());
-
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
-            }
-            asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
-        } catch (AgentException e) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
index 09d5b5e..352041d 100644
--- a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
+++ b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
@@ -20,7 +20,8 @@
 package org.apache.stratos.common.test;
 
 import junit.framework.TestCase;
-import org.apache.stratos.common.statistics.publisher.wso2.cep.ThriftClientConfig;
+import org.apache.stratos.common.statistics.publisher.ThriftClientConfig;
+import org.apache.stratos.common.statistics.publisher.ThriftClientInfo;
 import org.junit.Test;
 
 import java.net.URL;
@@ -41,11 +42,11 @@ public class ThriftClientConfigParserTest extends TestCase {
         URL configFileUrl = ThriftClientConfigParserTest.class.getResource("/thrift-client-config.xml");
         System.setProperty(ThriftClientConfig.THRIFT_CLIENT_CONFIG_FILE_PATH, configFileUrl.getPath());
         ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
-        thriftClientConfig.getThriftClientInfo();
+        ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo();
 
-        assertEquals("Incorrect Password", "admin", thriftClientConfig.getThriftClientInfo().getUsername());
-        assertEquals("Incorrect Password", "1234", thriftClientConfig.getThriftClientInfo().getPassword());
-        assertEquals("Incorrect IP", "192.168.10.10", thriftClientConfig.getThriftClientInfo().getIp());
-        assertEquals("Incorrect Port", "9300", thriftClientConfig.getThriftClientInfo().getPort());
+        assertEquals("Incorrect Username", "admin", thriftClientInfo.getUsername());
+        assertEquals("Incorrect Password", "1234", thriftClientInfo.getPassword());
+        assertEquals("Incorrect IP", "192.168.10.10", thriftClientInfo.getIp());
+        assertEquals("Incorrect Port", "9300", thriftClientInfo.getPort());
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
index 0cd76af..0dbd1e1 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
@@ -139,7 +139,7 @@ class HealthStatisticsPublisher:
         stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
 
         # stream_def.add_payloaddata_attribute()
-        stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG)
+        stream_def.add_payloaddata_attribute("timestamp", StreamDefinition.LONG)
         stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
@@ -226,7 +226,7 @@ class DefaultHealthStatisticsReader:
         (one, five, fifteen) = os.getloadavg()
         cores = multiprocessing.cpu_count()
 
-        return (one/cores) * 100
+        return (one / cores) * 100
 
 
 class CartridgeHealthStatistics:

http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
index a256770..9e0a833 100644
--- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
+++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
@@ -30,7 +30,7 @@
         <correlationData>
         </correlationData>
         <payloadData>
-            <property name="time_stamp" type="long"/>
+            <property name="timestamp" type="long"/>
             <property name="cluster_id" type="String"/>
             <property name="cluster_instance_id" type="String"/>
             <property name="network_partition_id" type="String"/>
@@ -93,7 +93,7 @@
         <correlationData>
         </correlationData>
         <payloadData>
-            <property name="time_stamp" type="long"/>
+            <property name="timestamp" type="long"/>
             <property name="cluster_id" type="String"/>
             <property name="cluster_instance_id" type="String"/>
             <property name="network_partition_id" type="String"/>