You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2013/07/11 07:18:17 UTC

[26/27] aplying 0001-Refactor-usage-module-to-apache-stratos.patch

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java
new file mode 100644
index 0000000..8ea0ac1
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java
@@ -0,0 +1,311 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+import org.apache.stratos.usage.agent.persist.BandwidthPersistor;
+import org.apache.stratos.usage.agent.util.MonitoredReader;
+import org.apache.stratos.usage.agent.util.MonitoredWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.CarbonConstants;
+import org.wso2.carbon.registry.core.*;
+import org.wso2.carbon.registry.core.config.RegistryContext;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.registry.core.jdbc.handlers.Handler;
+import org.wso2.carbon.registry.core.jdbc.handlers.RequestContext;
+import org.wso2.carbon.registry.core.session.CurrentSession;
+import org.wso2.carbon.registry.core.utils.RegistryUtils;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.Writer;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+
+/**
+ * Handler that intercept the registry calls
+ * Currently this handler is not registered because there is a similar handler RegistryUsageHandler
+ * After examining properly this class will be deleted.
+ */
+public class RegistryUsagePersistingListener extends Handler {
+
+    private static final Log log = LogFactory.getLog(RegistryUsagePersistingListener.class);
+
+    public void put(RequestContext context) throws RegistryException {
+        if (CurrentSession.getCallerTenantId() == MultitenantConstants.SUPER_TENANT_ID ||
+                CurrentSession.getTenantId() == MultitenantConstants.SUPER_TENANT_ID) {
+            // no limitations for the super tenant
+            return;
+        }
+        if (CarbonConstants.REGISTRY_SYSTEM_USERNAME.equals(CurrentSession.getUser()) ||
+                CarbonConstants.REGISTRY_ANONNYMOUS_USERNAME.equals(CurrentSession.getUser())) {
+            // skipping tracking for anonymous and system user
+            return;
+        }
+
+        // called only once per request
+        if (CurrentSession.getAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR)
+                != null) {
+            return;
+        }
+        CurrentSession.setAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR, true);
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+        ResourcePath path = context.getResourcePath();
+        Resource resource = context.getResource();
+        ((ResourceImpl) resource).setPath(path.getCompletePath());
+        if (resource instanceof CollectionImpl) {
+            return;
+        }
+        Object contentObj = resource.getContent();
+        if (contentObj == null) {
+            return;
+        }
+        int size;
+        if (contentObj instanceof String) {
+            size = ((String) contentObj).length();
+        } else if (contentObj instanceof byte[]) {
+            size = ((byte[]) contentObj).length;
+        } else {
+            String msg = "Unsupported type for the content.";
+            log.error(msg);
+            throw new RegistryException(msg);
+        }
+
+
+        // persisting bandwidth
+        BandwidthPersistor.storeIncomingBandwidth(tenantId, size);
+
+        //we will pass through, so that normal registry operation will put the resource
+    }
+
+    public void importResource(RequestContext context) throws RegistryException {
+        if (CurrentSession.getCallerTenantId() == MultitenantConstants.SUPER_TENANT_ID ||
+                CurrentSession.getTenantId() == MultitenantConstants.SUPER_TENANT_ID) {
+            // no limitations for the super tenant
+            return;
+        }
+        if (CarbonConstants.REGISTRY_SYSTEM_USERNAME.equals(CurrentSession.getUser()) ||
+                CarbonConstants.REGISTRY_ANONNYMOUS_USERNAME.equals(CurrentSession.getUser())) {
+            // skipping tracking for anonymous and system user
+            return;
+        }
+        // called only once per request..
+        if (CurrentSession.getAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR)
+                != null) {
+            return;
+        }
+        CurrentSession.setAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR, true);
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+//        ResourcePath resourcePath = context.getResourcePath();
+        String sourceURL = context.getSourceURL();
+     
+
+        // the import resource logic
+        URL url;
+        try {
+            if (sourceURL != null && sourceURL.toLowerCase().startsWith("file:")) {
+                String msg = "The source URL must not be file in the server's local file system";
+                throw new RegistryException(msg);
+            }
+            url = new URL(sourceURL);
+        } catch (MalformedURLException e) {
+            String msg = "Given source URL " + sourceURL + "is not valid.";
+            throw new RegistryException(msg, e);
+        }
+
+        try {
+            URLConnection uc = url.openConnection();
+            InputStream in = uc.getInputStream();
+            byte[] inByteArr = RegistryUtils.getByteArray(in);
+            int size = inByteArr.length;
+
+            // persisting bandwidth
+            BandwidthPersistor.storeIncomingBandwidth(tenantId, size);
+
+        } catch (IOException e) {
+
+            String msg = "Could not read from the given URL: " + sourceURL;
+            throw new RegistryException(msg, e);
+        }
+    }
+
+    public Resource get(RequestContext context) throws RegistryException {
+        if (CurrentSession.getCallerTenantId() == MultitenantConstants.SUPER_TENANT_ID ||
+                CurrentSession.getTenantId() == MultitenantConstants.SUPER_TENANT_ID) {
+            // no limitations for the super tenant
+            return null;
+        }
+        if (CarbonConstants.REGISTRY_SYSTEM_USERNAME.equals(CurrentSession.getUser()) ||
+                CarbonConstants.REGISTRY_ANONNYMOUS_USERNAME.equals(CurrentSession.getUser())) {
+            // skipping tracking for anonymous and system user
+            return null;
+        }
+        // called only once per request..
+        if (CurrentSession.getAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR)
+                != null) {
+            return null;
+        }
+        CurrentSession.setAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR, true);
+
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+
+        // get the resource
+        Resource resource = context.getResource();
+        if (resource == null) {
+            ResourcePath resourcePath = context.getResourcePath();
+            Registry registry = context.getRegistry();
+            if (registry.resourceExists(resourcePath.getPath())) {
+                resource = registry.get(resourcePath.getPath());
+                context.setResource(resource);
+                context.setProcessingComplete(true); // nothing else to do.
+            }
+        }
+        if (resource == null) {
+            return null;
+        }
+        if (resource instanceof CollectionImpl) {
+            return resource;
+        }
+        Object contentObj = resource.getContent();
+        if (contentObj == null) {
+            return resource;
+        }
+        int size;
+        if (contentObj instanceof String) {
+            size = ((String) contentObj).length();
+        } else if (contentObj instanceof byte[]) {
+            size = ((byte[]) contentObj).length;
+        } else {
+            String msg = "Unsupported type for the content.";
+            log.error(msg);
+            throw new RegistryException(msg);
+        }
+        // persisting bandwidth
+        BandwidthPersistor.storeOutgoingBandwidth(tenantId, size);
+        return resource;
+    }
+
+    public void dump(RequestContext requestContext) throws RegistryException {
+        if (CurrentSession.getCallerTenantId() == MultitenantConstants.SUPER_TENANT_ID ||
+                CurrentSession.getTenantId() == MultitenantConstants.SUPER_TENANT_ID) {
+            // no limitations for the super tenant
+            return;
+        }
+        if (CarbonConstants.REGISTRY_SYSTEM_USERNAME.equals(CurrentSession.getUser()) ||
+                CarbonConstants.REGISTRY_ANONNYMOUS_USERNAME.equals(CurrentSession.getUser())) {
+            // skipping tracking for anonymous and system user
+            return;
+        }
+        // called only once per request..
+        if (CurrentSession.getAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR)
+                != null) {
+            return;
+        }
+        CurrentSession.setAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR, true);
+
+        long size = requestContext.getBytesWritten();
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+        if (size == 0) {
+            //Still not dumped
+            Registry registry = requestContext.getRegistry();
+            String path = requestContext.getResourcePath().getPath();
+            Writer writer = requestContext.getDumpingWriter();
+            // we wrap the writer with the monitored writer
+            MonitoredWriter monitoredWriter = new MonitoredWriter(writer);
+            registry.dump(path, monitoredWriter);
+            size = monitoredWriter.getTotalWritten();
+            requestContext.setProcessingComplete(true);
+        }
+
+        // persisting bandwidth
+        BandwidthPersistor.storeOutgoingBandwidth(tenantId, size);
+
+    }
+
+    public void restore(RequestContext requestContext) throws RegistryException {
+        if (CurrentSession.getCallerTenantId() == MultitenantConstants.SUPER_TENANT_ID ||
+                CurrentSession.getTenantId() == MultitenantConstants.SUPER_TENANT_ID) {
+            // no limitations for the super tenant
+            return;
+        }
+        if (CarbonConstants.REGISTRY_SYSTEM_USERNAME.equals(CurrentSession.getUser()) ||
+                CarbonConstants.REGISTRY_ANONNYMOUS_USERNAME.equals(CurrentSession.getUser())) {
+            // skipping tracking for anonymous and system user
+            return;
+        }
+        // called only once per request..
+        if (CurrentSession.getAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR)
+                != null) {
+            return;
+        }
+        CurrentSession.setAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR, true);
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+        long size = requestContext.getBytesRead();
+
+        if (size == 0) {
+            //not restored yet
+            Registry registry = requestContext.getRegistry();
+            String path = requestContext.getResourcePath().getPath();
+            Reader reader = requestContext.getDumpingReader();
+            // we wrap the reader with the monitored reader
+            MonitoredReader monitoredReader = new MonitoredReader(reader);
+            registry.restore(path, monitoredReader);
+            size = monitoredReader.getTotalRead();
+            requestContext.setProcessingComplete(true);
+        }
+        // persisting bandwidth
+        BandwidthPersistor.storeIncomingBandwidth(tenantId, size);
+
+    }
+
+    public static void registerRegistryUsagePersistingListener(RegistryContext registryContext)
+            throws RegistryException {
+
+        //This was commented out because there is a similar class RegistryUsageListener
+        //After examiming properly this class will be deleted
+        /*HandlerManager handlerManager = registryContext.getHandlerManager();
+        RegistryUsagePersistingListener handler = new RegistryUsagePersistingListener();
+        URLMatcher anyUrlMatcher = new URLMatcher();
+        anyUrlMatcher.setPattern(".*");
+        String[] applyingFilters = new String[]{
+                Filter.PUT, Filter.IMPORT, Filter.GET, Filter.DUMP, Filter.RESTORE,};
+
+        handlerManager.addHandlerWithPriority(applyingFilters, anyUrlMatcher, handler,
+                HandlerLifecycleManager.DEFAULT_REPORTING_HANDLER_PHASE);
+        */
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java
new file mode 100644
index 0000000..d8ecb66
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java
@@ -0,0 +1,73 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.engine.Handler;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.stratos.usage.agent.util.PublisherUtils;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.CarbonContext;
+import org.wso2.carbon.core.util.SystemFilter;
+import org.wso2.carbon.statistics.services.util.SystemStatistics;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+public class StatisticsInHandler extends AbstractHandler{
+    private static Log log = LogFactory.getLog(StatisticsOutHandler.class);
+
+    public InvocationResponse invoke(MessageContext messageContext) throws AxisFault {
+        AxisService axisService =  messageContext.getAxisService();
+        if(axisService== null || SystemFilter.isFilteredOutService(axisService.getAxisServiceGroup()) ||
+                axisService.isClientSide()){
+            return InvocationResponse.CONTINUE;
+        }
+
+        if(Util.getSystemStatisticsUtil()==null){
+            return InvocationResponse.CONTINUE;
+        }
+
+        SystemStatistics systemStatistics = Util.getSystemStatisticsUtil().getSystemStatistics(messageContext);
+
+        int tenantId = MultitenantConstants.INVALID_TENANT_ID;
+        tenantId = CarbonContext.getCurrentContext().getTenantId();
+
+        if(tenantId == MultitenantConstants.INVALID_TENANT_ID ||
+                tenantId == MultitenantConstants.SUPER_TENANT_ID) {
+            return Handler.InvocationResponse.CONTINUE;
+        }
+
+        try {
+            PublisherUtils.publish(systemStatistics, tenantId);
+        } catch (Exception e) {
+            //Logging the complete stacktrace in debug mode
+            if(log.isDebugEnabled()){
+                log.debug(e);
+            }
+
+            log.error("Error occurred while publishing request statistics. Full stacktrace available in debug logs. " + e.getMessage());
+        }
+
+        return InvocationResponse.CONTINUE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java
new file mode 100644
index 0000000..9f61518
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java
@@ -0,0 +1,83 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.engine.Handler;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.CarbonContext;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.core.util.SystemFilter;
+import org.wso2.carbon.statistics.services.util.SystemStatistics;
+import org.apache.stratos.usage.agent.util.PublisherUtils;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+
+public class StatisticsOutHandler extends AbstractHandler{
+
+    private static Log log = LogFactory.getLog(StatisticsOutHandler.class);
+
+    public InvocationResponse invoke(MessageContext messageContext) throws AxisFault {
+
+
+        AxisService axisService =  messageContext.getAxisService();
+        if(axisService== null || SystemFilter.isFilteredOutService(axisService.getAxisServiceGroup()) ||
+                axisService.isClientSide()){
+
+            PrivilegedCarbonContext.destroyCurrentContext();
+            return InvocationResponse.CONTINUE;
+        }
+
+        if(Util.getSystemStatisticsUtil()==null){
+
+            PrivilegedCarbonContext.destroyCurrentContext();
+            return InvocationResponse.CONTINUE;
+        }
+        SystemStatistics systemStatistics = Util.getSystemStatisticsUtil().getSystemStatistics(messageContext);
+        
+        int tenantId = MultitenantConstants.INVALID_TENANT_ID;
+        tenantId = CarbonContext.getCurrentContext().getTenantId();
+
+        if(tenantId == MultitenantConstants.INVALID_TENANT_ID ||
+            tenantId == MultitenantConstants.SUPER_TENANT_ID) {
+
+            PrivilegedCarbonContext.destroyCurrentContext();
+            return Handler.InvocationResponse.CONTINUE;
+        }
+
+        try {
+            PublisherUtils.publish(systemStatistics, tenantId);
+        } catch (Exception e) {
+            //Logging the complete stacktrace in debug mode
+            if(log.isDebugEnabled()){
+                log.debug(e);
+            }
+
+            log.error("Error occurred while publishing request statistics. Full stacktrace available in debug logs. " + e.getMessage());
+        }
+
+        PrivilegedCarbonContext.destroyCurrentContext();
+        return InvocationResponse.CONTINUE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java
new file mode 100644
index 0000000..152442b
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java
@@ -0,0 +1,48 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.utils.AbstractAxis2ConfigurationContextObserver;
+
+
+public class UsageStatsAxis2ConfigurationContextObserver extends AbstractAxis2ConfigurationContextObserver {
+
+    private static final Log log = LogFactory.getLog(UsageStatsAxis2ConfigurationContextObserver.class);
+
+    @Override
+    public void createdConfigurationContext(ConfigurationContext configContext) {
+
+        AxisConfiguration axisConfiguration = configContext.getAxisConfiguration();
+        int tenantId = PrivilegedCarbonContext.getCurrentContext().getTenantId(false);
+        try {
+            axisConfiguration.engageModule("metering");
+        } catch (AxisFault axisFault) {
+            log.error("Could not engage metering module for tenant: " + tenantId, axisFault);
+        }
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java
new file mode 100644
index 0000000..09a5f78
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java
@@ -0,0 +1,109 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners.axis2;
+
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.wso2.carbon.core.transports.metering.MeteredServletRequest;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * this class is used to obtain tenant id from MeteredServletRequest or MessageContext
+ */
+
+public class RequestMeteringHandler extends AbstractHandler {
+    private static final Log log = LogFactory.getLog(RequestMeteringHandler.class);
+
+    /**
+     * this method  invoke  MeteredServletRequest and return a InvocationResponse
+     * @param messageContext  MessageContext
+     * @return InvocationResponse
+     * @throws AxisFault
+     */
+
+    public InvocationResponse invoke(MessageContext messageContext) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Staring metering handler invocation. Incoming message: " +
+                    messageContext.getEnvelope().toString());
+        }
+        AxisService service = messageContext.getAxisService();
+        Parameter param = service.getParameter("adminService");
+
+        Object obj = messageContext.getProperty("transport.http.servletRequest");
+        if (obj == null) {
+            // TODO: check for cause of the error.
+            log.debug("Servlet request is null. Skip monitoring.");
+            return InvocationResponse.CONTINUE;
+        }
+        if (!(obj instanceof MeteredServletRequest)) {
+            log.debug("HttpServletRequest is not of type MeteredServletRequest. Skip monitoring.");
+            return InvocationResponse.CONTINUE;
+        }
+
+        MeteredServletRequest servletRequest = (MeteredServletRequest) obj;
+
+        if (param != null && "true".equals(param.getValue())) {
+            servletRequest.setAttribute(StratosConstants.ADMIN_SERVICE_SERVLET_ATTR, "true");
+            return InvocationResponse.CONTINUE;
+        }
+        servletRequest.setAttribute(StratosConstants.SERVICE_NAME_SERVLET_ATTR, service.getName());
+
+        int tenantId = getTenantId(servletRequest);
+        servletRequest.setAttribute(StratosConstants.TENANT_ID_SERVLET_ATTR, tenantId);
+
+        return InvocationResponse.CONTINUE;
+    }
+
+    /**
+     * method to get tenant id from MeteredServletRequest
+     * @param servletRequest MeteredServletRequest
+     * @return tenant id
+     */
+
+    private int getTenantId(MeteredServletRequest servletRequest) {
+        String address = servletRequest.getRequestURI();
+        String servicesPrefix = "/services/t/";
+        if (address != null && address.contains(servicesPrefix)) {
+            int domainNameStartIndex =
+                    address.indexOf(servicesPrefix) + servicesPrefix.length();
+            int domainNameEndIndex = address.indexOf('/', domainNameStartIndex);
+            String domainName = address.substring(domainNameStartIndex,
+                    domainNameEndIndex == -1 ? address.length() : domainNameEndIndex);
+
+            // return tenant id if domain name is not null
+            if (domainName != null) {
+                try {
+                    return Util.getRealmService().getTenantManager().getTenantId(domainName);
+                } catch (org.wso2.carbon.user.api.UserStoreException e) {
+                    log.error("An error occurred while obtaining the tenant id.", e);
+                }
+            }
+        }
+
+        // return 0 if the domain name is null
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java
new file mode 100644
index 0000000..6d1bb2f
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java
@@ -0,0 +1,47 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners.axis2;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisDescription;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.modules.Module;
+import org.apache.neethi.Assertion;
+import org.apache.neethi.Policy;
+
+public class RequestMeteringModule implements Module {
+
+    public void init(ConfigurationContext configurationContext, AxisModule axisModule)
+            throws AxisFault {
+    }
+
+    public void engageNotify(AxisDescription axisDescription) throws AxisFault {
+    }
+
+    public boolean canSupportAssertion(Assertion assertion) {
+        return true;
+    }
+
+    public void applyPolicy(Policy policy, AxisDescription axisDescription) throws AxisFault {
+    }
+
+    public void shutdown(ConfigurationContext configurationContext) throws AxisFault {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java
new file mode 100644
index 0000000..d99fd3c
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java
@@ -0,0 +1,58 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.common.constants.UsageConstants;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.util.Util;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+/**
+ * this class is used to store incoming and outgoing bandwidth
+ */
+
+public class BandwidthPersistor {
+
+    /**
+     * method to store incoming bandwidth
+     * @param tenantId tenant id
+     * @param size value of the incoming bandwidth
+     */
+
+    public static void storeIncomingBandwidth(int tenantId, long size) {
+        if ((MultitenantConstants.SUPER_TENANT_ID!=tenantId) && (size > 0)) {
+            BandwidthUsage usage = new BandwidthUsage(
+                    tenantId, UsageConstants.REGISTRY_INCOMING_BW, size);
+            Util.addToPersistingControllerQueue(usage);
+        }
+    }
+
+    /**
+     * method to store outgoingBandwidth
+     * @param tenantId tenant id
+     * @param size value of the outgoing bandwidth
+     */
+    public static void storeOutgoingBandwidth(int tenantId, long size) {
+        if ((MultitenantConstants.SUPER_TENANT_ID!=tenantId) && (size > 0)) {
+            BandwidthUsage usage = new BandwidthUsage(
+                    tenantId, UsageConstants.REGISTRY_OUTGOING_BW, size);
+            Util.addToPersistingControllerQueue(usage);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java
new file mode 100644
index 0000000..8174c07
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java
@@ -0,0 +1,114 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.config.UsageAgentConfiguration;
+import org.apache.stratos.usage.agent.util.UsageAgentConstants;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.base.ServerConfiguration;
+import org.apache.stratos.common.constants.UsageConstants;
+import org.wso2.carbon.tomcat.ext.transport.statistics.TransportStatisticsContainer;
+import org.wso2.carbon.tomcat.ext.transport.statistics.TransportStatisticsEntry;
+import org.wso2.carbon.user.api.UserStoreException;
+
+import java.util.Queue;
+
+public class BandwidthUsageDataRetrievalTask implements Runnable {
+    private static final Log log = LogFactory.getLog(BandwidthUsageDataRetrievalTask.class);
+
+    private Queue<TransportStatisticsEntry> transportStats;
+    private UsageAgentConfiguration configuration;
+
+    //This will be decided based on whether a BAM Server URL is provided or not
+    private boolean isBamAvailable=false;
+
+    public BandwidthUsageDataRetrievalTask(UsageAgentConfiguration configuration) {
+        transportStats = TransportStatisticsContainer.getTransportStatistics();
+        this.configuration = configuration;
+
+        //Checking for the BAM Server URL
+        String bamServerUrl = ServerConfiguration.getInstance().getFirstProperty("BamServerURL");
+        if(bamServerUrl != null){
+            this.isBamAvailable = true;
+        }
+    }
+
+    public void run() {
+        /*if (log.isDebugEnabled()) {
+            log.debug("Retrieving Service and Web App bandwidth usage statistics.");
+        }*/
+
+        if (!transportStats.isEmpty()) {
+            for (int i = 0; i < configuration.getUsageTasksNumberOfRecordsPerExecution() && !transportStats.isEmpty(); i++) {
+                TransportStatisticsEntry entry = transportStats.remove();
+                try {
+                    if(!isBamAvailable){
+                        return;
+                    }
+
+                    int tenantId = getTenantID(entry.getTenantName());
+                    //if the tenant does not exist, no need and no way of updating the usage data
+                    //therefore ignore it
+                    if(tenantId<0){
+                        return;
+                    }
+                    if (inferMeasurement(entry).equals(UsageConstants.SERVICE_BANDWIDTH)) {
+                        if (entry.getRequestSize() > 0) {
+                            Util.addToPersistingControllerQueue(new BandwidthUsage(getTenantID(entry.getTenantName()), UsageConstants.SERVICE_INCOMING_BW, entry.getRequestSize()));
+                        }
+                        if (entry.getResponseSize() > 0) {
+                            Util.addToPersistingControllerQueue(new BandwidthUsage(getTenantID(entry.getTenantName()), UsageConstants.SERVICE_OUTGOING_BW, entry.getResponseSize()));
+                        }
+                    } else if (inferMeasurement(entry).equals(UsageConstants.WEBAPP_BANDWIDTH)) {
+                        if (entry.getRequestSize() > 0) {
+                            Util.addToPersistingControllerQueue(new BandwidthUsage(getTenantID(entry.getTenantName()), UsageConstants.WEBAPP_INCOMING_BW, entry.getRequestSize()));
+                        }
+                        if (entry.getResponseSize() > 0) {
+                            Util.addToPersistingControllerQueue(new BandwidthUsage(getTenantID(entry.getTenantName()), UsageConstants.WEBAPP_OUTGOING_BW, entry.getResponseSize()));
+                        }
+                    }
+                } catch (UserStoreException e) {
+                    log.error("Error persisting bandwidth usage statistics.", e);
+                }
+
+            }
+        }
+    }
+
+
+    private String inferMeasurement(TransportStatisticsEntry entry) {
+        if (entry.getContext() != null) {
+            if (entry.getContext().equals(UsageAgentConstants.BANDWIDTH_USAGE_SERVICES_CONTEXT)) {
+                return UsageConstants.SERVICE_BANDWIDTH;
+            } else if (entry.getContext().equals(UsageAgentConstants.BANDWIDTH_USAGE_WEBAPPS_CONTEXT)) {
+                return UsageConstants.WEBAPP_BANDWIDTH;
+            }
+        }
+
+        return UsageAgentConstants.BANDWIDTH_CARBON;
+    }
+
+    private int getTenantID(String tenantDomain) throws UserStoreException {
+        return Util.getRealmService().getTenantManager().getTenantId(tenantDomain);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java
new file mode 100644
index 0000000..f56d9b9
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java
@@ -0,0 +1,76 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.common.constants.UsageConstants;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.util.Util;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+import java.lang.System;
+
+/**
+ * this class is used to store incoming and outgoing bandwidth
+ */
+
+public class RegistryUsagePersister {
+
+    /**
+     * method to store incoming bandwidth
+     * @param tenantId tenant id
+     * @param size value of the incoming bandwidth
+     */
+
+    public static void storeIncomingBandwidth(int tenantId, long size) {
+        if ((MultitenantConstants.SUPER_TENANT_ID!=tenantId) && (size > 0)) {
+            BandwidthUsage usage = new BandwidthUsage(
+                    tenantId, UsageConstants.REGISTRY_INCOMING_BW, size);
+            Util.addToPersistingControllerQueue(usage);
+        }
+    }
+    //=============================================================
+
+    public static void storeAddContent(int tenantId, long size) {
+        if ((MultitenantConstants.SUPER_TENANT_ID!=tenantId) && (size > 0)) {
+            BandwidthUsage usage = new BandwidthUsage(
+                    tenantId, "ContentBandwidth-In", size);
+            Util.addToPersistingControllerQueue(usage);
+        }
+    }
+    public static void storeDeleteContent(int tenantId, long size) {
+        if ((MultitenantConstants.SUPER_TENANT_ID!=tenantId) && (size > 0)) {
+            BandwidthUsage usage = new BandwidthUsage(
+                    tenantId, "ContentBandwidth-Out", size);
+            Util.addToPersistingControllerQueue(usage);
+        }
+    }
+   //=============================================================
+    /**
+     * method to store outgoingBandwidth
+     * @param tenantId tenant id
+     * @param size value of the outgoing bandwidth
+     */
+    public static void storeOutgoingBandwidth(int tenantId, long size) {
+        if ((MultitenantConstants.SUPER_TENANT_ID!=tenantId) && (size > 0)) {
+            BandwidthUsage usage = new BandwidthUsage(
+                    tenantId, UsageConstants.REGISTRY_OUTGOING_BW, size);
+            Util.addToPersistingControllerQueue(usage);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java
new file mode 100644
index 0000000..c7e7fb7
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java
@@ -0,0 +1,72 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.core.transports.metering.MeteredServletRequest;
+import org.wso2.carbon.core.transports.metering.MeteredServletResponse;
+import org.wso2.carbon.core.transports.metering.RequestDataPersister;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.common.constants.UsageConstants;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+/**
+ * this class is used to persist service data
+ */
+public class ServiceDataPersistor implements RequestDataPersister {
+    private static final Log log = LogFactory.getLog(ServiceDataPersistor.class);
+
+    /**
+     * this method get tenant id, inDataSize and outDataSize from the wrappedRequest, construct a
+     * BandwidthUsage object and add it to PersistingControllerQueue
+     * @param wrappedRequest  MeteredServletRequest
+     * @param wrappedResponse MeteredServletResponse
+     */
+    public void persist(MeteredServletRequest wrappedRequest, MeteredServletResponse wrappedResponse) {
+        if ("true".equals(wrappedRequest.getAttribute(StratosConstants.SERVICE_NAME_SERVLET_ATTR))) {
+            return;
+        }
+
+        Integer tenantId = (Integer) wrappedRequest.getAttribute(
+                StratosConstants.TENANT_ID_SERVLET_ATTR);
+        if (tenantId == null || tenantId == MultitenantConstants.SUPER_TENANT_ID) {
+            return;
+        }
+        long inDataSize = wrappedRequest.getReadSize();
+        long outDataSize = wrappedResponse.getWrittenSize();
+
+        if(log.isTraceEnabled()){
+            log.trace("Persisting service bandwidth usage for tenant " + tenantId + " in size: " + inDataSize + " out size: " + outDataSize);
+        }
+        // add the job to queue
+        if (inDataSize > 0) {
+            BandwidthUsage usage = new BandwidthUsage(tenantId,
+                    UsageConstants.SERVICE_INCOMING_BW, inDataSize);
+            Util.addToPersistingControllerQueue(usage);
+        }
+        if (outDataSize > 0) {
+            BandwidthUsage usage = new BandwidthUsage(tenantId,
+                    UsageConstants.SERVICE_OUTGOING_BW, outDataSize);
+            Util.addToPersistingControllerQueue(usage);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java
new file mode 100644
index 0000000..5c27e20
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java
@@ -0,0 +1,91 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.config.UsageAgentConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Queue;
+import java.util.concurrent.*;
+
+public class UsageDataPersistenceManager {
+    private static final Log log = LogFactory.getLog(UsageDataPersistenceManager.class);
+
+    // queue to store Bandwidth usage statistics.
+    // usage of  LinkedBlockingQueue ensures operations on the queue to wait for the queue to be non
+    // empty when retrieving and wait for space when storing element.
+    private Queue<BandwidthUsage> persistenceJobs = new LinkedBlockingQueue<BandwidthUsage>();
+
+    private final ScheduledExecutorService scheduler;
+
+    private UsageAgentConfiguration configuration;
+
+    public UsageDataPersistenceManager(UsageAgentConfiguration configuration) {
+        scheduler = Executors.newScheduledThreadPool(2, new UsageDataPersistenceThreadFactory());
+        this.configuration = configuration;
+    }
+
+    /**
+     * this method add bandwidth usage entries to the jobQueue
+     *
+     * @param usage Bandwidth usage
+     */
+
+    public void addToQueue(BandwidthUsage usage) {
+        persistenceJobs.add(usage);
+    }
+
+    public void scheduleUsageDataPersistenceTask() {
+        //we will schedule the usage data persistence task only if interval is not -1
+        if(configuration.getUsageTasksExecutionIntervalInMilliSeconds()>0){
+            scheduler.scheduleWithFixedDelay(new UsageDataPersistenceTask(persistenceJobs, configuration),
+                    configuration.getUsageTasksStartupDelayInMilliSeconds(),
+                    configuration.getUsageTasksExecutionIntervalInMilliSeconds(),
+                    TimeUnit.MILLISECONDS);
+            log.debug("Usage data persistence task was scheduled");
+        }else{
+            log.debug("Usage data persistence task is disabled");
+        }
+    }
+
+
+    public void scheduleBandwidthUsageDataRetrievalTask() {
+        //we will schedule the usage data retrieval task only if interval is not -1
+        if(configuration.getUsageTasksExecutionIntervalInMilliSeconds()>0){
+            scheduler.scheduleWithFixedDelay(new BandwidthUsageDataRetrievalTask(configuration),
+                    configuration.getUsageTasksStartupDelayInMilliSeconds(),
+                    configuration.getUsageTasksExecutionIntervalInMilliSeconds(),
+                    TimeUnit.MILLISECONDS);
+            log.debug("Bandwidth Usage data retrieval task was scheduled");
+        }else {
+            log.debug("Bandwidth Usage data retrieval task was disabled");
+        }
+    }
+
+    class UsageDataPersistenceThreadFactory implements ThreadFactory {
+        private int counter = 0;
+
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "UsageDataPersistenceThread-" + counter++);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java
new file mode 100644
index 0000000..59b5a7e
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java
@@ -0,0 +1,166 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.base.MultitenantConstants;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.config.UsageAgentConfiguration;
+import org.apache.stratos.usage.agent.exception.UsageException;
+import org.apache.stratos.usage.agent.util.PublisherUtils;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Queue;
+
+public class UsageDataPersistenceTask implements Runnable {
+
+    private static final Log log = LogFactory.getLog(UsageDataPersistenceTask.class);
+
+    private Queue<BandwidthUsage> usagePersistenceJobs;
+    private UsageAgentConfiguration configuration;
+
+    public UsageDataPersistenceTask(Queue<BandwidthUsage> jobs, UsageAgentConfiguration configuration) {
+        usagePersistenceJobs = jobs;
+        this.configuration = configuration;
+    }
+
+    public void run() {
+        if (!usagePersistenceJobs.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Persisting Service and Web App bandwidth usage statistics");
+            }
+            try {
+                persistUsage(usagePersistenceJobs);
+            } catch (UsageException e) {
+                log.error("Error when persisting usage statistics.", e);
+            }
+        }
+    }
+
+    /**
+     * this method create a Summarizer object for each tenant and call accumulate() method to
+     * accumulate usage statistics
+     *
+     * @param jobQueue usage data persistence jobs
+     * @throws org.apache.stratos.usage.agent.exception.UsageException
+     *
+     */
+
+    public void persistUsage(Queue<BandwidthUsage> jobQueue) throws UsageException {
+
+        // create a map to hold summarizer objects against tenant id
+        HashMap<Integer, Summarizer> summarizerMap = new HashMap<Integer, Summarizer>();
+
+        // if the jobQueue is not empty
+        for (int i = 0; i < configuration.getUsageTasksNumberOfRecordsPerExecution() && !jobQueue.isEmpty(); i++) {
+
+            // get the first element from the queue, which is a BandwidthUsage object
+            BandwidthUsage usage = jobQueue.poll();
+
+            // get the tenant id
+            int tenantId = usage.getTenantId();
+
+            //get the Summarizer object corresponds to the tenant id
+            Summarizer summarizer = summarizerMap.get(tenantId);
+
+            // when tenant invoke service for the first time, no corresponding summarizer object in
+            // the map
+            if (summarizer == null) {
+                //create a Summarizer object and put to the summarizerMap
+                summarizer = new Summarizer();
+                summarizerMap.put(tenantId, summarizer);
+            }
+
+            //  now accumulate usage
+            summarizer.accumulate(usage);
+        }
+
+        //Finished accumulating. Now publish the events
+
+        // get the collection view of values in summarizerMap
+        Collection<Summarizer> summarizers = summarizerMap.values();
+
+        // for each summarizer object call the publish method
+        for (Summarizer summarizer : summarizers) {
+            summarizer.publish();
+        }
+    }
+
+    /**
+     * inner class Summarizer
+     * this class is used to accumulate and publish usage statistics.
+     * for each tenant this keeps a map to store BandwidthUsage values
+     */
+    private static class Summarizer {
+        private HashMap<String, BandwidthUsage> usageMap;
+
+        public Summarizer() {
+            usageMap = new HashMap<String, BandwidthUsage>();
+        }
+
+        /**
+         * the method to accumulate usage data
+         *
+         * @param usage BandwidthUsage
+         */
+
+        public void accumulate(BandwidthUsage usage) {
+            // get the measurement name of usage entry
+            String key = usage.getMeasurement();
+
+            // get the existing value of measurement
+            BandwidthUsage existingUsage = usageMap.get(key);
+
+            // if this measurement is metered earlier add the new value to the existing value
+            if (existingUsage != null) {
+                existingUsage.setValue(existingUsage.getValue() + usage.getValue());
+            } else {
+                // if this measurement is not metered previously we need to add it to the usageMap
+                usageMap.put(key, usage);
+            }
+        }
+
+        /**
+         * this method reads usage items from the usageMap and call publish method to publish to
+         * the BAM
+         *
+         * @throws UsageException
+         */
+
+        public void publish() throws UsageException {
+
+            // get the collection view of values in usageMap
+            Collection<BandwidthUsage> usages = usageMap.values();
+
+            for (BandwidthUsage usage : usages) {
+                try {
+                    // publish the usage entry if it is not the super-tenant
+                    if(MultitenantConstants.SUPER_TENANT_ID != usage.getTenantId()){
+                        PublisherUtils.publish(usage);
+                    }
+                } catch (UsageException e) {
+                    log.error("Error in publishing bandwidth usage data", e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java
new file mode 100644
index 0000000..dcb6c12
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java
@@ -0,0 +1,95 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.services;
+
+import org.wso2.carbon.core.AbstractAdmin;
+import org.apache.stratos.usage.agent.api.CustomMeteringAgent;
+import org.apache.stratos.usage.agent.exception.UsageException;
+
+/**
+ *      CustomMeteringService class defines methods to get recorded duration, to check whether
+ *      usage entries exists, persist usage, retrieve usage and add usage.
+ */
+public class CustomMeteringService extends AbstractAdmin {
+
+    /**
+     * method to get recorded durations
+     * @param measurement  the measurement name
+     * @return  duration array
+     * @throws Exception
+     */
+
+    public String[] getRecordedDurations(String measurement) throws Exception {
+        return new CustomMeteringAgent(getGovernanceRegistry()).getRecordedDurations(measurement);
+    }
+
+    /**
+     * method to check whether usage entry exists or not
+     * @param duration  duration
+     * @param measurement measurement name
+     * @return true if usage entry exist
+     * @throws Exception
+     */
+    public boolean isUsageEntryExists( String duration, String measurement)
+            throws Exception {
+        return new CustomMeteringAgent(getGovernanceRegistry()).isUsageEntryExists(duration,
+                measurement);
+    }
+
+    /**
+     * method to persist usage
+     * @param duration
+     * @param measurement measurement name
+     * @param value   value of measurement
+     * @throws Exception
+     */
+    public void persistUsage( String duration, String measurement, String value)
+            throws Exception {
+        new CustomMeteringAgent(getGovernanceRegistry()).persistUsage(duration, measurement, value);
+    }
+
+    /**
+     * method to retrieve usage
+     * @param duration
+     * @param measurement measurement name
+     * @return usage value
+     * @throws UsageException
+     */
+
+    public String retrieveUsage( String duration, String measurement)
+            throws UsageException {
+        return new CustomMeteringAgent(getGovernanceRegistry())
+                .retrieveUsage(duration, measurement);
+    }
+
+    /**
+     * method to add usage entries
+     * @param userName user name
+     * @param duration duration of the measurement
+     * @param measurement measurement name
+     * @param value usage value
+     * @return usage value
+     * @throws Exception
+     */
+    public long addUsage(String userName, String duration, String measurement, long value)
+            throws Exception {
+        return new CustomMeteringAgent(getGovernanceRegistry()).addUsage(duration, measurement,
+                value);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java
new file mode 100644
index 0000000..e61d9b9
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java
@@ -0,0 +1,50 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.util;
+
+import java.io.IOException;
+import java.io.Reader;
+
+
+/**
+ * this class is used to wrap the Reader object
+ */
+public class MonitoredReader extends Reader {
+    Reader reader;
+    long totalRead;
+
+    public MonitoredReader(Reader reader) {
+        this.reader = reader;
+        totalRead = 0;
+    }
+
+    public int read(char cbuf[], int off, int len) throws IOException {
+        int read = reader.read(cbuf, off, len);
+        totalRead += read;
+        return read;
+    }
+
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    public long getTotalRead() {
+        return totalRead;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java
new file mode 100644
index 0000000..307a2e4
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java
@@ -0,0 +1,53 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.util;
+
+import java.io.IOException;
+import java.io.Writer;
+
+
+/**
+ * this class is used to wrap Writer object
+ */
+public class MonitoredWriter extends Writer {
+    Writer writer;
+    long totalWritten;
+
+    public MonitoredWriter(Writer writer) {
+        this.writer = writer;
+        totalWritten = 0;
+    }
+
+    public void write(char cbuf[], int off, int len) throws IOException {
+        totalWritten += (len - off);
+        writer.write(cbuf, off, len);
+    }
+
+    public void flush() throws IOException {
+        writer.flush();
+    }
+
+    public void close() throws IOException {
+        writer.close();
+    }
+
+    public long getTotalWritten() {
+        return totalWritten;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java
new file mode 100644
index 0000000..607b30b
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java
@@ -0,0 +1,442 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.util;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.base.ServerConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
+import org.wso2.carbon.statistics.services.util.SystemStatistics;
+import org.apache.stratos.common.util.CommonUtil;
+import org.apache.stratos.usage.agent.exception.UsageException;
+import org.wso2.carbon.user.api.Tenant;
+import org.wso2.carbon.utils.CarbonUtils;
+import org.wso2.carbon.utils.ConfigurationContextService;
+import org.wso2.carbon.utils.NetworkUtils;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+import org.apache.stratos.usage.agent.beans.APIManagerRequestStats;
+
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * this class provide utility methods to publish usage statistics
+ */
+public class PublisherUtils {
+    private static Log log = LogFactory.getLog(PublisherUtils.class);
+    private static final String TRANSPORT = "https";
+    private static ConfigurationContextService configurationContextService;
+    private static Agent agent;
+    private static DataPublisher dataPublisher;
+    private static AsyncDataPublisher asyncDataPublisher;
+    private static String streamId;
+    private static final String usageEventStream = "org.wso2.carbon.usage.agent";
+    private static final String usageEventStreamVersion = "1.0.0";
+    
+    private static final String reqStatEventStream="org.wso2.carbon.service.request.stats";
+    private static final String reqStatEventStreamVersion="1.0.0";
+    private static String reqStatEventStreamId;
+    
+    private static Map<Integer, String> serverUrlMap = new HashMap<Integer, String>();
+
+
+    /**
+     * method to update server name
+     * @param tenantId tenant id
+     * @return server name
+     * @throws UsageException
+     */
+
+    public static String updateServerName(int tenantId) throws UsageException {
+
+        String serverName;
+        String hostName;
+
+        try {
+            hostName = NetworkUtils.getLocalHostname();
+        } catch (SocketException e) {
+            throw new UsageException("Error getting host name for the registry usage event payload",
+                    e);
+        }
+
+        ConfigurationContextService configurationContextService = PublisherUtils.
+                getConfigurationContextService();
+        ConfigurationContext configurationContext;
+        if (configurationContextService != null) {
+            configurationContext = configurationContextService.getServerConfigContext();
+        } else {
+            throw new UsageException("ConfigurationContext is null");
+        }
+//        int port = CarbonUtils.getTransportPort(configurationContext, "https");
+
+        String carbonHttpsPort = System.getProperty("carbon." + TRANSPORT + ".port");
+        if (carbonHttpsPort == null) {
+            carbonHttpsPort = Integer.toString(
+                    CarbonUtils.getTransportPort(configurationContext, TRANSPORT));
+        }
+        String baseServerUrl = TRANSPORT + "://" + hostName + ":" + carbonHttpsPort;
+        String context = configurationContext.getContextRoot();
+
+        String tenantDomain = null;
+        try {
+            Tenant tenant = Util.getRealmService().getTenantManager().getTenant(tenantId);
+            if(tenant!=null){
+                tenantDomain = tenant.getDomain();
+            }
+        } catch (org.wso2.carbon.user.api.UserStoreException e) {
+            throw new UsageException("Failed to get tenant domain", e);
+        }
+
+        if ((tenantDomain != null) &&
+                !(tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME))) {
+            serverName = baseServerUrl + context + "t/" + tenantDomain;
+
+        } else if (context.equals("/")) {
+
+            serverName = baseServerUrl + "";
+        } else {
+            serverName = baseServerUrl + context;
+
+        }
+
+        return serverName;
+    }
+    
+    public static String getServerUrl(int tenantId){
+
+        String serverUrl = serverUrlMap.get(tenantId);
+        if(serverUrl!=null){
+            return serverUrl;
+        }
+
+        if(serverUrl==null){
+            try{
+                serverUrl = updateServerName(tenantId);
+            }catch (UsageException e) {
+                log.error("Could not create the server url for tenant id: " + tenantId, e);
+            }
+        }
+
+        if(serverUrl!=null && !"".equals(serverUrl)){
+            serverUrlMap.put(tenantId, serverUrl);
+        }
+        return serverUrl;
+    }
+
+    public static void defineUsageEventStream() throws Exception {
+
+        createDataPublisher();
+
+        if(dataPublisher == null){
+            return;
+        }
+
+        try {
+
+            streamId = dataPublisher.findStream(usageEventStream, usageEventStreamVersion);
+            log.info("Event stream with stream ID: " + streamId + " found.");
+
+        } catch (NoStreamDefinitionExistException e) {
+
+            log.info("Defining the event stream because it was not found in BAM");
+            try {
+                defineStream();    
+            } catch (Exception ex) {
+                String msg = "An error occurred while defining the even stream for Usage agent. " + e.getMessage();
+                log.warn(msg);
+            }
+
+        }
+
+    }
+    
+    private static void defineStream() throws Exception {
+        streamId = dataPublisher.
+                defineStream("{" +
+                        "  'name':'" + usageEventStream +"'," +
+                        "  'version':'" + usageEventStreamVersion +"'," +
+                        "  'nickName': 'usage.agent'," +
+                        "  'description': 'Tenant usage data'," +
+                        "  'metaData':[" +
+                        "          {'name':'clientType','type':'STRING'}" +
+                        "  ]," +
+                        "  'payloadData':[" +
+                        "          {'name':'ServerName','type':'STRING'}," +
+                        "          {'name':'TenantID','type':'STRING'}," +
+                        "          {'name':'Type','type':'STRING'}," +
+                        "          {'name':'Value','type':'LONG'}" +
+                        "  ]" +
+                        "}");
+        
+    }
+    
+    private static void defineRequestStatEventStream() throws Exception{
+        reqStatEventStreamId = dataPublisher.
+                defineStream("{" +
+                        "  'name':'" + reqStatEventStream +"'," +
+                        "  'version':'" + reqStatEventStreamVersion +"'," +
+                        "  'nickName': 'service.request.stats'," +
+                        "  'description': 'Tenants service request statistics'," +
+                        "  'metaData':[" +
+                        "          {'name':'clientType','type':'STRING'}" +
+                        "  ]," +
+                        "  'payloadData':[" +
+                        "          {'name':'ServerName','type':'STRING'}," +
+                        "          {'name':'TenantID','type':'STRING'}," +
+                        "          {'name':'RequestCount','type':'INT'}," +
+                        "          {'name':'ResponseCount','type':'INT'}," +
+                        "          {'name':'FaultCount','type':'INT'}," +
+                        "          {'name':'ResponseTime','type':'LONG'}" +
+                        "  ]" +
+                        "}");
+    }
+
+    public static void createDataPublisher(){
+
+        ServerConfiguration serverConfig =  CarbonUtils.getServerConfiguration();
+        String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location");
+        String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password");
+        String bamServerUrl = serverConfig.getFirstProperty("BamServerURL");
+        String adminUsername = CommonUtil.getStratosConfig().getAdminUserName();
+        String adminPassword = CommonUtil.getStratosConfig().getAdminPassword();
+
+        System.setProperty("javax.net.ssl.trustStore", trustStorePath);
+        System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword);
+
+        try {
+            dataPublisher = new DataPublisher(bamServerUrl, adminUsername, adminPassword);
+        } catch (Exception e) {
+            log.warn("Unable to create a data publisher to " + bamServerUrl +
+                    ". Usage Agent will not function properly. " + e.getMessage());
+        }
+
+    }
+
+    /**
+     * Creates an async data publisher using the existing data publisher object
+     */
+    public static void createAsynDataPublisher(){
+        if(dataPublisher==null){
+            createDataPublisher();
+        }
+
+        if(dataPublisher==null){
+            log.warn("Cannot create the async data publisher because the data publisher is null");
+            return;
+        }
+
+        try {
+            asyncDataPublisher = new AsyncDataPublisher(dataPublisher);
+        } catch (Exception e) {
+            log.error("Could not create an async data publisher using the data publisher", e);
+        }
+    }
+
+
+    /**
+     * this method get the event payload, construct the SOAP envelop and call the publish method in
+     * EventBrokerService.
+     *
+     * @param usage BandwidthUsage
+     * @throws UsageException
+     */
+    public static void publish(BandwidthUsage usage) throws UsageException {
+
+        if(dataPublisher==null){
+            log.info("Creating data publisher for usage data publishing");
+            createDataPublisher();
+
+            //If we cannot create a data publisher we should give up
+            //this means data will not be published
+            if(dataPublisher == null){
+                return;
+            }
+        }
+
+        if(streamId == null){
+            try{
+                streamId = dataPublisher.findStream(usageEventStream, usageEventStreamVersion);
+            }catch (NoStreamDefinitionExistException e){
+                log.info("Defining the event stream because it was not found in BAM");
+                try{
+                    defineStream();
+                } catch(Exception ex){
+                    String msg = "Error occurred while defining the event stream for publishing usage data. " + ex.getMessage();
+                    log.error(msg);
+                    //We do not want to proceed without an event stream. Therefore we return.
+                    return;
+                }
+            }catch (Exception exc){
+                log.error("Error occurred while searching for stream id. " + exc.getMessage());
+                //We do not want to proceed without an event stream. Therefore we return.
+                return;
+            }
+        }
+
+        try {
+
+            Event usageEvent = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
+                                        new Object[]{getServerUrl(usage.getTenantId()),
+                                                    Integer.toString(usage.getTenantId()),
+                                                    usage.getMeasurement(),
+                                                    usage.getValue()});
+
+            dataPublisher.publish(usageEvent);
+
+        } catch (Exception e) {
+            log.error("Error occurred while publishing usage event to BAM. " + e.getMessage(), e);
+            throw new UsageException(e.getMessage(), e);
+        }
+
+    }
+    
+    public static void publish(SystemStatistics statistics, int tenantId) throws Exception {
+
+        if(dataPublisher==null){
+            log.info("Creating data publisher for service-stats publishing");
+            createDataPublisher();
+
+            //If we cannot create a data publisher we should give up
+            //this means data will not be published
+            if(dataPublisher == null){
+                return;
+            }
+        }
+
+        if(reqStatEventStreamId == null){
+            try{
+                reqStatEventStreamId = dataPublisher.findStream(reqStatEventStream, reqStatEventStreamVersion);
+            }catch (NoStreamDefinitionExistException e){
+                log.info("Defining the event stream because it was not found in BAM");
+                try{
+                    defineRequestStatEventStream();
+                } catch(Exception ex){
+                    String msg = "Error occurred while defining the event stream for publishing usage data. " + ex.getMessage();
+                    log.error(msg);
+                    //We do not want to proceed without an event stream. Therefore we return.
+                    return;
+                }
+            }catch (Exception exc){
+                log.error("Error occurred while searching for stream id. " + exc.getMessage());
+                //We do not want to proceed without an event stream. Therefore we return.
+                return;
+            }
+        }
+
+        try {
+
+            Event usageEvent = new Event(reqStatEventStreamId, System.currentTimeMillis(), new Object[]{"external"}, null,
+                    new Object[]{getServerUrl(tenantId),
+                            Integer.toString(tenantId),
+                            statistics.getCurrentInvocationRequestCount(),
+                            statistics.getCurrentInvocationResponseCount(),
+                            statistics.getCurrentInvocationFaultCount(),
+                            statistics.getCurrentInvocationResponseTime()});
+
+            dataPublisher.publish(usageEvent);
+
+        } catch (Exception e) {
+            log.error("Error occurred while publishing usage event to BAM. " + e.getMessage(), e);
+            throw new UsageException(e.getMessage(), e);
+        }
+
+    }
+
+    /**
+     * @param statistics APIManagerRequestStats which contains usage data
+     * @param tenantId   Tenant id of tenant associated with usage stat
+     * @throws Exception UsageException when error in usage stat publishing
+     */
+    public static void publish(APIManagerRequestStats statistics, int tenantId) throws Exception {
+
+        if (dataPublisher == null) {
+            log.info("Creating data publisher for usage data publishing");
+            createDataPublisher();
+
+            //If we cannot create a data publisher we should give up
+            //this means data will not be published
+            if (dataPublisher == null) {
+                return;
+            }
+        }
+
+        if (streamId == null) {
+            try {
+                streamId = dataPublisher.findStream(usageEventStream, usageEventStreamVersion);
+            } catch (NoStreamDefinitionExistException e) {
+                log.info("Defining the event stream because it was not found in BAM");
+                try {
+                    defineStream();
+                } catch (Exception ex) {
+                    String msg = "Error occurred while defining the event stream for publishing usage data. " + ex.getMessage();
+                    log.error(msg);
+                    //We do not want to proceed without an event stream. Therefore we return.
+                    return;
+                }
+            } catch (Exception exc) {
+                log.error("Error occurred while searching for stream id. " + exc.getMessage());
+                //We do not want to proceed without an event stream. Therefore we return.
+                return;
+            }
+        }
+
+        try {
+            //Get data from API manager request stat object and create event
+            Event usageEvent = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
+                    new Object[]{getServerUrl(statistics.getTenantId()),
+                            Integer.toString(statistics.getTenantId()),
+                            statistics.getMeasurement(),
+                            statistics.getValue()});
+            //publish usage to bam
+            dataPublisher.publish(usageEvent);
+
+        } catch (Exception e) {
+            log.error("Error occurred while publishing usage event to BAM. " + e.getMessage(), e);
+            throw new UsageException(e.getMessage(), e);
+        }
+
+    }
+
+    /**
+     * method to get configurationContextService
+     * @return configurationContextService
+     */
+
+    public static ConfigurationContextService getConfigurationContextService() {
+        return configurationContextService;
+    }
+
+    /**
+     * method to setConfigurationContextService
+     * @param configurationContextService
+     */
+    public static void setConfigurationContextService(ConfigurationContextService configurationContextService) {
+        PublisherUtils.configurationContextService = configurationContextService;
+    }
+
+}