You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/08/20 08:44:48 UTC

[4/5] stratos git commit: Adding Metering and Monitoring Service Implementation

Adding Metering and Monitoring Service Implementation

Conflicts:
	components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
	components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
	components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/54283eda
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/54283eda
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/54283eda

Branch: refs/heads/tenant-isolation
Commit: 54283eda982c5677402b1e4ac895d95c9cccebe7
Parents: 4fdd431
Author: Thanuja <th...@wso2.com>
Authored: Wed Jul 29 18:51:34 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Thu Aug 20 10:58:32 2015 +0530

----------------------------------------------------------------------
 .../client/AutoscalerCloudControllerClient.java |   16 +-
 .../autoscaler/rule/RuleTasksDelegator.java     |   48 +-
 .../publisher/HealthStatisticsNotifier.java     |   10 +-
 .../messaging/topology/TopologyBuilder.java     | 1434 +++++++++---------
 .../impl/CloudControllerServiceImpl.java        |    6 +-
 .../impl/CloudControllerServiceUtil.java        |   15 +-
 .../services/impl/InstanceCreator.java          |   17 +-
 .../publisher/BAMUsageDataPublisher.java        |   44 +-
 .../util/CloudControllerConstants.java          |    4 +
 .../common/constants/StratosConstants.java      |    8 +-
 .../publisher/HealthStatisticsPublisher.java    |    3 +-
 .../publisher/InFlightRequestPublisher.java     |    4 +-
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |    9 +-
 .../cep/WSO2CEPInFlightRequestPublisher.java    |    6 +-
 .../LoadBalancerStatisticsNotifier.java         |    3 +-
 .../publisher/MockHealthStatisticsNotifier.java |    3 +
 .../modules/healthstatspublisher/healthstats.py |    5 +-
 .../HealthStatsEventFormatter.xml               |   30 +
 .../eventformatters/RIFEventFormatter.xml       |   31 +
 .../DASDefaultWSO2EventOutputAdaptor.xml        |   29 +
 .../streamdefinitions/stream-manager-config.xml |  486 +++---
 extensions/das/README.md                        |   10 +
 .../CloudControllerEventReceiver.xml            |   29 +
 .../eventreceivers/HealthStatsEventReceiver.xml |   29 +
 .../eventreceivers/RIFEventReceiver.xml         |   29 +
 .../eventsink/cartridge_agent_health_stats.xml  |   85 ++
 .../artifacts/eventsink/in_flight_requests.xml  |   64 +
 .../org_apache_stratos_cloud_controller.xml     |  211 +++
 .../cartridge_agent_health_stats_1.0.0.json     |   40 +
 .../eventstreams/in_flight_requests_1.0.0.json  |   28 +
 ...g.apache.stratos.cloud.controller_1.0.0.json |  112 ++
 extensions/das/artifacts/sparkscript/CCEvent    |   18 +
 extensions/das/pom.xml                          |   40 +
 extensions/das/spark-udf/pom.xml                |   36 +
 .../das/extension/spark/udf/TimeUDF.java        |   49 +
 extensions/pom.xml                              |    4 +-
 .../src/main/conf/drools/dependent-scaling.drl  |    4 +-
 .../src/main/conf/drools/mincheck.drl           |    5 +-
 .../src/main/conf/drools/scaling.drl            |    7 +-
 39 files changed, 2007 insertions(+), 1004 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
