You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2013/07/11 07:18:17 UTC
[26/27] aplying 0001-Refactor-usage-module-to-apache-stratos.patch
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java
new file mode 100644
index 0000000..8ea0ac1
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/RegistryUsagePersistingListener.java
@@ -0,0 +1,311 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+import org.apache.stratos.usage.agent.persist.BandwidthPersistor;
+import org.apache.stratos.usage.agent.util.MonitoredReader;
+import org.apache.stratos.usage.agent.util.MonitoredWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.CarbonConstants;
+import org.wso2.carbon.registry.core.*;
+import org.wso2.carbon.registry.core.config.RegistryContext;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.registry.core.jdbc.handlers.Handler;
+import org.wso2.carbon.registry.core.jdbc.handlers.RequestContext;
+import org.wso2.carbon.registry.core.session.CurrentSession;
+import org.wso2.carbon.registry.core.utils.RegistryUtils;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.Writer;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+
+/**
+ * Registry handler that intercepts put, import, get, dump and restore calls and reports the
+ * amount of data moved to {@link BandwidthPersistor} as incoming or outgoing bandwidth of the
+ * current tenant.
+ * <p>
+ * Calls made for the super tenant, or by the registry system or anonymous user, are not
+ * tracked. Usage is reported at most once per request; this is tracked through a
+ * {@link CurrentSession} attribute.
+ * <p>
+ * Currently this handler is not registered because there is a similar handler,
+ * RegistryUsageHandler. After examining that properly this class will be deleted.
+ */
+public class RegistryUsagePersistingListener extends Handler {
+
+    private static final Log log = LogFactory.getLog(RegistryUsagePersistingListener.class);
+
+    /** URL scheme prefix rejected by {@link #importResource(RequestContext)}. */
+    private static final String FILE_SCHEME_PREFIX = "file:";
+
+    /**
+     * Records the size of the resource being put as incoming bandwidth.
+     * Collections and resources without content are not counted.
+     *
+     * @param context the request context of the put operation
+     * @throws RegistryException if the resource content is neither a String nor a byte array
+     */
+    public void put(RequestContext context) throws RegistryException {
+        if (!startTracking()) {
+            return;
+        }
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+        ResourcePath path = context.getResourcePath();
+        Resource resource = context.getResource();
+        ((ResourceImpl) resource).setPath(path.getCompletePath());
+        if (resource instanceof CollectionImpl) {
+            return;
+        }
+        Object contentObj = resource.getContent();
+        if (contentObj == null) {
+            return;
+        }
+
+        // persisting bandwidth
+        BandwidthPersistor.storeIncomingBandwidth(tenantId, contentSize(contentObj));
+
+        // we will pass through, so that normal registry operation will put the resource
+    }
+
+    /**
+     * Fetches the content behind the source URL of an import request and records its size as
+     * incoming bandwidth.
+     *
+     * @param context the request context of the import operation
+     * @throws RegistryException if the source URL is a {@code file:} URL, is malformed, or
+     *                           cannot be read
+     */
+    public void importResource(RequestContext context) throws RegistryException {
+        if (!startTracking()) {
+            return;
+        }
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+        String sourceURL = context.getSourceURL();
+
+        // regionMatches(ignoreCase = true, ...) is locale independent, unlike toLowerCase(),
+        // so the check cannot be bypassed through the default locale's case mapping.
+        if (sourceURL != null && sourceURL.regionMatches(
+                true, 0, FILE_SCHEME_PREFIX, 0, FILE_SCHEME_PREFIX.length())) {
+            String msg = "The source URL must not be file in the server's local file system";
+            throw new RegistryException(msg);
+        }
+
+        URL url;
+        try {
+            url = new URL(sourceURL);
+        } catch (MalformedURLException e) {
+            String msg = "Given source URL " + sourceURL + " is not valid.";
+            throw new RegistryException(msg, e);
+        }
+
+        InputStream in = null;
+        try {
+            URLConnection uc = url.openConnection();
+            in = uc.getInputStream();
+            byte[] inByteArr = RegistryUtils.getByteArray(in);
+
+            // persisting bandwidth
+            BandwidthPersistor.storeIncomingBandwidth(tenantId, inByteArr.length);
+        } catch (IOException e) {
+            String msg = "Could not read from the given URL: " + sourceURL;
+            throw new RegistryException(msg, e);
+        } finally {
+            // Close defensively: whether RegistryUtils.getByteArray closes the stream is not
+            // visible from here, and closing an already closed stream has no effect.
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    log.warn("Could not close the stream opened for URL: " + sourceURL, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Records the size of the fetched resource as outgoing bandwidth. When the context holds
+     * no resource yet, it is loaded from the registry and the request is marked as processed.
+     *
+     * @param context the request context of the get operation
+     * @return the resource, or {@code null} when the request is not tracked or the resource
+     *         does not exist
+     * @throws RegistryException if the resource content is neither a String nor a byte array
+     */
+    public Resource get(RequestContext context) throws RegistryException {
+        if (!startTracking()) {
+            return null;
+        }
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+        // get the resource
+        Resource resource = context.getResource();
+        if (resource == null) {
+            ResourcePath resourcePath = context.getResourcePath();
+            Registry registry = context.getRegistry();
+            if (registry.resourceExists(resourcePath.getPath())) {
+                resource = registry.get(resourcePath.getPath());
+                context.setResource(resource);
+                context.setProcessingComplete(true); // nothing else to do.
+            }
+        }
+        if (resource == null) {
+            return null;
+        }
+        if (resource instanceof CollectionImpl) {
+            return resource;
+        }
+        Object contentObj = resource.getContent();
+        if (contentObj == null) {
+            return resource;
+        }
+
+        // persisting bandwidth
+        BandwidthPersistor.storeOutgoingBandwidth(tenantId, contentSize(contentObj));
+        return resource;
+    }
+
+    /**
+     * Records the number of bytes written by a dump as outgoing bandwidth. If nothing has
+     * been written yet, the dump is performed here through a {@link MonitoredWriter} and the
+     * request is marked as processed.
+     *
+     * @param requestContext the request context of the dump operation
+     * @throws RegistryException if the dump fails
+     */
+    public void dump(RequestContext requestContext) throws RegistryException {
+        if (!startTracking()) {
+            return;
+        }
+
+        long size = requestContext.getBytesWritten();
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+
+        if (size == 0) {
+            // Still not dumped
+            Registry registry = requestContext.getRegistry();
+            String path = requestContext.getResourcePath().getPath();
+            Writer writer = requestContext.getDumpingWriter();
+            // we wrap the writer with the monitored writer
+            MonitoredWriter monitoredWriter = new MonitoredWriter(writer);
+            registry.dump(path, monitoredWriter);
+            size = monitoredWriter.getTotalWritten();
+            requestContext.setProcessingComplete(true);
+        }
+
+        // persisting bandwidth
+        BandwidthPersistor.storeOutgoingBandwidth(tenantId, size);
+    }
+
+    /**
+     * Records the number of bytes read by a restore as incoming bandwidth. If nothing has
+     * been read yet, the restore is performed here through a {@link MonitoredReader} and the
+     * request is marked as processed.
+     *
+     * @param requestContext the request context of the restore operation
+     * @throws RegistryException if the restore fails
+     */
+    public void restore(RequestContext requestContext) throws RegistryException {
+        if (!startTracking()) {
+            return;
+        }
+
+        // pre triggering
+        int tenantId = CurrentSession.getTenantId();
+        long size = requestContext.getBytesRead();
+
+        if (size == 0) {
+            // not restored yet
+            Registry registry = requestContext.getRegistry();
+            String path = requestContext.getResourcePath().getPath();
+            Reader reader = requestContext.getDumpingReader();
+            // we wrap the reader with the monitored reader
+            MonitoredReader monitoredReader = new MonitoredReader(reader);
+            registry.restore(path, monitoredReader);
+            size = monitoredReader.getTotalRead();
+            requestContext.setProcessingComplete(true);
+        }
+
+        // persisting bandwidth
+        BandwidthPersistor.storeIncomingBandwidth(tenantId, size);
+    }
+
+    /**
+     * Intended to register this handler with the registry; currently does nothing.
+     *
+     * @param registryContext the registry context (unused while registration is disabled)
+     * @throws RegistryException declared for callers; not thrown by the current body
+     */
+    public static void registerRegistryUsagePersistingListener(RegistryContext registryContext)
+            throws RegistryException {
+
+        //This was commented out because there is a similar class RegistryUsageListener
+        //After examining properly this class will be deleted
+        /*HandlerManager handlerManager = registryContext.getHandlerManager();
+        RegistryUsagePersistingListener handler = new RegistryUsagePersistingListener();
+        URLMatcher anyUrlMatcher = new URLMatcher();
+        anyUrlMatcher.setPattern(".*");
+        String[] applyingFilters = new String[]{
+            Filter.PUT, Filter.IMPORT, Filter.GET, Filter.DUMP, Filter.RESTORE,};
+
+        handlerManager.addHandlerWithPriority(applyingFilters, anyUrlMatcher, handler,
+            HandlerLifecycleManager.DEFAULT_REPORTING_HANDLER_PHASE);
+        */
+    }
+
+    /**
+     * Decides whether the current call has to be tracked and, if so, marks the session so
+     * that later calls of the same request are skipped.
+     *
+     * @return {@code true} if usage must be recorded for this call, {@code false} if the call
+     *         belongs to the super tenant, to the system or anonymous user, or to a request
+     *         whose usage has already been recorded
+     */
+    private boolean startTracking() {
+        if (CurrentSession.getCallerTenantId() == MultitenantConstants.SUPER_TENANT_ID ||
+                CurrentSession.getTenantId() == MultitenantConstants.SUPER_TENANT_ID) {
+            // no limitations for the super tenant
+            return false;
+        }
+        if (CarbonConstants.REGISTRY_SYSTEM_USERNAME.equals(CurrentSession.getUser()) ||
+                CarbonConstants.REGISTRY_ANONNYMOUS_USERNAME.equals(CurrentSession.getUser())) {
+            // skipping tracking for anonymous and system user
+            return false;
+        }
+        // called only once per request
+        if (CurrentSession.getAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR)
+                != null) {
+            return false;
+        }
+        CurrentSession.setAttribute(StratosConstants.REGISTRY_USAGE_PERSISTED_SESSION_ATTR, true);
+        return true;
+    }
+
+    /**
+     * Returns the size of a resource content object: the character count of a String or the
+     * length of a byte array.
+     *
+     * @param contentObj the non-null resource content
+     * @return the size of the content
+     * @throws RegistryException if the content is of any other type
+     */
+    private static int contentSize(Object contentObj) throws RegistryException {
+        if (contentObj instanceof String) {
+            return ((String) contentObj).length();
+        }
+        if (contentObj instanceof byte[]) {
+            return ((byte[]) contentObj).length;
+        }
+        String msg = "Unsupported type for the content.";
+        log.error(msg);
+        throw new RegistryException(msg);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java
new file mode 100644
index 0000000..d8ecb66
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsInHandler.java
@@ -0,0 +1,73 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.engine.Handler;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.stratos.usage.agent.util.PublisherUtils;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.CarbonContext;
+import org.wso2.carbon.core.util.SystemFilter;
+import org.wso2.carbon.statistics.services.util.SystemStatistics;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+public class StatisticsInHandler extends AbstractHandler{
+ private static Log log = LogFactory.getLog(StatisticsOutHandler.class);
+
+ public InvocationResponse invoke(MessageContext messageContext) throws AxisFault {
+ AxisService axisService = messageContext.getAxisService();
+ if(axisService== null || SystemFilter.isFilteredOutService(axisService.getAxisServiceGroup()) ||
+ axisService.isClientSide()){
+ return InvocationResponse.CONTINUE;
+ }
+
+ if(Util.getSystemStatisticsUtil()==null){
+ return InvocationResponse.CONTINUE;
+ }
+
+ SystemStatistics systemStatistics = Util.getSystemStatisticsUtil().getSystemStatistics(messageContext);
+
+ int tenantId = MultitenantConstants.INVALID_TENANT_ID;
+ tenantId = CarbonContext.getCurrentContext().getTenantId();
+
+ if(tenantId == MultitenantConstants.INVALID_TENANT_ID ||
+ tenantId == MultitenantConstants.SUPER_TENANT_ID) {
+ return Handler.InvocationResponse.CONTINUE;
+ }
+
+ try {
+ PublisherUtils.publish(systemStatistics, tenantId);
+ } catch (Exception e) {
+ //Logging the complete stacktrace in debug mode
+ if(log.isDebugEnabled()){
+ log.debug(e);
+ }
+
+ log.error("Error occurred while publishing request statistics. Full stacktrace available in debug logs. " + e.getMessage());
+ }
+
+ return InvocationResponse.CONTINUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java
new file mode 100644
index 0000000..9f61518
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/StatisticsOutHandler.java
@@ -0,0 +1,83 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.engine.Handler;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.CarbonContext;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.core.util.SystemFilter;
+import org.wso2.carbon.statistics.services.util.SystemStatistics;
+import org.apache.stratos.usage.agent.util.PublisherUtils;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+
+public class StatisticsOutHandler extends AbstractHandler{
+
+ private static Log log = LogFactory.getLog(StatisticsOutHandler.class);
+
+ public InvocationResponse invoke(MessageContext messageContext) throws AxisFault {
+
+
+ AxisService axisService = messageContext.getAxisService();
+ if(axisService== null || SystemFilter.isFilteredOutService(axisService.getAxisServiceGroup()) ||
+ axisService.isClientSide()){
+
+ PrivilegedCarbonContext.destroyCurrentContext();
+ return InvocationResponse.CONTINUE;
+ }
+
+ if(Util.getSystemStatisticsUtil()==null){
+
+ PrivilegedCarbonContext.destroyCurrentContext();
+ return InvocationResponse.CONTINUE;
+ }
+ SystemStatistics systemStatistics = Util.getSystemStatisticsUtil().getSystemStatistics(messageContext);
+
+ int tenantId = MultitenantConstants.INVALID_TENANT_ID;
+ tenantId = CarbonContext.getCurrentContext().getTenantId();
+
+ if(tenantId == MultitenantConstants.INVALID_TENANT_ID ||
+ tenantId == MultitenantConstants.SUPER_TENANT_ID) {
+
+ PrivilegedCarbonContext.destroyCurrentContext();
+ return Handler.InvocationResponse.CONTINUE;
+ }
+
+ try {
+ PublisherUtils.publish(systemStatistics, tenantId);
+ } catch (Exception e) {
+ //Logging the complete stacktrace in debug mode
+ if(log.isDebugEnabled()){
+ log.debug(e);
+ }
+
+ log.error("Error occurred while publishing request statistics. Full stacktrace available in debug logs. " + e.getMessage());
+ }
+
+ PrivilegedCarbonContext.destroyCurrentContext();
+ return InvocationResponse.CONTINUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java
new file mode 100644
index 0000000..152442b
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/UsageStatsAxis2ConfigurationContextObserver.java
@@ -0,0 +1,48 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.utils.AbstractAxis2ConfigurationContextObserver;
+
+
+public class UsageStatsAxis2ConfigurationContextObserver extends AbstractAxis2ConfigurationContextObserver {
+
+ private static final Log log = LogFactory.getLog(UsageStatsAxis2ConfigurationContextObserver.class);
+
+ @Override
+ public void createdConfigurationContext(ConfigurationContext configContext) {
+
+ AxisConfiguration axisConfiguration = configContext.getAxisConfiguration();
+ int tenantId = PrivilegedCarbonContext.getCurrentContext().getTenantId(false);
+ try {
+ axisConfiguration.engageModule("metering");
+ } catch (AxisFault axisFault) {
+ log.error("Could not engage metering module for tenant: " + tenantId, axisFault);
+ }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java
new file mode 100644
index 0000000..09a5f78
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringHandler.java
@@ -0,0 +1,109 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners.axis2;
+
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.wso2.carbon.core.transports.metering.MeteredServletRequest;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Axis2 handler that tags the underlying {@link MeteredServletRequest} with metering
+ * attributes: an admin-service marker, or the service name and the tenant id derived from
+ * the request URI.
+ */
+public class RequestMeteringHandler extends AbstractHandler {
+    private static final Log log = LogFactory.getLog(RequestMeteringHandler.class);
+
+    /**
+     * Sets the metering attributes on the servlet request behind the message, if there is
+     * one. Messages without a resolved service or without a {@link MeteredServletRequest}
+     * are passed through untouched.
+     *
+     * @param messageContext MessageContext
+     * @return always {@code InvocationResponse.CONTINUE}
+     * @throws AxisFault declared by the handler contract
+     */
+    public InvocationResponse invoke(MessageContext messageContext) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            // Plain concatenation prints "null" for a missing envelope instead of throwing.
+            log.debug("Staring metering handler invocation. Incoming message: " +
+                    messageContext.getEnvelope());
+        }
+        AxisService service = messageContext.getAxisService();
+        if (service == null) {
+            // No service resolved for this message, so there is nothing to attribute.
+            log.debug("Axis service is null. Skip monitoring.");
+            return InvocationResponse.CONTINUE;
+        }
+        Parameter param = service.getParameter("adminService");
+
+        Object obj = messageContext.getProperty("transport.http.servletRequest");
+        if (obj == null) {
+            // TODO: check for cause of the error.
+            log.debug("Servlet request is null. Skip monitoring.");
+            return InvocationResponse.CONTINUE;
+        }
+        if (!(obj instanceof MeteredServletRequest)) {
+            log.debug("HttpServletRequest is not of type MeteredServletRequest. Skip monitoring.");
+            return InvocationResponse.CONTINUE;
+        }
+
+        MeteredServletRequest servletRequest = (MeteredServletRequest) obj;
+
+        // Admin services are only flagged; no service name or tenant id is recorded.
+        if (param != null && "true".equals(param.getValue())) {
+            servletRequest.setAttribute(StratosConstants.ADMIN_SERVICE_SERVLET_ATTR, "true");
+            return InvocationResponse.CONTINUE;
+        }
+        servletRequest.setAttribute(StratosConstants.SERVICE_NAME_SERVLET_ATTR, service.getName());
+
+        int tenantId = getTenantId(servletRequest);
+        servletRequest.setAttribute(StratosConstants.TENANT_ID_SERVLET_ATTR, tenantId);
+
+        return InvocationResponse.CONTINUE;
+    }
+
+    /**
+     * Resolves the tenant id from the tenant domain that follows "/services/t/" in the
+     * request URI.
+     *
+     * @param servletRequest MeteredServletRequest
+     * @return the tenant id of the domain, or 0 when the URI has no tenant segment or the
+     *         lookup fails
+     */
+    private int getTenantId(MeteredServletRequest servletRequest) {
+        String address = servletRequest.getRequestURI();
+        String servicesPrefix = "/services/t/";
+        if (address == null) {
+            return 0;
+        }
+        int prefixIndex = address.indexOf(servicesPrefix);
+        if (prefixIndex < 0) {
+            // not a tenant-qualified service URI
+            return 0;
+        }
+
+        int domainNameStartIndex = prefixIndex + servicesPrefix.length();
+        int domainNameEndIndex = address.indexOf('/', domainNameStartIndex);
+        String domainName = address.substring(domainNameStartIndex,
+                domainNameEndIndex == -1 ? address.length() : domainNameEndIndex);
+
+        try {
+            return Util.getRealmService().getTenantManager().getTenantId(domainName);
+        } catch (org.wso2.carbon.user.api.UserStoreException e) {
+            log.error("An error occurred while obtaining the tenant id.", e);
+        }
+
+        // the tenant lookup failed
+        return 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java
new file mode 100644
index 0000000..6d1bb2f
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/listeners/axis2/RequestMeteringModule.java
@@ -0,0 +1,47 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.listeners.axis2;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisDescription;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.modules.Module;
+import org.apache.neethi.Assertion;
+import org.apache.neethi.Policy;
+
+/**
+ * Axis2 {@code Module} implementation whose lifecycle callbacks are all empty.
+ * NOTE(review): presumably the class behind the "metering" module - confirm in module.xml.
+ */
+public class RequestMeteringModule implements Module {
+
+ /** No initialisation is performed. */
+ public void init(ConfigurationContext configurationContext, AxisModule axisModule)
+ throws AxisFault {
+ }
+
+ /** Nothing is done when the module is engaged. */
+ public void engageNotify(AxisDescription axisDescription) throws AxisFault {
+ }
+
+ /** Reports every assertion as supported. */
+ public boolean canSupportAssertion(Assertion assertion) {
+ return true;
+ }
+
+ /** No policy is applied. */
+ public void applyPolicy(Policy policy, AxisDescription axisDescription) throws AxisFault {
+ }
+
+ /** Nothing to release on shutdown. */
+ public void shutdown(ConfigurationContext configurationContext) throws AxisFault {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java
new file mode 100644
index 0000000..d99fd3c
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthPersistor.java
@@ -0,0 +1,58 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.common.constants.UsageConstants;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.util.Util;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+/**
+ * Queues registry bandwidth usage records, incoming and outgoing, for persisting.
+ */
+
+public class BandwidthPersistor {
+
+    /**
+     * Queues an incoming registry bandwidth record. Nothing is queued for the super tenant
+     * or when the size is not positive.
+     *
+     * @param tenantId tenant id
+     * @param size     value of the incoming bandwidth
+     */
+    public static void storeIncomingBandwidth(int tenantId, long size) {
+        if (tenantId == MultitenantConstants.SUPER_TENANT_ID || size <= 0) {
+            return;
+        }
+        Util.addToPersistingControllerQueue(
+                new BandwidthUsage(tenantId, UsageConstants.REGISTRY_INCOMING_BW, size));
+    }
+
+    /**
+     * Queues an outgoing registry bandwidth record. Nothing is queued for the super tenant
+     * or when the size is not positive.
+     *
+     * @param tenantId tenant id
+     * @param size     value of the outgoing bandwidth
+     */
+    public static void storeOutgoingBandwidth(int tenantId, long size) {
+        if (tenantId == MultitenantConstants.SUPER_TENANT_ID || size <= 0) {
+            return;
+        }
+        Util.addToPersistingControllerQueue(
+                new BandwidthUsage(tenantId, UsageConstants.REGISTRY_OUTGOING_BW, size));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java
new file mode 100644
index 0000000..8174c07
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/BandwidthUsageDataRetrievalTask.java
@@ -0,0 +1,114 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.config.UsageAgentConfiguration;
+import org.apache.stratos.usage.agent.util.UsageAgentConstants;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.base.ServerConfiguration;
+import org.apache.stratos.common.constants.UsageConstants;
+import org.wso2.carbon.tomcat.ext.transport.statistics.TransportStatisticsContainer;
+import org.wso2.carbon.tomcat.ext.transport.statistics.TransportStatisticsEntry;
+import org.wso2.carbon.user.api.UserStoreException;
+
+import java.util.Queue;
+
+/**
+ * Periodic task that takes transport statistics entries from the queue returned by
+ * TransportStatisticsContainer.getTransportStatistics() and converts them into
+ * service / web-app BandwidthUsage records.
+ */
+public class BandwidthUsageDataRetrievalTask implements Runnable {
+    private static final Log log = LogFactory.getLog(BandwidthUsageDataRetrievalTask.class);
+
+    private final Queue<TransportStatisticsEntry> transportStats;
+    private final UsageAgentConfiguration configuration;
+
+    // True when the server configuration defines a "BamServerURL" property.
+    private final boolean isBamAvailable;
+
+    /**
+     * @param configuration usage agent configuration; supplies the maximum number of
+     *                      records handled per execution
+     */
+    public BandwidthUsageDataRetrievalTask(UsageAgentConfiguration configuration) {
+        transportStats = TransportStatisticsContainer.getTransportStatistics();
+        this.configuration = configuration;
+        this.isBamAvailable =
+                ServerConfiguration.getInstance().getFirstProperty("BamServerURL") != null;
+    }
+
+    /**
+     * Processes at most getUsageTasksNumberOfRecordsPerExecution() entries. Entries whose
+     * tenant cannot be resolved (negative tenant id) are skipped; a UserStoreException
+     * for one entry is logged and processing continues with the next entry.
+     */
+    public void run() {
+        for (int i = 0; i < configuration.getUsageTasksNumberOfRecordsPerExecution(); i++) {
+            // poll() rather than isEmpty() + remove(): no exception if the queue is
+            // drained between the check and the removal.
+            TransportStatisticsEntry entry = transportStats.poll();
+            if (entry == null) {
+                break;
+            }
+
+            // NOTE(review): kept as in the original - without a BAM server URL one entry
+            // is taken off the queue and discarded per execution. Confirm this is intended.
+            if (!isBamAvailable) {
+                return;
+            }
+
+            try {
+                int tenantId = getTenantID(entry.getTenantName());
+                // If the tenant does not exist there is nothing to update; skip this entry
+                // but keep processing the rest of the batch.
+                if (tenantId < 0) {
+                    continue;
+                }
+
+                String measurement = inferMeasurement(entry);
+                if (measurement.equals(UsageConstants.SERVICE_BANDWIDTH)) {
+                    queueIfPositive(tenantId, UsageConstants.SERVICE_INCOMING_BW,
+                            entry.getRequestSize());
+                    queueIfPositive(tenantId, UsageConstants.SERVICE_OUTGOING_BW,
+                            entry.getResponseSize());
+                } else if (measurement.equals(UsageConstants.WEBAPP_BANDWIDTH)) {
+                    queueIfPositive(tenantId, UsageConstants.WEBAPP_INCOMING_BW,
+                            entry.getRequestSize());
+                    queueIfPositive(tenantId, UsageConstants.WEBAPP_OUTGOING_BW,
+                            entry.getResponseSize());
+                }
+            } catch (UserStoreException e) {
+                log.error("Error persisting bandwidth usage statistics.", e);
+            }
+        }
+    }
+
+    /** Passes a BandwidthUsage to the persisting controller queue when size is positive. */
+    private void queueIfPositive(int tenantId, String measurement, long size) {
+        if (size > 0) {
+            Util.addToPersistingControllerQueue(new BandwidthUsage(tenantId, measurement, size));
+        }
+    }
+
+    /**
+     * Maps the entry's context to a measurement name.
+     *
+     * @param entry transport statistics entry
+     * @return SERVICE_BANDWIDTH or WEBAPP_BANDWIDTH when the context equals the matching
+     *         UsageAgentConstants context, otherwise UsageAgentConstants.BANDWIDTH_CARBON
+     */
+    private String inferMeasurement(TransportStatisticsEntry entry) {
+        String context = entry.getContext();
+        if (context != null) {
+            if (context.equals(UsageAgentConstants.BANDWIDTH_USAGE_SERVICES_CONTEXT)) {
+                return UsageConstants.SERVICE_BANDWIDTH;
+            } else if (context.equals(UsageAgentConstants.BANDWIDTH_USAGE_WEBAPPS_CONTEXT)) {
+                return UsageConstants.WEBAPP_BANDWIDTH;
+            }
+        }
+
+        return UsageAgentConstants.BANDWIDTH_CARBON;
+    }
+
+    /**
+     * Looks up the tenant id for a tenant domain through the realm service's tenant manager.
+     *
+     * @param tenantDomain tenant domain
+     * @return tenant id reported by the tenant manager
+     * @throws UserStoreException if the lookup fails
+     */
+    private int getTenantID(String tenantDomain) throws UserStoreException {
+        return Util.getRealmService().getTenantManager().getTenantId(tenantDomain);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java
new file mode 100644
index 0000000..f56d9b9
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/RegistryUsagePersister.java
@@ -0,0 +1,76 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.common.constants.UsageConstants;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.util.Util;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+import java.lang.System;
+
+/**
+ * Helper for recording registry bandwidth and content usage. Each method wraps the given
+ * size in a BandwidthUsage and passes it to Util.addToPersistingControllerQueue.
+ */
+public class RegistryUsagePersister {
+
+    /**
+     * Records incoming registry bandwidth for a tenant. The call does nothing for the
+     * super tenant and for sizes that are zero or negative.
+     *
+     * @param tenantId tenant id
+     * @param size     value of the incoming bandwidth
+     */
+    public static void storeIncomingBandwidth(int tenantId, long size) {
+        if (MultitenantConstants.SUPER_TENANT_ID == tenantId || size <= 0) {
+            return;
+        }
+        Util.addToPersistingControllerQueue(
+                new BandwidthUsage(tenantId, UsageConstants.REGISTRY_INCOMING_BW, size));
+    }
+
+    /**
+     * Records a usage entry under the "ContentBandwidth-In" measurement. The call does
+     * nothing for the super tenant and for sizes that are zero or negative.
+     *
+     * @param tenantId tenant id
+     * @param size     size to record
+     */
+    public static void storeAddContent(int tenantId, long size) {
+        if (MultitenantConstants.SUPER_TENANT_ID == tenantId || size <= 0) {
+            return;
+        }
+        Util.addToPersistingControllerQueue(
+                new BandwidthUsage(tenantId, "ContentBandwidth-In", size));
+    }
+
+    /**
+     * Records a usage entry under the "ContentBandwidth-Out" measurement. The call does
+     * nothing for the super tenant and for sizes that are zero or negative.
+     *
+     * @param tenantId tenant id
+     * @param size     size to record
+     */
+    public static void storeDeleteContent(int tenantId, long size) {
+        if (MultitenantConstants.SUPER_TENANT_ID == tenantId || size <= 0) {
+            return;
+        }
+        Util.addToPersistingControllerQueue(
+                new BandwidthUsage(tenantId, "ContentBandwidth-Out", size));
+    }
+
+    /**
+     * Records outgoing registry bandwidth for a tenant. The call does nothing for the
+     * super tenant and for sizes that are zero or negative.
+     *
+     * @param tenantId tenant id
+     * @param size     value of the outgoing bandwidth
+     */
+    public static void storeOutgoingBandwidth(int tenantId, long size) {
+        if (MultitenantConstants.SUPER_TENANT_ID == tenantId || size <= 0) {
+            return;
+        }
+        Util.addToPersistingControllerQueue(
+                new BandwidthUsage(tenantId, UsageConstants.REGISTRY_OUTGOING_BW, size));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java
new file mode 100644
index 0000000..c7e7fb7
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/ServiceDataPersistor.java
@@ -0,0 +1,72 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.util.Util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.core.transports.metering.MeteredServletRequest;
+import org.wso2.carbon.core.transports.metering.MeteredServletResponse;
+import org.wso2.carbon.core.transports.metering.RequestDataPersister;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.common.constants.UsageConstants;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+/**
+ * RequestDataPersister implementation that turns the byte counts of a metered
+ * request/response pair into BandwidthUsage records.
+ */
+public class ServiceDataPersistor implements RequestDataPersister {
+    private static final Log log = LogFactory.getLog(ServiceDataPersistor.class);
+
+    /**
+     * Reads the tenant id from the request attributes, the inbound size from the wrapped
+     * request and the outbound size from the wrapped response, and passes a BandwidthUsage
+     * for each positive size to Util.addToPersistingControllerQueue.
+     * Nothing is recorded when the StratosConstants.SERVICE_NAME_SERVLET_ATTR attribute
+     * equals "true", when no tenant id attribute is set, or for the super tenant.
+     *
+     * @param wrappedRequest  MeteredServletRequest
+     * @param wrappedResponse MeteredServletResponse
+     */
+    public void persist(MeteredServletRequest wrappedRequest, MeteredServletResponse wrappedResponse) {
+        Object serviceAttr = wrappedRequest.getAttribute(StratosConstants.SERVICE_NAME_SERVLET_ATTR);
+        if ("true".equals(serviceAttr)) {
+            return;
+        }
+
+        Integer tenantId =
+                (Integer) wrappedRequest.getAttribute(StratosConstants.TENANT_ID_SERVLET_ATTR);
+        if (tenantId == null || tenantId == MultitenantConstants.SUPER_TENANT_ID) {
+            return;
+        }
+
+        long bytesIn = wrappedRequest.getReadSize();
+        long bytesOut = wrappedResponse.getWrittenSize();
+
+        if (log.isTraceEnabled()) {
+            log.trace("Persisting service bandwidth usage for tenant " + tenantId + " in size: " + bytesIn + " out size: " + bytesOut);
+        }
+
+        // One record per direction, and only when something was actually transferred.
+        if (bytesIn > 0) {
+            Util.addToPersistingControllerQueue(
+                    new BandwidthUsage(tenantId, UsageConstants.SERVICE_INCOMING_BW, bytesIn));
+        }
+        if (bytesOut > 0) {
+            Util.addToPersistingControllerQueue(
+                    new BandwidthUsage(tenantId, UsageConstants.SERVICE_OUTGOING_BW, bytesOut));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java
new file mode 100644
index 0000000..5c27e20
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceManager.java
@@ -0,0 +1,91 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.config.UsageAgentConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Queue;
+import java.util.concurrent.*;
+
+public class UsageDataPersistenceManager {
+ private static final Log log = LogFactory.getLog(UsageDataPersistenceManager.class);
+
+ // queue to store Bandwidth usage statistics.
+ // usage of LinkedBlockingQueue ensures operations on the queue to wait for the queue to be non
+ // empty when retrieving and wait for space when storing element.
+ private Queue<BandwidthUsage> persistenceJobs = new LinkedBlockingQueue<BandwidthUsage>();
+
+ private final ScheduledExecutorService scheduler;
+
+ private UsageAgentConfiguration configuration;
+
+ public UsageDataPersistenceManager(UsageAgentConfiguration configuration) {
+ scheduler = Executors.newScheduledThreadPool(2, new UsageDataPersistenceThreadFactory());
+ this.configuration = configuration;
+ }
+
+ /**
+ * this method add bandwidth usage entries to the jobQueue
+ *
+ * @param usage Bandwidth usage
+ */
+
+ public void addToQueue(BandwidthUsage usage) {
+ persistenceJobs.add(usage);
+ }
+
+ public void scheduleUsageDataPersistenceTask() {
+ //we will schedule the usage data persistence task only if interval is not -1
+ if(configuration.getUsageTasksExecutionIntervalInMilliSeconds()>0){
+ scheduler.scheduleWithFixedDelay(new UsageDataPersistenceTask(persistenceJobs, configuration),
+ configuration.getUsageTasksStartupDelayInMilliSeconds(),
+ configuration.getUsageTasksExecutionIntervalInMilliSeconds(),
+ TimeUnit.MILLISECONDS);
+ log.debug("Usage data persistence task was scheduled");
+ }else{
+ log.debug("Usage data persistence task is disabled");
+ }
+ }
+
+
+ public void scheduleBandwidthUsageDataRetrievalTask() {
+ //we will schedule the usage data retrieval task only if interval is not -1
+ if(configuration.getUsageTasksExecutionIntervalInMilliSeconds()>0){
+ scheduler.scheduleWithFixedDelay(new BandwidthUsageDataRetrievalTask(configuration),
+ configuration.getUsageTasksStartupDelayInMilliSeconds(),
+ configuration.getUsageTasksExecutionIntervalInMilliSeconds(),
+ TimeUnit.MILLISECONDS);
+ log.debug("Bandwidth Usage data retrieval task was scheduled");
+ }else {
+ log.debug("Bandwidth Usage data retrieval task was disabled");
+ }
+ }
+
+ class UsageDataPersistenceThreadFactory implements ThreadFactory {
+ private int counter = 0;
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "UsageDataPersistenceThread-" + counter++);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java
new file mode 100644
index 0000000..59b5a7e
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/persist/UsageDataPersistenceTask.java
@@ -0,0 +1,166 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+package org.apache.stratos.usage.agent.persist;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.base.MultitenantConstants;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.stratos.usage.agent.config.UsageAgentConfiguration;
+import org.apache.stratos.usage.agent.exception.UsageException;
+import org.apache.stratos.usage.agent.util.PublisherUtils;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Queue;
+
+/**
+ * Periodic task that drains queued BandwidthUsage records, sums them per tenant and
+ * measurement, and publishes the totals through PublisherUtils.
+ */
+public class UsageDataPersistenceTask implements Runnable {
+
+    private static final Log log = LogFactory.getLog(UsageDataPersistenceTask.class);
+
+    private Queue<BandwidthUsage> usagePersistenceJobs;
+    private UsageAgentConfiguration configuration;
+
+    /**
+     * @param jobs          queue of pending usage records
+     * @param configuration usage agent configuration; supplies the maximum number of
+     *                      records handled per execution
+     */
+    public UsageDataPersistenceTask(Queue<BandwidthUsage> jobs, UsageAgentConfiguration configuration) {
+        this.usagePersistenceJobs = jobs;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Persists queued usage when there is any; a UsageException is logged, not rethrown.
+     */
+    public void run() {
+        if (usagePersistenceJobs.isEmpty()) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Persisting Service and Web App bandwidth usage statistics");
+        }
+        try {
+            persistUsage(usagePersistenceJobs);
+        } catch (UsageException e) {
+            log.error("Error when persisting usage statistics.", e);
+        }
+    }
+
+    /**
+     * Takes up to getUsageTasksNumberOfRecordsPerExecution() records from the queue,
+     * accumulates them in one Summarizer per tenant id, then publishes every summarizer.
+     *
+     * @param jobQueue usage data persistence jobs
+     * @throws org.apache.stratos.usage.agent.exception.UsageException
+     *          declared by Summarizer.publish()
+     */
+    public void persistUsage(Queue<BandwidthUsage> jobQueue) throws UsageException {
+
+        // one summarizer per tenant id
+        HashMap<Integer, Summarizer> perTenant = new HashMap<Integer, Summarizer>();
+
+        int processed = 0;
+        while (processed < configuration.getUsageTasksNumberOfRecordsPerExecution()
+                && !jobQueue.isEmpty()) {
+
+            BandwidthUsage item = jobQueue.poll();
+            int tenantId = item.getTenantId();
+
+            // first record seen for this tenant in this run: create its summarizer
+            Summarizer tenantSummary = perTenant.get(tenantId);
+            if (tenantSummary == null) {
+                tenantSummary = new Summarizer();
+                perTenant.put(tenantId, tenantSummary);
+            }
+
+            tenantSummary.accumulate(item);
+            processed++;
+        }
+
+        // accumulation finished; publish the totals
+        for (Summarizer tenantSummary : perTenant.values()) {
+            tenantSummary.publish();
+        }
+    }
+
+    /**
+     * Accumulates usage values per measurement name and publishes the totals.
+     */
+    private static class Summarizer {
+        private final HashMap<String, BandwidthUsage> usageMap =
+                new HashMap<String, BandwidthUsage>();
+
+        /**
+         * Adds the value of the given usage to the total kept for its measurement. The
+         * first usage seen for a measurement is stored and later values are added to it.
+         *
+         * @param usage BandwidthUsage
+         */
+        public void accumulate(BandwidthUsage usage) {
+            String measurement = usage.getMeasurement();
+            BandwidthUsage total = usageMap.get(measurement);
+
+            if (total == null) {
+                // measurement not seen before: this entry becomes the running total
+                usageMap.put(measurement, usage);
+                return;
+            }
+            total.setValue(total.getValue() + usage.getValue());
+        }
+
+        /**
+         * Publishes every accumulated usage entry that does not belong to the super
+         * tenant. A UsageException raised for one entry is logged and the remaining
+         * entries are still published.
+         *
+         * @throws UsageException declared for callers; failures of individual entries
+         *                        are caught and logged inside the loop
+         */
+        public void publish() throws UsageException {
+            for (BandwidthUsage usage : usageMap.values()) {
+                try {
+                    if (MultitenantConstants.SUPER_TENANT_ID == usage.getTenantId()) {
+                        continue;
+                    }
+                    PublisherUtils.publish(usage);
+                } catch (UsageException e) {
+                    log.error("Error in publishing bandwidth usage data", e);
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java
new file mode 100644
index 0000000..dcb6c12
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/services/CustomMeteringService.java
@@ -0,0 +1,95 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.services;
+
+import org.wso2.carbon.core.AbstractAdmin;
+import org.apache.stratos.usage.agent.api.CustomMeteringAgent;
+import org.apache.stratos.usage.agent.exception.UsageException;
+
+/**
+ * Admin service that forwards custom metering calls to a CustomMeteringAgent created
+ * over the governance registry returned by getGovernanceRegistry().
+ */
+public class CustomMeteringService extends AbstractAdmin {
+
+    /**
+     * Returns the durations recorded for a measurement.
+     *
+     * @param measurement the measurement name
+     * @return duration array
+     * @throws Exception if the agent call fails
+     */
+    public String[] getRecordedDurations(String measurement) throws Exception {
+        CustomMeteringAgent agent = new CustomMeteringAgent(getGovernanceRegistry());
+        return agent.getRecordedDurations(measurement);
+    }
+
+    /**
+     * Checks whether a usage entry exists.
+     *
+     * @param duration    duration
+     * @param measurement measurement name
+     * @return true if usage entry exist
+     * @throws Exception if the agent call fails
+     */
+    public boolean isUsageEntryExists(String duration, String measurement) throws Exception {
+        CustomMeteringAgent agent = new CustomMeteringAgent(getGovernanceRegistry());
+        return agent.isUsageEntryExists(duration, measurement);
+    }
+
+    /**
+     * Persists a usage value.
+     *
+     * @param duration    duration
+     * @param measurement measurement name
+     * @param value       value of measurement
+     * @throws Exception if the agent call fails
+     */
+    public void persistUsage(String duration, String measurement, String value) throws Exception {
+        CustomMeteringAgent agent = new CustomMeteringAgent(getGovernanceRegistry());
+        agent.persistUsage(duration, measurement, value);
+    }
+
+    /**
+     * Retrieves a usage value.
+     *
+     * @param duration    duration
+     * @param measurement measurement name
+     * @return usage value
+     * @throws UsageException if the agent call fails
+     */
+    public String retrieveUsage(String duration, String measurement) throws UsageException {
+        CustomMeteringAgent agent = new CustomMeteringAgent(getGovernanceRegistry());
+        return agent.retrieveUsage(duration, measurement);
+    }
+
+    /**
+     * Adds a usage value.
+     *
+     * @param userName    user name (not used by this method)
+     * @param duration    duration of the measurement
+     * @param measurement measurement name
+     * @param value       usage value
+     * @return the value returned by CustomMeteringAgent.addUsage
+     * @throws Exception if the agent call fails
+     */
+    public long addUsage(String userName, String duration, String measurement, long value)
+            throws Exception {
+        CustomMeteringAgent agent = new CustomMeteringAgent(getGovernanceRegistry());
+        return agent.addUsage(duration, measurement, value);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java
new file mode 100644
index 0000000..e61d9b9
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredReader.java
@@ -0,0 +1,50 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.util;
+
+import java.io.IOException;
+import java.io.Reader;
+
+
+/**
+ * Reader wrapper that counts the characters read through it.
+ */
+public class MonitoredReader extends Reader {
+    Reader reader;
+    long totalRead;
+
+    /**
+     * @param reader the reader to delegate to
+     */
+    public MonitoredReader(Reader reader) {
+        this.reader = reader;
+        totalRead = 0;
+    }
+
+    /**
+     * Delegates to the wrapped reader and adds the number of characters actually read
+     * to the running total.
+     *
+     * @param cbuf destination buffer
+     * @param off  offset at which to start storing characters
+     * @param len  maximum number of characters to read
+     * @return the number of characters read, or -1 at end of stream
+     * @throws IOException if the wrapped reader fails
+     */
+    @Override
+    public int read(char[] cbuf, int off, int len) throws IOException {
+        int read = reader.read(cbuf, off, len);
+        // The delegate returns -1 at end of stream; that must not reduce the total.
+        if (read > 0) {
+            totalRead += read;
+        }
+        return read;
+    }
+
+    /**
+     * Closes the wrapped reader.
+     *
+     * @throws IOException if the wrapped reader fails to close
+     */
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /**
+     * @return total number of characters read so far
+     */
+    public long getTotalRead() {
+        return totalRead;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java
new file mode 100644
index 0000000..307a2e4
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/MonitoredWriter.java
@@ -0,0 +1,53 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.util;
+
+import java.io.IOException;
+import java.io.Writer;
+
+
+/**
+ * Writer wrapper that counts the characters written through it.
+ */
+public class MonitoredWriter extends Writer {
+    Writer writer;
+    long totalWritten;
+
+    /**
+     * @param writer the writer to delegate to
+     */
+    public MonitoredWriter(Writer writer) {
+        this.writer = writer;
+        totalWritten = 0;
+    }
+
+    /**
+     * Delegates to the wrapped writer and adds the number of characters written to the
+     * running total.
+     *
+     * @param cbuf source buffer
+     * @param off  offset of the first character to write
+     * @param len  number of characters to write
+     * @throws IOException if the wrapped writer fails
+     */
+    @Override
+    public void write(char[] cbuf, int off, int len) throws IOException {
+        writer.write(cbuf, off, len);
+        // len is already the character count; off only selects where in cbuf to start.
+        // Counted after the delegate call so a failed write is not included.
+        totalWritten += len;
+    }
+
+    /**
+     * Flushes the wrapped writer.
+     *
+     * @throws IOException if the wrapped writer fails to flush
+     */
+    @Override
+    public void flush() throws IOException {
+        writer.flush();
+    }
+
+    /**
+     * Closes the wrapped writer.
+     *
+     * @throws IOException if the wrapped writer fails to close
+     */
+    @Override
+    public void close() throws IOException {
+        writer.close();
+    }
+
+    /**
+     * @return total number of characters written so far
+     */
+    public long getTotalWritten() {
+        return totalWritten;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/df3475cc/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java
new file mode 100644
index 0000000..607b30b
--- /dev/null
+++ b/components/org.apache.stratos.usage.agent/src/main/java/org/apache/stratos/usage/agent/util/PublisherUtils.java
@@ -0,0 +1,442 @@
+/*
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+package org.apache.stratos.usage.agent.util;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.stratos.usage.agent.beans.BandwidthUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.base.ServerConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
+import org.wso2.carbon.statistics.services.util.SystemStatistics;
+import org.apache.stratos.common.util.CommonUtil;
+import org.apache.stratos.usage.agent.exception.UsageException;
+import org.wso2.carbon.user.api.Tenant;
+import org.wso2.carbon.utils.CarbonUtils;
+import org.wso2.carbon.utils.ConfigurationContextService;
+import org.wso2.carbon.utils.NetworkUtils;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+import org.apache.stratos.usage.agent.beans.APIManagerRequestStats;
+
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides utility methods to publish usage statistics.
+ */
+public class PublisherUtils {
+ // Logger shared by the static helpers of this class.
+ private static Log log = LogFactory.getLog(PublisherUtils.class);
+ // Transport name used to look up the server port and as the scheme of the server URL.
+ private static final String TRANSPORT = "https";
+ // Assigned through setConfigurationContextService(); read when building the server URL.
+ private static ConfigurationContextService configurationContextService;
+ // NOTE(review): not referenced by any method visible in this class - confirm it is still needed.
+ private static Agent agent;
+ // Publisher used to find/define streams and to publish events; assigned by createDataPublisher().
+ private static DataPublisher dataPublisher;
+ // Async publisher built from dataPublisher; assigned by createAsynDataPublisher().
+ private static AsyncDataPublisher asyncDataPublisher;
+ // Id of the usage event stream; stays null until it has been found or defined.
+ private static String streamId;
+ // Name and version of the usage event stream.
+ private static final String usageEventStream = "org.wso2.carbon.usage.agent";
+ private static final String usageEventStreamVersion = "1.0.0";
+
+ // Name and version of the service request statistics stream, and its id once resolved.
+ private static final String reqStatEventStream="org.wso2.carbon.service.request.stats";
+ private static final String reqStatEventStreamVersion="1.0.0";
+ private static String reqStatEventStreamId;
+
+ // Server URLs already computed, keyed by tenant id; filled by getServerUrl().
+ private static Map<Integer, String> serverUrlMap = new HashMap<Integer, String>();
+
+
+ /**
+ * method to update server name
+ * @param tenantId tenant id
+ * @return server name
+ * @throws UsageException
+ */
+
+ public static String updateServerName(int tenantId) throws UsageException {
+
+ String serverName;
+ String hostName;
+
+ try {
+ hostName = NetworkUtils.getLocalHostname();
+ } catch (SocketException e) {
+ throw new UsageException("Error getting host name for the registry usage event payload",
+ e);
+ }
+
+ ConfigurationContextService configurationContextService = PublisherUtils.
+ getConfigurationContextService();
+ ConfigurationContext configurationContext;
+ if (configurationContextService != null) {
+ configurationContext = configurationContextService.getServerConfigContext();
+ } else {
+ throw new UsageException("ConfigurationContext is null");
+ }
+// int port = CarbonUtils.getTransportPort(configurationContext, "https");
+
+ String carbonHttpsPort = System.getProperty("carbon." + TRANSPORT + ".port");
+ if (carbonHttpsPort == null) {
+ carbonHttpsPort = Integer.toString(
+ CarbonUtils.getTransportPort(configurationContext, TRANSPORT));
+ }
+ String baseServerUrl = TRANSPORT + "://" + hostName + ":" + carbonHttpsPort;
+ String context = configurationContext.getContextRoot();
+
+ String tenantDomain = null;
+ try {
+ Tenant tenant = Util.getRealmService().getTenantManager().getTenant(tenantId);
+ if(tenant!=null){
+ tenantDomain = tenant.getDomain();
+ }
+ } catch (org.wso2.carbon.user.api.UserStoreException e) {
+ throw new UsageException("Failed to get tenant domain", e);
+ }
+
+ if ((tenantDomain != null) &&
+ !(tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME))) {
+ serverName = baseServerUrl + context + "t/" + tenantDomain;
+
+ } else if (context.equals("/")) {
+
+ serverName = baseServerUrl + "";
+ } else {
+ serverName = baseServerUrl + context;
+
+ }
+
+ return serverName;
+ }
+
+ public static String getServerUrl(int tenantId){
+
+ String serverUrl = serverUrlMap.get(tenantId);
+ if(serverUrl!=null){
+ return serverUrl;
+ }
+
+ if(serverUrl==null){
+ try{
+ serverUrl = updateServerName(tenantId);
+ }catch (UsageException e) {
+ log.error("Could not create the server url for tenant id: " + tenantId, e);
+ }
+ }
+
+ if(serverUrl!=null && !"".equals(serverUrl)){
+ serverUrlMap.put(tenantId, serverUrl);
+ }
+ return serverUrl;
+ }
+
+ public static void defineUsageEventStream() throws Exception {
+
+ createDataPublisher();
+
+ if(dataPublisher == null){
+ return;
+ }
+
+ try {
+
+ streamId = dataPublisher.findStream(usageEventStream, usageEventStreamVersion);
+ log.info("Event stream with stream ID: " + streamId + " found.");
+
+ } catch (NoStreamDefinitionExistException e) {
+
+ log.info("Defining the event stream because it was not found in BAM");
+ try {
+ defineStream();
+ } catch (Exception ex) {
+ String msg = "An error occurred while defining the even stream for Usage agent. " + e.getMessage();
+ log.warn(msg);
+ }
+
+ }
+
+ }
+
+ /**
+  * Defines the usage event stream through the data publisher and stores the
+  * stream id it returns.
+  *
+  * @throws Exception whatever the data publisher raises while defining the stream
+  */
+ private static void defineStream() throws Exception {
+     StringBuilder definition = new StringBuilder("{");
+     definition.append(" 'name':'").append(usageEventStream).append("',");
+     definition.append(" 'version':'").append(usageEventStreamVersion).append("',");
+     definition.append(" 'nickName': 'usage.agent',");
+     definition.append(" 'description': 'Tenant usage data',");
+     // Meta data attributes.
+     definition.append(" 'metaData':[");
+     definition.append(" {'name':'clientType','type':'STRING'}");
+     definition.append(" ],");
+     // Payload attributes.
+     definition.append(" 'payloadData':[");
+     definition.append(" {'name':'ServerName','type':'STRING'},");
+     definition.append(" {'name':'TenantID','type':'STRING'},");
+     definition.append(" {'name':'Type','type':'STRING'},");
+     definition.append(" {'name':'Value','type':'LONG'}");
+     definition.append(" ]");
+     definition.append("}");
+
+     streamId = dataPublisher.defineStream(definition.toString());
+ }
+
+ /**
+  * Defines the service request statistics stream through the data publisher and
+  * stores the stream id it returns.
+  *
+  * @throws Exception whatever the data publisher raises while defining the stream
+  */
+ private static void defineRequestStatEventStream() throws Exception {
+     StringBuilder definition = new StringBuilder("{");
+     definition.append(" 'name':'").append(reqStatEventStream).append("',");
+     definition.append(" 'version':'").append(reqStatEventStreamVersion).append("',");
+     definition.append(" 'nickName': 'service.request.stats',");
+     definition.append(" 'description': 'Tenants service request statistics',");
+     // Meta data attributes.
+     definition.append(" 'metaData':[");
+     definition.append(" {'name':'clientType','type':'STRING'}");
+     definition.append(" ],");
+     // Payload attributes.
+     definition.append(" 'payloadData':[");
+     definition.append(" {'name':'ServerName','type':'STRING'},");
+     definition.append(" {'name':'TenantID','type':'STRING'},");
+     definition.append(" {'name':'RequestCount','type':'INT'},");
+     definition.append(" {'name':'ResponseCount','type':'INT'},");
+     definition.append(" {'name':'FaultCount','type':'INT'},");
+     definition.append(" {'name':'ResponseTime','type':'LONG'}");
+     definition.append(" ]");
+     definition.append("}");
+
+     reqStatEventStreamId = dataPublisher.defineStream(definition.toString());
+ }
+
+ /**
+  * Creates the data publisher from the server configuration.
+  * <p>
+  * The trust store location and password from the server configuration are
+  * exported as the {@code javax.net.ssl.*} system properties before the publisher
+  * is created. If creation fails a warning is logged and the publisher field is
+  * left as it was, so callers must check it for {@code null}.
+  */
+ public static void createDataPublisher(){
+     ServerConfiguration serverConfig = CarbonUtils.getServerConfiguration();
+     String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location");
+     String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password");
+     String bamServerUrl = serverConfig.getFirstProperty("BamServerURL");
+     String adminUsername = CommonUtil.getStratosConfig().getAdminUserName();
+     String adminPassword = CommonUtil.getStratosConfig().getAdminPassword();
+
+     // System.setProperty throws NullPointerException for a null value, so only
+     // export the trust store settings that are actually configured.
+     if (trustStorePath != null) {
+         System.setProperty("javax.net.ssl.trustStore", trustStorePath);
+     } else {
+         log.warn("Security.TrustStore.Location is not configured; leaving javax.net.ssl.trustStore unchanged");
+     }
+     if (trustStorePassword != null) {
+         System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword);
+     } else {
+         log.warn("Security.TrustStore.Password is not configured; leaving javax.net.ssl.trustStorePassword unchanged");
+     }
+
+     try {
+         dataPublisher = new DataPublisher(bamServerUrl, adminUsername, adminPassword);
+     } catch (Exception e) {
+         // Best effort: the agent keeps running without a publisher, but the
+         // stack trace is kept so the cause can be diagnosed.
+         log.warn("Unable to create a data publisher to " + bamServerUrl +
+                 ". Usage Agent will not function properly. " + e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Builds the async data publisher on top of the data publisher, creating the
+  * data publisher first when it does not exist yet. Logs a warning and does
+  * nothing else when no data publisher is available.
+  */
+ public static void createAsynDataPublisher(){
+     if (null == dataPublisher) {
+         createDataPublisher();
+         if (null == dataPublisher) {
+             log.warn("Cannot create the async data publisher because the data publisher is null");
+             return;
+         }
+     }
+
+     try {
+         asyncDataPublisher = new AsyncDataPublisher(dataPublisher);
+     } catch (Exception e) {
+         log.error("Could not create an async data publisher using the data publisher", e);
+     }
+ }
+
+
+ /**
+ * this method get the event payload, construct the SOAP envelop and call the publish method in
+ * EventBrokerService.
+ *
+ * @param usage BandwidthUsage
+ * @throws UsageException
+ */
+ public static void publish(BandwidthUsage usage) throws UsageException {
+
+ if(dataPublisher==null){
+ log.info("Creating data publisher for usage data publishing");
+ createDataPublisher();
+
+ //If we cannot create a data publisher we should give up
+ //this means data will not be published
+ if(dataPublisher == null){
+ return;
+ }
+ }
+
+ if(streamId == null){
+ try{
+ streamId = dataPublisher.findStream(usageEventStream, usageEventStreamVersion);
+ }catch (NoStreamDefinitionExistException e){
+ log.info("Defining the event stream because it was not found in BAM");
+ try{
+ defineStream();
+ } catch(Exception ex){
+ String msg = "Error occurred while defining the event stream for publishing usage data. " + ex.getMessage();
+ log.error(msg);
+ //We do not want to proceed without an event stream. Therefore we return.
+ return;
+ }
+ }catch (Exception exc){
+ log.error("Error occurred while searching for stream id. " + exc.getMessage());
+ //We do not want to proceed without an event stream. Therefore we return.
+ return;
+ }
+ }
+
+ try {
+
+ Event usageEvent = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
+ new Object[]{getServerUrl(usage.getTenantId()),
+ Integer.toString(usage.getTenantId()),
+ usage.getMeasurement(),
+ usage.getValue()});
+
+ dataPublisher.publish(usageEvent);
+
+ } catch (Exception e) {
+ log.error("Error occurred while publishing usage event to BAM. " + e.getMessage(), e);
+ throw new UsageException(e.getMessage(), e);
+ }
+
+ }
+
+ public static void publish(SystemStatistics statistics, int tenantId) throws Exception {
+
+ if(dataPublisher==null){
+ log.info("Creating data publisher for service-stats publishing");
+ createDataPublisher();
+
+ //If we cannot create a data publisher we should give up
+ //this means data will not be published
+ if(dataPublisher == null){
+ return;
+ }
+ }
+
+ if(reqStatEventStreamId == null){
+ try{
+ reqStatEventStreamId = dataPublisher.findStream(reqStatEventStream, reqStatEventStreamVersion);
+ }catch (NoStreamDefinitionExistException e){
+ log.info("Defining the event stream because it was not found in BAM");
+ try{
+ defineRequestStatEventStream();
+ } catch(Exception ex){
+ String msg = "Error occurred while defining the event stream for publishing usage data. " + ex.getMessage();
+ log.error(msg);
+ //We do not want to proceed without an event stream. Therefore we return.
+ return;
+ }
+ }catch (Exception exc){
+ log.error("Error occurred while searching for stream id. " + exc.getMessage());
+ //We do not want to proceed without an event stream. Therefore we return.
+ return;
+ }
+ }
+
+ try {
+
+ Event usageEvent = new Event(reqStatEventStreamId, System.currentTimeMillis(), new Object[]{"external"}, null,
+ new Object[]{getServerUrl(tenantId),
+ Integer.toString(tenantId),
+ statistics.getCurrentInvocationRequestCount(),
+ statistics.getCurrentInvocationResponseCount(),
+ statistics.getCurrentInvocationFaultCount(),
+ statistics.getCurrentInvocationResponseTime()});
+
+ dataPublisher.publish(usageEvent);
+
+ } catch (Exception e) {
+ log.error("Error occurred while publishing usage event to BAM. " + e.getMessage(), e);
+ throw new UsageException(e.getMessage(), e);
+ }
+
+ }
+
+ /**
+ * @param statistics APIManagerRequestStats which contains usage data
+ * @param tenantId Tenant id of tenant associated with usage stat
+ * @throws Exception UsageException when error in usage stat publishing
+ */
+ public static void publish(APIManagerRequestStats statistics, int tenantId) throws Exception {
+
+ if (dataPublisher == null) {
+ log.info("Creating data publisher for usage data publishing");
+ createDataPublisher();
+
+ //If we cannot create a data publisher we should give up
+ //this means data will not be published
+ if (dataPublisher == null) {
+ return;
+ }
+ }
+
+ if (streamId == null) {
+ try {
+ streamId = dataPublisher.findStream(usageEventStream, usageEventStreamVersion);
+ } catch (NoStreamDefinitionExistException e) {
+ log.info("Defining the event stream because it was not found in BAM");
+ try {
+ defineStream();
+ } catch (Exception ex) {
+ String msg = "Error occurred while defining the event stream for publishing usage data. " + ex.getMessage();
+ log.error(msg);
+ //We do not want to proceed without an event stream. Therefore we return.
+ return;
+ }
+ } catch (Exception exc) {
+ log.error("Error occurred while searching for stream id. " + exc.getMessage());
+ //We do not want to proceed without an event stream. Therefore we return.
+ return;
+ }
+ }
+
+ try {
+ //Get data from API manager request stat object and create event
+ Event usageEvent = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
+ new Object[]{getServerUrl(statistics.getTenantId()),
+ Integer.toString(statistics.getTenantId()),
+ statistics.getMeasurement(),
+ statistics.getValue()});
+ //publish usage to bam
+ dataPublisher.publish(usageEvent);
+
+ } catch (Exception e) {
+ log.error("Error occurred while publishing usage event to BAM. " + e.getMessage(), e);
+ throw new UsageException(e.getMessage(), e);
+ }
+
+ }
+
+ /**
+  * Returns the configuration context service held by this class.
+  *
+  * @return the stored service, or {@code null} when none has been set
+  */
+ public static ConfigurationContextService getConfigurationContextService() {
+     return PublisherUtils.configurationContextService;
+ }
+
+ /**
+ * method to setConfigurationContextService
+ * @param configurationContextService
+ */
+ public static void setConfigurationContextService(ConfigurationContextService configurationContextService) {
+ PublisherUtils.configurationContextService = configurationContextService;
+ }
+
+}