You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/08/20 08:44:48 UTC
[4/5] stratos git commit: Adding Metering and Monitoring Service
Implementation
Adding Metering and Monitoring Service Implementation
Conflicts:
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/54283eda
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/54283eda
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/54283eda
Branch: refs/heads/tenant-isolation
Commit: 54283eda982c5677402b1e4ac895d95c9cccebe7
Parents: 4fdd431
Author: Thanuja <th...@wso2.com>
Authored: Wed Jul 29 18:51:34 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Thu Aug 20 10:58:32 2015 +0530
----------------------------------------------------------------------
.../client/AutoscalerCloudControllerClient.java | 16 +-
.../autoscaler/rule/RuleTasksDelegator.java | 48 +-
.../publisher/HealthStatisticsNotifier.java | 10 +-
.../messaging/topology/TopologyBuilder.java | 1434 +++++++++---------
.../impl/CloudControllerServiceImpl.java | 6 +-
.../impl/CloudControllerServiceUtil.java | 15 +-
.../services/impl/InstanceCreator.java | 17 +-
.../publisher/BAMUsageDataPublisher.java | 44 +-
.../util/CloudControllerConstants.java | 4 +
.../common/constants/StratosConstants.java | 8 +-
.../publisher/HealthStatisticsPublisher.java | 3 +-
.../publisher/InFlightRequestPublisher.java | 4 +-
.../cep/WSO2CEPHealthStatisticsPublisher.java | 9 +-
.../cep/WSO2CEPInFlightRequestPublisher.java | 6 +-
.../LoadBalancerStatisticsNotifier.java | 3 +-
.../publisher/MockHealthStatisticsNotifier.java | 3 +
.../modules/healthstatspublisher/healthstats.py | 5 +-
.../HealthStatsEventFormatter.xml | 30 +
.../eventformatters/RIFEventFormatter.xml | 31 +
.../DASDefaultWSO2EventOutputAdaptor.xml | 29 +
.../streamdefinitions/stream-manager-config.xml | 486 +++---
extensions/das/README.md | 10 +
.../CloudControllerEventReceiver.xml | 29 +
.../eventreceivers/HealthStatsEventReceiver.xml | 29 +
.../eventreceivers/RIFEventReceiver.xml | 29 +
.../eventsink/cartridge_agent_health_stats.xml | 85 ++
.../artifacts/eventsink/in_flight_requests.xml | 64 +
.../org_apache_stratos_cloud_controller.xml | 211 +++
.../cartridge_agent_health_stats_1.0.0.json | 40 +
.../eventstreams/in_flight_requests_1.0.0.json | 28 +
...g.apache.stratos.cloud.controller_1.0.0.json | 112 ++
extensions/das/artifacts/sparkscript/CCEvent | 18 +
extensions/das/pom.xml | 40 +
extensions/das/spark-udf/pom.xml | 36 +
.../das/extension/spark/udf/TimeUDF.java | 49 +
extensions/pom.xml | 4 +-
.../src/main/conf/drools/dependent-scaling.drl | 4 +-
.../src/main/conf/drools/mincheck.drl | 5 +-
.../src/main/conf/drools/scaling.drl | 7 +-
39 files changed, 2007 insertions(+), 1004 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
index 65e952f..f19531b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
@@ -84,7 +84,8 @@ public class AutoscalerCloudControllerClient {
public synchronized MemberContext startInstance(PartitionRef partition,
String clusterId, String clusterInstanceId,
String networkPartitionId, boolean isPrimary,
- int minMemberCount) throws SpawningException {
+ int minMemberCount, String autoscalingReason,
+ long scalingTime) throws SpawningException {
try {
if (log.isInfoEnabled()) {
log.info(String.format("Trying to spawn an instance via cloud controller: " +
@@ -115,8 +116,18 @@ public class AutoscalerCloudControllerClient {
minCountProp.setName(StratosConstants.MIN_COUNT);
minCountProp.setValue(String.valueOf(minMemberCount));
+ Property autoscalingReasonProp = new Property();
+ autoscalingReasonProp.setName(StratosConstants.SCALING_REASON);
+ autoscalingReasonProp.setValue(autoscalingReason);
+
+ Property scalingTimeProp = new Property();
+ scalingTimeProp.setName(StratosConstants.SCALING_TIME);
+ scalingTimeProp.setValue(String.valueOf(scalingTime));
+
memberContextProps.addProperty(isPrimaryProp);
memberContextProps.addProperty(minCountProp);
+ memberContextProps.addProperty(autoscalingReasonProp);
+ memberContextProps.addProperty(scalingTimeProp);
instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps));
long startTime = System.currentTimeMillis();
@@ -228,7 +239,8 @@ public class AutoscalerCloudControllerClient {
public void terminateAllInstances(String clusterId) throws RemoteException,
CloudControllerServiceInvalidClusterExceptionException {
if (log.isInfoEnabled()) {
- log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId));
+ log.info(String.format("Terminating all instances of cluster via cloud controller: " +
+ "[cluster] %s", clusterId));
}
long startTime = System.currentTimeMillis();
stub.terminateInstances(clusterId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index c2d2be6..5c335b1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -49,7 +49,8 @@ public class RuleTasksDelegator {
private static final Log log = LogFactory.getLog(RuleTasksDelegator.class);
- public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, int timeInterval) {
+ public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative,
+ int timeInterval) {
double predictedValue;
// s = u * t + 0.5 * a * t * t
if (log.isDebugEnabled()) {
@@ -176,19 +177,21 @@ public class RuleTasksDelegator {
* @param clusterId Cluster id
* @param clusterInstanceId Instance id
* @param isPrimary Is a primary member
+ * @param autoscalingReason scaling reason for member
+ * @param scalingTime scaling time
*/
public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId,
- String clusterInstanceId, boolean isPrimary) {
+ String clusterInstanceId, boolean isPrimary, String autoscalingReason, long scalingTime) {
try {
String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId();
- String nwPartitionUuid=null;
- NetworkPartition[] networkPartitionList= CloudControllerServiceClient.getInstance().getNetworkPartitions();
- for(int i=0;i<networkPartitionList.length;i++){
- if(networkPartitionList[i].getUuid().equals(nwPartitionId)){
- nwPartitionUuid=networkPartitionList[i].getUuid();
- }
- }
+ String nwPartitionUuid = null;
+ NetworkPartition[] networkPartitionList = CloudControllerServiceClient.getInstance().getNetworkPartitions();
+ for (int i = 0; i < networkPartitionList.length; i++) {
+ if (networkPartitionList[i].getUuid().equals(nwPartitionId)) {
+ nwPartitionUuid = networkPartitionList[i].getUuid();
+ }
+ }
// Calculate accumulation of minimum counts of all the partition of current network partition
int minimumCountOfNetworkPartition;
@@ -204,18 +207,18 @@ public class RuleTasksDelegator {
MemberContext memberContext =
AutoscalerCloudControllerClient.getInstance()
.startInstance(clusterMonitorPartitionContext.getPartition(),
- clusterId,
- clusterInstanceId,
- nwPartitionUuid,
- isPrimary,
- minimumCountOfNetworkPartition);
+ clusterId,
+ clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(),
+ isPrimary,
+ minimumCountOfNetworkPartition, autoscalingReason, scalingTime);
if (memberContext != null) {
ClusterLevelPartitionContext partitionContext = clusterInstanceContext.
getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId());
partitionContext.addPendingMember(memberContext);
partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId()));
if (log.isDebugEnabled()) {
- log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(),
+ log.debug(String.format("Pending member added, [member] %s [partition] %s",
+ memberContext.getMemberId(),
memberContext.getPartition().getId()));
}
@@ -254,13 +257,14 @@ public class RuleTasksDelegator {
clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId);
}
- public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, String instanceId) {
+ public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId,
+ String instanceId) {
if (log.isDebugEnabled()) {
log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId);
}
//Notify parent for checking scaling dependencies
ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
+ clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
}
public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
@@ -277,8 +281,8 @@ public class RuleTasksDelegator {
clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
} else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) {
- log.info(String.format("[scale-down] Moving pending member to termination pending list [member id] %s " +
- "[partition] %s [network partition] %s", memberId,
+ log.info(String.format("[scale-down] Moving pending member to termination pending list " +
+ "[member id] %s " + "[partition] %s [network partition] %s", memberId,
clusterMonitorPartitionContext.getPartitionId(),
clusterMonitorPartitionContext.getNetworkPartitionId()));
clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId);
@@ -289,7 +293,8 @@ public class RuleTasksDelegator {
}
}
- public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
+ public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext,
+ String memberId) {
try {
//calling SM to send the instance notification event.
if (log.isDebugEnabled()) {
@@ -374,7 +379,8 @@ public class RuleTasksDelegator {
float memberMemoryConsumptionAverage = memberStatsContext.getMemoryConsumption().getAverage();
float memberMemoryConsumptionGredient = memberStatsContext.getMemoryConsumption().getGradient();
- float memberMemoryConsumptionSecondDerivative = memberStatsContext.getMemoryConsumption().getSecondDerivative();
+ float memberMemoryConsumptionSecondDerivative =
+ memberStatsContext.getMemoryConsumption().getSecondDerivative();
double memberPredictedMemoryConsumption = getPredictedValueForNextMinute(memberMemoryConsumptionAverage,
memberMemoryConsumptionGredient, memberMemoryConsumptionSecondDerivative, 1);
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
index 74c5156..5ab2ebf 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
@@ -51,7 +51,8 @@ public class HealthStatisticsNotifier implements Runnable {
File pluginFile = new File(pluginFileName);
if ((pluginFile != null)
&& (pluginFile.exists())) {
- List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, IHealthStatisticsReader.class);
+ List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile,
+ IHealthStatisticsReader.class);
if (!pluginClass.isEmpty()) {
try {
log.trace("Instantiating new instance of plugin type " + pluginClass);
@@ -63,7 +64,8 @@ public class HealthStatisticsNotifier implements Runnable {
}
}
} else {
- log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : "Doesn't exist"));
+ log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" :
+ "Doesn't exist"));
}
}
if (this.statsReader == null) {
@@ -95,7 +97,7 @@ public class HealthStatisticsNotifier implements Runnable {
if (log.isDebugEnabled()) {
log.debug(String.format("Publishing memory consumption: %f", stats.getMemoryUsage()));
}
- statsPublisher.publish(
+ statsPublisher.publish(System.currentTimeMillis(),
CartridgeAgentConfiguration.getInstance().getClusterId(),
CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
@@ -108,7 +110,7 @@ public class HealthStatisticsNotifier implements Runnable {
if (log.isDebugEnabled()) {
log.debug(String.format("Publishing load average: %f", stats.getProcessorUsage()));
}
- statsPublisher.publish(
+ statsPublisher.publish(System.currentTimeMillis(),
CartridgeAgentConfiguration.getInstance().getClusterId(),
CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),