index 65e952f..f19531b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
@@ -84,7 +84,8 @@ public class AutoscalerCloudControllerClient {
     public synchronized MemberContext startInstance(PartitionRef partition,
                                                     String clusterId, String clusterInstanceId,
                                                     String networkPartitionId, boolean isPrimary,
-                                                    int minMemberCount) throws SpawningException {
+                                                    int minMemberCount, String autoscalingReason,
+                                                    long scalingTime) throws SpawningException {
         try {
             if (log.isInfoEnabled()) {
                 log.info(String.format("Trying to spawn an instance via cloud controller: " +
@@ -115,8 +116,18 @@ public class AutoscalerCloudControllerClient {
             minCountProp.setName(StratosConstants.MIN_COUNT);
             minCountProp.setValue(String.valueOf(minMemberCount));
 
+            Property autoscalingReasonProp = new Property();
+            autoscalingReasonProp.setName(StratosConstants.SCALING_REASON);
+            autoscalingReasonProp.setValue(autoscalingReason);
+
+            Property scalingTimeProp = new Property();
+            scalingTimeProp.setName(StratosConstants.SCALING_TIME);
+            scalingTimeProp.setValue(String.valueOf(scalingTime));
+
             memberContextProps.addProperty(isPrimaryProp);
             memberContextProps.addProperty(minCountProp);
+            memberContextProps.addProperty(autoscalingReasonProp);
+            memberContextProps.addProperty(scalingTimeProp);
             instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps));
 
             long startTime = System.currentTimeMillis();
@@ -228,7 +239,8 @@ public class AutoscalerCloudControllerClient {
     public void terminateAllInstances(String clusterId) throws RemoteException,
             CloudControllerServiceInvalidClusterExceptionException {
         if (log.isInfoEnabled()) {
-            log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId));
+            log.info(String.format("Terminating all instances of cluster via cloud controller: " +
+                    "[cluster] %s", clusterId));
         }
         long startTime = System.currentTimeMillis();
         stub.terminateInstances(clusterId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index c2d2be6..5c335b1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -49,7 +49,8 @@ public class RuleTasksDelegator {
 
     private static final Log log = LogFactory.getLog(RuleTasksDelegator.class);
 
-    public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, int timeInterval) {
+    public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative,
+                                                 int timeInterval) {
         double predictedValue;
 //        s = u * t + 0.5 * a * t * t
         if (log.isDebugEnabled()) {
@@ -176,19 +177,21 @@ public class RuleTasksDelegator {
      * @param clusterId                      Cluster id
      * @param clusterInstanceId              Instance id
      * @param isPrimary                      Is a primary member
+     * @param autoscalingReason              scaling reason for member
+     * @param scalingTime                    scaling time
      */
     public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId,
-                              String clusterInstanceId, boolean isPrimary) {
+                              String clusterInstanceId, boolean isPrimary, String autoscalingReason, long scalingTime) {
 
         try {
             String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId();
-			String nwPartitionUuid=null;
-	        NetworkPartition[] networkPartitionList= CloudControllerServiceClient.getInstance().getNetworkPartitions();
-	        for(int i=0;i<networkPartitionList.length;i++){
-		        if(networkPartitionList[i].getUuid().equals(nwPartitionId)){
-			        nwPartitionUuid=networkPartitionList[i].getUuid();
-		        }
-	        }
+            String nwPartitionUuid = null;
+            NetworkPartition[] networkPartitionList = CloudControllerServiceClient.getInstance().getNetworkPartitions();
+            for (int i = 0; i < networkPartitionList.length; i++) {
+                if (networkPartitionList[i].getUuid().equals(nwPartitionId)) {
+                    nwPartitionUuid = networkPartitionList[i].getUuid();
+                }
+            }
 
             // Calculate accumulation of minimum counts of all the partition of current network partition
             int minimumCountOfNetworkPartition;
@@ -204,18 +207,18 @@ public class RuleTasksDelegator {
             MemberContext memberContext =
                     AutoscalerCloudControllerClient.getInstance()
                             .startInstance(clusterMonitorPartitionContext.getPartition(),
-                                           clusterId,
-                                           clusterInstanceId,
-                                           nwPartitionUuid,
-                                           isPrimary,
-                                           minimumCountOfNetworkPartition);
+                                    clusterId,
+                                    clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(),
+                                    isPrimary,
+                                    minimumCountOfNetworkPartition, autoscalingReason, scalingTime);
             if (memberContext != null) {
                 ClusterLevelPartitionContext partitionContext = clusterInstanceContext.
                         getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId());
                 partitionContext.addPendingMember(memberContext);
                 partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId()));
                 if (log.isDebugEnabled()) {
-                    log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(),
+                    log.debug(String.format("Pending member added, [member] %s [partition] %s",
+                            memberContext.getMemberId(),
                             memberContext.getPartition().getId()));
                 }
 
@@ -254,13 +257,14 @@ public class RuleTasksDelegator {
         clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId);
     }
 
-    public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, String instanceId) {
+    public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId,
+                                                         String instanceId) {
         if (log.isDebugEnabled()) {
             log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId);
         }
         //Notify parent for checking scaling dependencies
         ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
-	    clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
+        clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
     }
 
     public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
@@ -277,8 +281,8 @@ public class RuleTasksDelegator {
                 clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
             } else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) {
 
-                log.info(String.format("[scale-down] Moving pending member to termination pending list [member id] %s " +
-                                "[partition] %s [network partition] %s", memberId,
+                log.info(String.format("[scale-down] Moving pending member to termination pending list " +
+                                "[member id] %s " + "[partition] %s [network partition] %s", memberId,
                         clusterMonitorPartitionContext.getPartitionId(),
                         clusterMonitorPartitionContext.getNetworkPartitionId()));
                 clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId);
@@ -289,7 +293,8 @@ public class RuleTasksDelegator {
         }
     }
 
-    public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
+    public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext,
+                                            String memberId) {
         try {
             //calling SM to send the instance notification event.
             if (log.isDebugEnabled()) {
@@ -374,7 +379,8 @@ public class RuleTasksDelegator {
 
                 float memberMemoryConsumptionAverage = memberStatsContext.getMemoryConsumption().getAverage();
                 float memberMemoryConsumptionGredient = memberStatsContext.getMemoryConsumption().getGradient();
-                float memberMemoryConsumptionSecondDerivative = memberStatsContext.getMemoryConsumption().getSecondDerivative();
+                float memberMemoryConsumptionSecondDerivative =
+                        memberStatsContext.getMemoryConsumption().getSecondDerivative();
 
                 double memberPredictedMemoryConsumption = getPredictedValueForNextMinute(memberMemoryConsumptionAverage,
                         memberMemoryConsumptionGredient, memberMemoryConsumptionSecondDerivative, 1);

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
index 74c5156..5ab2ebf 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
@@ -51,7 +51,8 @@ public class HealthStatisticsNotifier implements Runnable {
             File pluginFile = new File(pluginFileName);
             if ((pluginFile != null)
                     && (pluginFile.exists())) {
-                List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, IHealthStatisticsReader.class);
+                List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile,
+                        IHealthStatisticsReader.class);
                 if (!pluginClass.isEmpty()) {
                     try {
                         log.trace("Instantiating new instance of plugin type " + pluginClass);
@@ -63,7 +64,8 @@ public class HealthStatisticsNotifier implements Runnable {
                     }
                 }
             } else {
-                log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : "Doesn't exist"));
+                log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" :
+                        "Doesn't exist"));
             }
         }
         if (this.statsReader == null) {
@@ -95,7 +97,7 @@ public class HealthStatisticsNotifier implements Runnable {
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Publishing memory consumption: %f", stats.getMemoryUsage()));
                         }
-                        statsPublisher.publish(
+                        statsPublisher.publish(System.currentTimeMillis(),
                                 CartridgeAgentConfiguration.getInstance().getClusterId(),
                                 CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
                                 CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
@@ -108,7 +110,7 @@ public class HealthStatisticsNotifier implements Runnable {
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Publishing load average: %f", stats.getProcessorUsage()));
                         }
-                        statsPublisher.publish(
+                        statsPublisher.publish(System.currentTimeMillis(),
                                 CartridgeAgentConfiguration.getInstance().getClusterId(),
                                 CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
                                 CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),