You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/11/23 16:04:15 UTC
[1/6] stratos git commit: Fixing jira issue STRATOS-1629 - DAS
publishers created each time when an event is published from stratos to DAS
Repository: stratos
Updated Branches:
refs/heads/stratos-4.1.x 6496b05a3 -> 509bd6bdc
Fixing jira issue STRATOS-1629 - DAS publishers created each time when an event is published from stratos to DAS
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/65aee8a8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/65aee8a8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/65aee8a8
Branch: refs/heads/stratos-4.1.x
Commit: 65aee8a81ffec254486e88b5c53eefa58f06dea8
Parents: af13aeb
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 11:27:12 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 11:28:38 2015 +0530
----------------------------------------------------------------------
.../monitor/cluster/ClusterMonitor.java | 21 +++++++++---
.../messaging/topology/TopologyBuilder.java | 34 +++-----------------
.../src/main/conf/drools/dependent-scaling.drl | 8 ++---
.../src/main/conf/drools/mincheck.drl | 9 ++----
.../src/main/conf/drools/scaling.drl | 8 ++---
.../src/test/resources/common/scaling.drl | 8 ++---
6 files changed, 31 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index 28c2d95..5976279 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -41,6 +41,8 @@ import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
import org.apache.stratos.autoscaler.monitor.events.ScalingUpBeyondMaxEvent;
import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
import org.apache.stratos.autoscaler.rule.RuleTasksDelegator;
+import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
+import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
@@ -51,6 +53,7 @@ import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
import org.apache.stratos.common.Properties;
import org.apache.stratos.common.client.CloudControllerServiceClient;
import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.domain.application.ApplicationStatus;
import org.apache.stratos.messaging.domain.application.GroupStatus;
@@ -98,7 +101,8 @@ public class ClusterMonitor extends Monitor {
private boolean hasScalingDependents;
private boolean groupScalingEnabledSubtree;
private String deploymentPolicyId;
-
+ private ScalingDecisionPublisher scalingDecisionPublisher =
+ AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
public ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree,
String deploymentPolicyId) {
@@ -337,6 +341,9 @@ public class ClusterMonitor extends Monitor {
instanceContext.getMinCheckKnowledgeSession().setGlobal("algorithmName",
paritionAlgo);
+ instanceContext.getMinCheckKnowledgeSession().setGlobal("scalingDecisionPublisher",
+ scalingDecisionPublisher);
+
if (log.isDebugEnabled()) {
log.debug(String.format("Running minimum check for [cluster instance] %s, " +
"[cluster id] %s",
@@ -350,7 +357,7 @@ public class ClusterMonitor extends Monitor {
if (log.isDebugEnabled()) {
log.debug(String.format("Running maximum check for [cluster instance] %s, " +
- "[cluster id] %s", instanceContext.getId(), clusterId));
+ "[cluster id] %s", instanceContext.getId(), clusterId));
}
instanceContext.setMaxCheckFactHandle(evaluate(instanceContext.
getMaxCheckKnowledgeSession(),
@@ -376,6 +383,8 @@ public class ClusterMonitor extends Monitor {
clusterContext.getAutoscalePolicy());
instanceContext.getScaleCheckKnowledgeSession().setGlobal("arspiReset",
averageRequestServedPerInstanceReset);
+ instanceContext.getScaleCheckKnowledgeSession().setGlobal("scalingDecisionPublisher",
+ scalingDecisionPublisher);
if (log.isDebugEnabled()) {
log.debug("Running scale check, [Is rif Reset] " + rifReset + ", " +
"[Is memoryConsumption Reset] " + memoryConsumptionReset + ", " +
@@ -552,8 +561,12 @@ public class ClusterMonitor extends Monitor {
clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount);
clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", roundedRequiredInstanceCount);
- clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", clusterInstanceContext.getPartitionAlgorithm());
+ clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount",
+ roundedRequiredInstanceCount);
+ clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName",
+ clusterInstanceContext.getPartitionAlgorithm());
+ clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("scalingDecisionPublisher",
+ scalingDecisionPublisher);
if (log.isDebugEnabled()) {
log.debug(String.format("Running dependent scale check for [cluster instance] %s, " +
http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index da38337..90fa8bd 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -57,6 +57,11 @@ import java.util.*;
*/
public class TopologyBuilder {
private static final Log log = LogFactory.getLog(TopologyBuilder.class);
+ private static MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory.
+ createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS);
+
+ private static MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+ createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
public static void handleServiceCreated(List<Cartridge> cartridgeList) throws RegistryException {
Service service;
@@ -352,9 +357,6 @@ public class TopologyBuilder {
//member created time
Long timestamp = System.currentTimeMillis();
//publishing member status to DAS
- MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
- createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
-
if (memStatusPublisher.isEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Publishing Member Status to DAS");
@@ -363,8 +365,6 @@ public class TopologyBuilder {
memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
memberContext.getNetworkPartitionId(), memberContext.getPartition().getId(),
memberContext.getMemberId(), MemberStatus.Created.toString());
- } else {
- log.warn("Member Status Publisher is not enabled");
}
} finally {
@@ -435,11 +435,6 @@ public class TopologyBuilder {
Long timestamp = System.currentTimeMillis();
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
//publishing member information and status to DAS
- MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory.
- createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS);
-
- MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
- createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
if (memInfoPublisher.isEnabled()) {
if (log.isInfoEnabled()) {
@@ -460,8 +455,6 @@ public class TopologyBuilder {
memberContext.getClusterInstanceId(), memberContext.getCartridgeType(),
memberContext.getNetworkPartitionId(), memberContext.getPartition().getId(),
memberContext.getMemberId(), MemberStatus.Initialized.toString());
- } else {
- log.warn("Member status publisher is not enabled");
}
}
} finally {
@@ -519,9 +512,6 @@ public class TopologyBuilder {
//memberStartedEvent.
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
//publishing member status to DAS
- MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
- createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
-
if (memStatusPublisher.isEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Publishing Member Status to DAS");
@@ -533,8 +523,6 @@ public class TopologyBuilder {
instanceStartedEvent.getNetworkPartitionId(),
instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId(),
MemberStatus.Starting.toString());
- } else {
- log.warn("Member Status Publisher is not enabled");
}
}
} finally {
@@ -632,8 +620,6 @@ public class TopologyBuilder {
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
//publishing member status to DAS
- MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
- createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
if (memStatusPublisher.isEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Publishing Member Status to DAS");
@@ -643,8 +629,6 @@ public class TopologyBuilder {
memberActivatedEvent.getClusterInstanceId(), memberActivatedEvent.getServiceName(),
memberActivatedEvent.getNetworkPartitionId(), memberActivatedEvent.getPartitionId(),
memberActivatedEvent.getMemberId(), MemberStatus.Active.toString());
- } else {
- log.warn("Member Status Publisher is not enabled");
}
}
} finally {
@@ -700,8 +684,6 @@ public class TopologyBuilder {
}
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
//publishing member status to DAS.
- MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
- createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
if (memStatusPublisher.isEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Publishing Member Status to DAS");
@@ -713,8 +695,6 @@ public class TopologyBuilder {
instanceReadyToShutdownEvent.getNetworkPartitionId(),
instanceReadyToShutdownEvent.getPartitionId(), instanceReadyToShutdownEvent.getMemberId(),
MemberStatus.ReadyToShutDown.toString());
- } else {
- log.warn("Member Status Publisher is not enabled");
}
//termination of particular instance will be handled by autoscaler
}
@@ -814,8 +794,6 @@ public class TopologyBuilder {
partitionId, properties, groupAlias);
//publishing member status to DAS.
- MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
- createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
if (memStatusPublisher.isEnabled()) {
if (log.isDebugEnabled()) {
log.debug("Publishing Member Status to DAS");
@@ -823,8 +801,6 @@ public class TopologyBuilder {
memStatusPublisher.publish(timestamp, applicationId, member.getClusterId(), clusterAlias,
member.getClusterInstanceId(), member.getServiceName(), member.getNetworkPartitionId(),
member.getPartitionId(), member.getMemberId(), MemberStatus.Terminated.toString());
- } else {
- log.warn("Member Status Publisher is not enabled");
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
index c4a1141..72c5536 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
@@ -28,15 +28,13 @@ import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadThresholds;
import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption;
import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage;
import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
global org.apache.stratos.autoscaler.rule.RuleLog log;
global java.lang.String clusterId;
global Integer roundedRequiredInstanceCount;
global org.apache.stratos.autoscaler.rule.RuleTasksDelegator delegator;
global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
rule "Dependent Scaling Rule"
dialect "mvel"
@@ -82,7 +80,7 @@ dialect "mvel"
String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
long scalingTime = System.currentTimeMillis();
String scalingReason = "DEPENDENCY";
- ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
if (scalingDecisionPublisher.isEnabled()) {
log.debug("Publishing scaling decision to DAS");
scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -90,8 +88,6 @@ dialect "mvel"
0, 0, 0, 0, 0, 0,0, 0, 0,
additionalInstances + nonTerminatedMembers,
0, additionalInstances, scalingReason);
- } else {
- log.warn("Scaling decision publisher is not enabled");
}
while(count != additionalInstances && partitionsAvailable) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
index 5ce6d21..250676d 100755
--- a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
@@ -38,9 +38,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition;
import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
global org.apache.stratos.autoscaler.rule.RuleLog log;
global org.apache.stratos.autoscaler.pojo.policy.PolicyManager manager;
@@ -49,6 +46,7 @@ global org.apache.stratos.autoscaler.rule.RuleTasksDelegator delegator;
global java.util.Map partitionCtxts;
global java.lang.String clusterId;
global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
rule "Minimum Rule"
dialect "mvel"
@@ -74,7 +72,7 @@ dialect "mvel"
String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
long scalingTime = System.currentTimeMillis();
String scalingReason = "MIN";
- ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
if (scalingDecisionPublisher.isEnabled()) {
log.debug("Publishing scaling decision to DAS");
scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -83,9 +81,8 @@ dialect "mvel"
0, 0, 0, 0, 0, 0,0, 0, 0,
clusterInstanceContext.getMinInstanceCount(), 0,
additionalInstances, scalingReason);
- } else {
- log.warn("Scaling decision publisher is not enabled");
}
+
while(count != additionalInstances && partitionsAvailable){
ClusterLevelPartitionContext partitionContext = (ClusterLevelPartitionContext)partitionAlgorithm.getNextScaleUpPartitionContext(clusterInstanceContext.getPartitionCtxtsAsAnArray());
http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
index 912685c..f367652 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
@@ -40,9 +40,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition;
import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage
import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption
@@ -56,6 +53,7 @@ global java.lang.Boolean mcReset;
global java.lang.Boolean laReset;
global java.lang.Boolean arspiReset;
global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
rule "Scaling Rule"
dialect "mvel"
@@ -168,7 +166,7 @@ dialect "mvel"
String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
long scalingTime = System.currentTimeMillis();
- ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
if (scalingDecisionPublisher.isEnabled()) {
log.debug("Publishing scaling decision to DAS");
scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -178,8 +176,6 @@ dialect "mvel"
laPredictedValue, laThreshold, numberOfInstancesReuquiredBasedOnLoadAverage,
numberOfRequiredInstances, activeInstancesCount,
additionalInstances, scalingReason);
- } else {
- log.warn("Scaling decision publisher is not enabled");
}
while(count != additionalInstances && partitionsAvailable){
http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl b/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
index 4fc73fd..c4af71c 100644
--- a/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
+++ b/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl
@@ -40,9 +40,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition;
import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
import java.util.UUID;
-import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher;
-import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage
import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption
@@ -56,6 +53,7 @@ global java.lang.Boolean mcReset;
global java.lang.Boolean laReset;
global java.lang.Boolean arspiReset;
global java.lang.String algorithmName;
+global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher;
rule "Scaling Rule"
dialect "mvel"
@@ -169,7 +167,7 @@ dialect "mvel"
String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString();
long scalingTime = System.currentTimeMillis();
- ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS);
+
if (scalingDecisionPublisher.isEnabled()) {
log.debug("Publishing scaling decision to DAS");
scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId,
@@ -179,8 +177,6 @@ dialect "mvel"
laPredictedValue, laThreshold, numberOfInstancesReuquiredBasedOnLoadAverage,
numberOfRequiredInstances, activeInstancesCount,
additionalInstances, scalingReason);
- } else {
- log.warn("Scaling decision publisher is not enabled");
}
while(count != additionalInstances && partitionsAvailable){
[3/6] stratos git commit: Simplyfied isPublisherEnabled() logic
Posted by ga...@apache.org.
Simplyfied isPublisherEnabled() logic
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f3a809b7
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f3a809b7
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f3a809b7
Branch: refs/heads/stratos-4.1.x
Commit: f3a809b768cb4275e30d0223b2cd5c374a76f02e
Parents: 65aee8a
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 12:31:03 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 12:31:03 2015 +0530
----------------------------------------------------------------------
.../statistics/publisher/ThriftStatisticsPublisher.java | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/f3a809b7/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 16dba16..4552f92 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -63,14 +63,12 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
}
private boolean isPublisherEnabled() {
- boolean publisherEnabled = false;
for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
- publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
- if (publisherEnabled) {
- break;
+ if (thriftClientInfo.isStatsPublisherEnabled()) {
+ return true;
}
}
- return publisherEnabled;
+ return false;
}
private void init() {
[5/6] stratos git commit: Moving STATS_PUBLISHER_THREAD_POOL_SIZE to
CloudControllerConstants to have common config for member status and info
publishers
Posted by ga...@apache.org.
Moving STATS_PUBLISHER_THREAD_POOL_SIZE to CloudControllerConstants to have common config for member status and info publishers
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/05d1c187
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/05d1c187
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/05d1c187
Branch: refs/heads/stratos-4.1.x
Commit: 05d1c187e1aeb3efeef4c1d4f6282ea41de51d63
Parents: 609bc9d
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 19:11:33 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 19:11:33 2015 +0530
----------------------------------------------------------------------
.../statistics/publisher/DASMemberInformationPublisher.java | 3 +--
.../controller/statistics/publisher/DASMemberStatusPublisher.java | 3 +--
.../stratos/cloud/controller/util/CloudControllerConstants.java | 1 +
3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/05d1c187/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index 621f9e2..8107e1b 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -47,14 +47,13 @@ public class DASMemberInformationPublisher extends MemberInformationPublisher {
private static final String DATA_STREAM_NAME = "member_info";
private static final String VERSION = "1.0.0";
private static final String DAS_THRIFT_CLIENT_NAME = "das";
- private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
private static final String VALUE_NOT_FOUND = "Value Not Found";
private ExecutorService executorService;
private DASMemberInformationPublisher() {
super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
- STATS_PUBLISHER_THREAD_POOL_SIZE);
+ CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE);
}
public static DASMemberInformationPublisher getInstance() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/05d1c187/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 7a291ab..6bb6251 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -41,13 +41,12 @@ public class DASMemberStatusPublisher extends MemberStatusPublisher {
private static final String DATA_STREAM_NAME = "member_lifecycle";
private static final String VERSION = "1.0.0";
private static final String DAS_THRIFT_CLIENT_NAME = "das";
- private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
private ExecutorService executorService;
private DASMemberStatusPublisher() {
super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
- STATS_PUBLISHER_THREAD_POOL_SIZE);
+ CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE);
}
public static DASMemberStatusPublisher getInstance() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/05d1c187/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index ccd8d34..29facf0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -163,6 +163,7 @@ public final class CloudControllerConstants {
public static final String SCALING_DECISION_ID_COL = "scaling_decision_id";
public static final String STATS_PUBLISHER_THREAD_POOL_ID = "cloud.controller.stats.publisher.thread.pool";
+ public static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
/**
* Properties
[4/6] stratos git commit: Changing publisher classes hierarchy
Posted by ga...@apache.org.
Changing publisher classes hierarchy
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/609bc9d9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/609bc9d9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/609bc9d9
Branch: refs/heads/stratos-4.1.x
Commit: 609bc9d9013fc3fb13d5b4a467528eb70540dccf
Parents: f3a809b
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 18:24:30 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 18:24:30 2015 +0530
----------------------------------------------------------------------
.../publisher/DASScalingDecisionPublisher.java | 5 ++--
.../publisher/ScalingDecisionPublisher.java | 24 ++++++++++++--------
.../DASMemberInformationPublisher.java | 8 +++----
.../publisher/DASMemberStatusPublisher.java | 8 +++----
.../publisher/MemberInformationPublisher.java | 12 +++++++---
.../publisher/MemberStatusPublisher.java | 16 +++++++++----
...InvalidStatisticsPublisherTypeException.java | 2 +-
.../publisher/HealthStatisticsPublisher.java | 12 +++++++---
.../publisher/InFlightRequestPublisher.java | 11 +++++++--
.../publisher/ThriftStatisticsPublisher.java | 4 +---
.../cep/WSO2CEPHealthStatisticsPublisher.java | 5 ++--
.../cep/WSO2CEPInFlightRequestPublisher.java | 19 ++++++++--------
12 files changed, 76 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index 097c568..52857d4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.autoscaler.statistics.publisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
/**
* MemberInfoPublisher to publish member information/metadata to DAS.
*/
-public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher {
+public class DASScalingDecisionPublisher extends ScalingDecisionPublisher {
private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class);
private static volatile DASScalingDecisionPublisher dasScalingDecisionPublisher;
@@ -165,7 +164,7 @@ public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher imple
payload.add(activeInstanceCount);
payload.add(additionalInstanceCount);
payload.add(scalingReason);
- DASScalingDecisionPublisher.super.publish(payload.toArray());
+ publish(payload.toArray());
}
};
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
index f7b0087..fe791f9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
@@ -19,12 +19,18 @@
package org.apache.stratos.autoscaler.statistics.publisher;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
/**
* Scaling Decision Publisher interface.
*/
-public interface ScalingDecisionPublisher extends StatisticsPublisher {
+public abstract class ScalingDecisionPublisher extends ThriftStatisticsPublisher {
+
+ public ScalingDecisionPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+ super(streamDefinition, thriftClientName);
+ }
+
/**
* Publishing scaling decision to DAS.
*
@@ -47,11 +53,11 @@ public interface ScalingDecisionPublisher extends StatisticsPublisher {
* @param additionalInstanceCount Additional Instance Needed
* @param scalingReason Scaling Reason
*/
- public void publish(Long timestamp, String scalingDecisionId, String clusterId,
- int minInstanceCount, int maxInstanceCount,
- int rifPredicted, int rifThreshold, int rifRequiredInstances,
- int mcPredicted, int mcThreshold, int mcRequiredInstances,
- int laPredicted, int laThreshold, int laRequiredInstance,
- int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
- String scalingReason);
+ public abstract void publish(Long timestamp, String scalingDecisionId, String clusterId,
+ int minInstanceCount, int maxInstanceCount,
+ int rifPredicted, int rifThreshold, int rifRequiredInstances,
+ int mcPredicted, int mcThreshold, int mcRequiredInstances,
+ int laPredicted, int laThreshold, int laRequiredInstance,
+ int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
+ String scalingReason);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index 4ab65e1..621f9e2 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -27,7 +27,6 @@ import org.apache.stratos.cloud.controller.domain.IaasProvider;
import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
import org.apache.stratos.cloud.controller.domain.MemberContext;
import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
@@ -41,7 +40,7 @@ import java.util.concurrent.ExecutorService;
/**
* MemberInfoPublisher to publish member information/metadata to DAS.
*/
-public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher {
+public class DASMemberInformationPublisher extends MemberInformationPublisher {
private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class);
private static volatile DASMemberInformationPublisher dasMemberInformationPublisher;
@@ -54,7 +53,8 @@ public class DASMemberInformationPublisher extends ThriftStatisticsPublisher imp
private DASMemberInformationPublisher() {
super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
- executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+ executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+ STATS_PUBLISHER_THREAD_POOL_SIZE);
}
public static DASMemberInformationPublisher getInstance() {
@@ -158,7 +158,7 @@ public class DASMemberInformationPublisher extends ThriftStatisticsPublisher imp
metadata.getOperatingSystemName(), metadata.getOperatingSystemVersion(),
metadata.getOperatingSystemArchitecture(), metadata.isOperatingSystem64bit()));
}
- DASMemberInformationPublisher.super.publish(payload.toArray());
+ publish(payload.toArray());
}
}
};
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 332bbba..7a291ab 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.cloud.controller.statistics.publisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
/**
* Publishing member status to DAS.
*/
-public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher {
+public class DASMemberStatusPublisher extends MemberStatusPublisher {
private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class);
private static volatile DASMemberStatusPublisher dasMemberStatusPublisher;
@@ -47,7 +46,8 @@ public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implemen
private DASMemberStatusPublisher() {
super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
- executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+ executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+ STATS_PUBLISHER_THREAD_POOL_SIZE);
}
public static DASMemberStatusPublisher getInstance() {
@@ -131,7 +131,7 @@ public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implemen
payload.add(partitionId);
payload.add(memberId);
payload.add(status);
- DASMemberStatusPublisher.super.publish(payload.toArray());
+ publish(payload.toArray());
}
};
executorService.execute(publisher);
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
index ffe0380..fda1b41 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
@@ -20,12 +20,18 @@
package org.apache.stratos.cloud.controller.statistics.publisher;
import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
/**
* Member Information Publisher interface.
*/
-public interface MemberInformationPublisher extends StatisticsPublisher {
+public abstract class MemberInformationPublisher extends ThriftStatisticsPublisher {
+
+ public MemberInformationPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+ super(streamDefinition, thriftClientName);
+ }
+
/**
* Publishing member information.
*
@@ -33,6 +39,6 @@ public interface MemberInformationPublisher extends StatisticsPublisher {
* @param scalingDecisionId Scaling Decision Id
* @param metadata InstanceMetadata
*/
- public void publish(String memberId, String scalingDecisionId, InstanceMetadata metadata);
+ public abstract void publish(String memberId, String scalingDecisionId, InstanceMetadata metadata);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
index fad1006..4fa23b1 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
@@ -19,12 +19,18 @@
package org.apache.stratos.cloud.controller.statistics.publisher;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
/**
* Member Status Publisher Interface.
*/
-public interface MemberStatusPublisher extends StatisticsPublisher {
+public abstract class MemberStatusPublisher extends ThriftStatisticsPublisher {
+
+ public MemberStatusPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+ super(streamDefinition, thriftClientName);
+ }
+
/**
* Publishing member status.
*
@@ -39,7 +45,7 @@ public interface MemberStatusPublisher extends StatisticsPublisher {
* @param memberId Member Id
* @param status Member Status
*/
- void publish(Long timestamp, String applicationId, String clusterId,
- String clusterAlias, String clusterInstanceId, String serviceName,
- String networkPartitionId, String partitionId, String memberId, String status);
+ public abstract void publish(Long timestamp, String applicationId, String clusterId,
+ String clusterAlias, String clusterInstanceId, String serviceName,
+ String networkPartitionId, String partitionId, String memberId, String status);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
index 09efa1e..4609c9f 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
@@ -22,7 +22,7 @@ package org.apache.stratos.common.exception;
/**
* This exception will be thrown when trying to create a publisher with invalid statistics publisher type.
*/
-public class InvalidStatisticsPublisherTypeException extends Exception {
+public class InvalidStatisticsPublisherTypeException extends RuntimeException {
public InvalidStatisticsPublisherTypeException(String message) {
super(message);
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index dd7ddd4..20f0ffe 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -19,10 +19,16 @@
package org.apache.stratos.common.statistics.publisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
/**
* Health statistics publisher interface.
*/
-public interface HealthStatisticsPublisher extends StatisticsPublisher {
+public abstract class HealthStatisticsPublisher extends ThriftStatisticsPublisher {
+
+ public HealthStatisticsPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+ super(streamDefinition, thriftClientName);
+ }
/**
* Publish health statistics to complex event processor.
@@ -35,6 +41,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
* @param health Health type: memory_consumption | load_average
* @param value Health type value
*/
- void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
- String memberId, String partitionId, String health, double value);
+ public abstract void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+ String memberId, String partitionId, String health, double value);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index 289be8b..af46ed1 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -19,10 +19,16 @@
package org.apache.stratos.common.statistics.publisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
/**
* In-flight request publisher interface.
*/
-public interface InFlightRequestPublisher extends StatisticsPublisher {
+public abstract class InFlightRequestPublisher extends ThriftStatisticsPublisher {
+
+ public InFlightRequestPublisher(StreamDefinition streamDefinition, String thriftClientName) {
+ super(streamDefinition, thriftClientName);
+ }
/**
* Publish in-flight request count.
@@ -32,5 +38,6 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
* @param networkPartitionId Network partition id of the cluster
* @param inFlightRequestCount In-flight request count of the cluster
*/
- void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount);
+ public abstract void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+ int inFlightRequestCount);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 4552f92..95c0478 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -77,9 +77,7 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
loadBalancingDataPublisher = new LoadBalancingDataPublisher(getReceiverGroups());
//adding stream definition
- if (!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
- loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
- }
+ loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
}
private ArrayList<ReceiverGroup> getReceiverGroups() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index d025c33..03222ec 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -33,7 +32,7 @@ import java.util.List;
/**
* Health statistics publisher for publishing statistics to WSO2 CEP.
*/
-public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
+public class WSO2CEPHealthStatisticsPublisher extends HealthStatisticsPublisher {
private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
private static volatile WSO2CEPHealthStatisticsPublisher wso2CEPHealthStatisticsPublisher;
@@ -109,6 +108,6 @@ public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher
payload.add(health);
payload.add(value);
- super.publish(payload.toArray());
+ publish(payload.toArray());
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 8c9189b..862a49d 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
-import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -36,7 +35,7 @@ import java.util.List;
* In-flight request count:
* Number of requests being served at a given moment could be identified as in-flight request count.
*/
-public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
+public class WSO2CEPInFlightRequestPublisher extends InFlightRequestPublisher {
private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
private static volatile WSO2CEPInFlightRequestPublisher wso2CEPInFlightRequestPublisher;
private static final String DATA_STREAM_NAME = "in_flight_requests";
@@ -47,11 +46,11 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
}
- public static WSO2CEPInFlightRequestPublisher getInstance() {
+ public static WSO2CEPInFlightRequestPublisher getInstance() {
if (wso2CEPInFlightRequestPublisher == null) {
- synchronized ( WSO2CEPInFlightRequestPublisher.class) {
+ synchronized (WSO2CEPInFlightRequestPublisher.class) {
if (wso2CEPInFlightRequestPublisher == null) {
- wso2CEPInFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher();
+ wso2CEPInFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher();
}
}
}
@@ -81,10 +80,10 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
/**
* Publish in-flight request count of a cluster.
*
- * @param clusterId
- * @param clusterInstanceId
- * @param networkPartitionId
- * @param inFlightRequestCount
+ * @param clusterId Cluster id
+ * @param clusterInstanceId Cluster instance id
+ * @param networkPartitionId Cluster's network partition id
+ * @param inFlightRequestCount Cluster's in-flight-request count
*/
@Override
public void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
@@ -102,6 +101,6 @@ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher i
payload.add(networkPartitionId);
payload.add((double) inFlightRequestCount);
- super.publish(payload.toArray());
+ publish(payload.toArray());
}
}
[6/6] stratos git commit: Merge branch 'stratos-4.1.x' of
https://github.com/Thanu/stratos into stratos-4.1.x
Posted by ga...@apache.org.
Merge branch 'stratos-4.1.x' of https://github.com/Thanu/stratos into stratos-4.1.x
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/509bd6bd
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/509bd6bd
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/509bd6bd
Branch: refs/heads/stratos-4.1.x
Commit: 509bd6bdc4b7e6fd7c8a0e0fc3ec3b637e68d514
Parents: 6496b05 05d1c18
Author: gayangunarathne <ga...@wso2.com>
Authored: Mon Nov 23 20:08:45 2015 +0530
Committer: gayangunarathne <ga...@wso2.com>
Committed: Mon Nov 23 20:08:45 2015 +0530
----------------------------------------------------------------------
.../monitor/cluster/ClusterMonitor.java | 21 ++++-
.../publisher/AutoscalerPublisherFactory.java | 5 +-
.../publisher/DASScalingDecisionPublisher.java | 25 ++++--
.../publisher/ScalingDecisionPublisher.java | 24 ++++--
.../autoscaler/util/AutoscalerConstants.java | 1 +
.../messaging/topology/TopologyBuilder.java | 34 ++------
.../CloudControllerPublisherFactory.java | 9 +-
.../DASMemberInformationPublisher.java | 23 ++++--
.../publisher/DASMemberStatusPublisher.java | 22 +++--
.../publisher/MemberInformationPublisher.java | 12 ++-
.../publisher/MemberStatusPublisher.java | 16 ++--
.../util/CloudControllerConstants.java | 2 +
...InvalidStatisticsPublisherTypeException.java | 30 +++++++
.../publisher/HealthStatisticsPublisher.java | 12 ++-
.../HealthStatisticsPublisherFactory.java | 5 +-
.../publisher/InFlightRequestPublisher.java | 11 ++-
.../InFlightRequestPublisherFactory.java | 5 +-
.../publisher/ThriftStatisticsPublisher.java | 86 ++++++++++----------
.../cep/WSO2CEPHealthStatisticsPublisher.java | 20 +++--
.../cep/WSO2CEPInFlightRequestPublisher.java | 28 +++++--
.../src/main/conf/drools/dependent-scaling.drl | 8 +-
.../src/main/conf/drools/mincheck.drl | 9 +-
.../src/main/conf/drools/scaling.drl | 8 +-
.../src/test/resources/common/scaling.drl | 8 +-
24 files changed, 259 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
[2/6] stratos git commit: Refactroing thrift publisher classes
Posted by ga...@apache.org.
Refactroing thrift publisher classes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/af13aeba
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/af13aeba
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/af13aeba
Branch: refs/heads/stratos-4.1.x
Commit: af13aebae4b78c9ba4df0286eb081d48794fc427
Parents: 962ce94
Author: Thanuja <th...@wso2.com>
Authored: Mon Nov 23 11:23:15 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Mon Nov 23 11:28:38 2015 +0530
----------------------------------------------------------------------
.../publisher/AutoscalerPublisherFactory.java | 5 +-
.../publisher/DASScalingDecisionPublisher.java | 20 ++++-
.../autoscaler/util/AutoscalerConstants.java | 1 +
.../CloudControllerPublisherFactory.java | 9 +-
.../DASMemberInformationPublisher.java | 18 +++-
.../publisher/DASMemberStatusPublisher.java | 17 +++-
.../util/CloudControllerConstants.java | 1 +
...InvalidStatisticsPublisherTypeException.java | 30 +++++++
.../HealthStatisticsPublisherFactory.java | 5 +-
.../InFlightRequestPublisherFactory.java | 5 +-
.../publisher/ThriftStatisticsPublisher.java | 92 ++++++++++----------
.../cep/WSO2CEPHealthStatisticsPublisher.java | 15 +++-
.../cep/WSO2CEPInFlightRequestPublisher.java | 15 +++-
13 files changed, 165 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
index d057108..8c688ba 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
@@ -19,6 +19,7 @@
package org.apache.stratos.autoscaler.statistics.publisher;
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
/**
@@ -29,9 +30,9 @@ public class AutoscalerPublisherFactory {
public static ScalingDecisionPublisher createScalingDecisionPublisher(StatisticsPublisherType type) {
if (type == StatisticsPublisherType.WSO2DAS) {
- return new DASScalingDecisionPublisher();
+ return DASScalingDecisionPublisher.getInstance();
} else {
- throw new RuntimeException("Unknown statistics publisher type");
+ throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index a907043..097c568 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -38,22 +38,36 @@ import java.util.concurrent.ExecutorService;
public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher {
private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class);
+ private static volatile DASScalingDecisionPublisher dasScalingDecisionPublisher;
private static final String DATA_STREAM_NAME = "scaling_decision";
private static final String VERSION = "1.0.0";
private static final String DAS_THRIFT_CLIENT_NAME = "das";
+ private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
private ExecutorService executorService;
public DASScalingDecisionPublisher() {
super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
- executorService = StratosThreadPool.getExecutorService("autoscaler.stats.publisher.thread.pool", 10);
+ executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+ STATS_PUBLISHER_THREAD_POOL_SIZE);
+ }
+
+ public static DASScalingDecisionPublisher getInstance() {
+ if (dasScalingDecisionPublisher == null) {
+ synchronized (DASScalingDecisionPublisher.class) {
+ if (dasScalingDecisionPublisher == null) {
+ dasScalingDecisionPublisher = new DASScalingDecisionPublisher();
+ }
+ }
+ }
+ return dasScalingDecisionPublisher;
}
private static StreamDefinition createStreamDefinition() {
try {
// Create stream definition
StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
- streamDefinition.setNickName("Member Information");
- streamDefinition.setDescription("Member Information");
+ streamDefinition.setNickName("Scaling Decision");
+ streamDefinition.setDescription("Scaling Decision");
List<Attribute> payloadData = new ArrayList<Attribute>();
// Set payload definition
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index 997ab0c..ef12983 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -72,6 +72,7 @@ public final class AutoscalerConstants {
public static final String PAYLOAD_DEPLOYMENT = "default";
public static final String MONITOR_THREAD_POOL_ID = "monitor.thread.pool";
+ public static final String STATS_PUBLISHER_THREAD_POOL_ID = "autoscaler.stats.publisher.thread.pool";
public static final String MONITOR_THREAD_POOL_SIZE = "monitor.thread.pool.size";
public static final String CLUSTER_MONITOR_SCHEDULER_ID = "cluster.monitor.scheduler";
public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
index db68396..87d7ab9 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
@@ -19,6 +19,7 @@
package org.apache.stratos.cloud.controller.statistics.publisher;
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
/**
@@ -33,9 +34,9 @@ public class CloudControllerPublisherFactory {
*/
public static MemberInformationPublisher createMemberInformationPublisher(StatisticsPublisherType type) {
if (type == StatisticsPublisherType.WSO2DAS) {
- return new DASMemberInformationPublisher();
+ return DASMemberInformationPublisher.getInstance();
} else {
- throw new RuntimeException("Unknown statistics publisher type");
+ throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
}
}
@@ -47,9 +48,9 @@ public class CloudControllerPublisherFactory {
*/
public static MemberStatusPublisher createMemberStatusPublisher(StatisticsPublisherType type) {
if (type == StatisticsPublisherType.WSO2DAS) {
- return new DASMemberStatusPublisher();
+ return DASMemberStatusPublisher.getInstance();
} else {
- throw new RuntimeException("Unknown statistics publisher type");
+ throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index d0dcc49..4ab65e1 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -44,16 +44,28 @@ import java.util.concurrent.ExecutorService;
public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher {
private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class);
-
+ private static volatile DASMemberInformationPublisher dasMemberInformationPublisher;
private static final String DATA_STREAM_NAME = "member_info";
private static final String VERSION = "1.0.0";
private static final String DAS_THRIFT_CLIENT_NAME = "das";
+ private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
private static final String VALUE_NOT_FOUND = "Value Not Found";
private ExecutorService executorService;
- public DASMemberInformationPublisher() {
+ private DASMemberInformationPublisher() {
super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
- executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+ executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+ }
+
+ public static DASMemberInformationPublisher getInstance() {
+ if (dasMemberInformationPublisher == null) {
+ synchronized (DASMemberInformationPublisher.class) {
+ if (dasMemberInformationPublisher == null) {
+ dasMemberInformationPublisher = new DASMemberInformationPublisher();
+ }
+ }
+ }
+ return dasMemberInformationPublisher;
}
private static StreamDefinition createStreamDefinition() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 877256d..332bbba 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -38,14 +38,27 @@ import java.util.concurrent.ExecutorService;
public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher {
private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class);
+ private static volatile DASMemberStatusPublisher dasMemberStatusPublisher;
private static final String DATA_STREAM_NAME = "member_lifecycle";
private static final String VERSION = "1.0.0";
private static final String DAS_THRIFT_CLIENT_NAME = "das";
+ private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
private ExecutorService executorService;
- public DASMemberStatusPublisher() {
+ private DASMemberStatusPublisher() {
super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
- executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+ executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE);
+ }
+
+ public static DASMemberStatusPublisher getInstance() {
+ if (dasMemberStatusPublisher == null) {
+ synchronized (DASMemberStatusPublisher.class) {
+ if (dasMemberStatusPublisher == null) {
+ dasMemberStatusPublisher = new DASMemberStatusPublisher();
+ }
+ }
+ }
+ return dasMemberStatusPublisher;
}
private static StreamDefinition createStreamDefinition() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index c025bb4..ccd8d34 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -162,6 +162,7 @@ public final class CloudControllerConstants {
public static final String TIMESTAMP_COL = "timestamp";
public static final String SCALING_DECISION_ID_COL = "scaling_decision_id";
+ public static final String STATS_PUBLISHER_THREAD_POOL_ID = "cloud.controller.stats.publisher.thread.pool";
/**
* Properties
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
new file mode 100644
index 0000000..09efa1e
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.exception;
+
+/**
+ * This exception will be thrown when trying to create a publisher with invalid statistics publisher type.
+ */
+public class InvalidStatisticsPublisherTypeException extends Exception {
+
+ public InvalidStatisticsPublisherTypeException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
index e4047ab..bf67c1b 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
@@ -19,6 +19,7 @@
package org.apache.stratos.common.statistics.publisher;
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
import org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPHealthStatisticsPublisher;
/**
@@ -28,9 +29,9 @@ public class HealthStatisticsPublisherFactory {
public static HealthStatisticsPublisher createHealthStatisticsPublisher(StatisticsPublisherType type) {
if (type == StatisticsPublisherType.WSO2CEP) {
- return new WSO2CEPHealthStatisticsPublisher();
+ return WSO2CEPHealthStatisticsPublisher.getInstance();
} else {
- throw new RuntimeException("Unknown statistics publisher type");
+ throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
index c942bce..a4b9db8 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
@@ -19,6 +19,7 @@
package org.apache.stratos.common.statistics.publisher;
+import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
import org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPInFlightRequestPublisher;
/**
@@ -28,9 +29,9 @@ public class InFlightRequestPublisherFactory {
public static InFlightRequestPublisher createInFlightRequestPublisher(StatisticsPublisherType type) {
if (type == StatisticsPublisherType.WSO2CEP) {
- return new WSO2CEPInFlightRequestPublisher();
+ return WSO2CEPInFlightRequestPublisher.getInstance();
} else {
- throw new RuntimeException("Unknown statistics publisher type");
+ throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher.");
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 7d4aa6e..16dba16 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -21,14 +21,10 @@ package org.apache.stratos.common.statistics.publisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.agent.thrift.lb.DataPublisherHolder;
import org.wso2.carbon.databridge.agent.thrift.lb.LoadBalancingDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverGroup;
-import org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -52,8 +48,8 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
* Credential information stored inside thrift-client-config.xml file
* is parsed and assigned into ip,port,username and password fields
*
- * @param streamDefinition Thrift Event Stream Definition
- * @param thriftClientName Thrift Client Name
+ * @param streamDefinition Thrift Event Stream Definition
+ * @param thriftClientName Thrift Client Name
*/
public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String thriftClientName) {
ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
@@ -61,54 +57,58 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
this.streamDefinition = streamDefinition;
if (isPublisherEnabled()) {
- this.enabled = true;
+ this.enabled = true;
init();
}
}
private boolean isPublisherEnabled() {
- boolean publisherEnabled = false;
- for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
- publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
- if(publisherEnabled){
- break;
- }
- }
- return publisherEnabled;
- }
-
- private void init() {
-
+ boolean publisherEnabled = false;
+ for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
+ publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
+ if (publisherEnabled) {
+ break;
+ }
+ }
+ return publisherEnabled;
+ }
+
+ private void init() {
+
// Initialize load balancing data publisher
loadBalancingDataPublisher = new LoadBalancingDataPublisher(getReceiverGroups());
- loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
+
+ //adding stream definition
+ if (!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
+ loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
+ }
}
private ArrayList<ReceiverGroup> getReceiverGroups() {
-
- ArrayList<ReceiverGroup> receiverGroups = new ArrayList<ReceiverGroup>();
-
+
+ ArrayList<ReceiverGroup> receiverGroups = new ArrayList<ReceiverGroup>();
+
for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
- ArrayList<DataPublisherHolder> dataPublisherHolders = new ArrayList<DataPublisherHolder>();
- DataPublisherHolder aNode = new DataPublisherHolder(null, buildUrl(thriftClientInfo), thriftClientInfo.getUsername(), thriftClientInfo.getPassword());
- dataPublisherHolders.add(aNode);
- ReceiverGroup group = new ReceiverGroup(dataPublisherHolders);
- receiverGroups.add(group);
- }
- return receiverGroups;
-
- }
-
- private String buildUrl(ThriftClientInfo thriftClientInfo) {
- String url = new StringBuilder()
- .append("tcp://")
- .append(thriftClientInfo.getIp())
- .append(":")
- .append(thriftClientInfo.getPort()).toString();
- return url;
- }
-
- @Override
+ ArrayList<DataPublisherHolder> dataPublisherHolders = new ArrayList<DataPublisherHolder>();
+ DataPublisherHolder aNode = new DataPublisherHolder(null, buildUrl(thriftClientInfo), thriftClientInfo.getUsername(), thriftClientInfo.getPassword());
+ dataPublisherHolders.add(aNode);
+ ReceiverGroup group = new ReceiverGroup(dataPublisherHolders);
+ receiverGroups.add(group);
+ }
+ return receiverGroups;
+
+ }
+
+ private String buildUrl(ThriftClientInfo thriftClientInfo) {
+ String url = new StringBuilder()
+ .append("tcp://")
+ .append(thriftClientInfo.getIp())
+ .append(":")
+ .append(thriftClientInfo.getPort()).toString();
+ return url;
+ }
+
+ @Override
public void setEnabled(boolean enabled) {
this.enabled = enabled;
if (this.enabled) {
@@ -138,11 +138,11 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher {
}
loadBalancingDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
if (log.isDebugEnabled()) {
- log.debug(String.format("Successfully Published ******** thrift event: [stream] %s [version] %s",
+ log.debug(String.format("Successfully Published thrift event: [stream] %s [version] %s",
streamDefinition.getName(), streamDefinition.getVersion()));
}
-
-
+
+
} catch (AgentException e) {
if (log.isErrorEnabled()) {
log.error(String.format("Could not publish thrift event: [stream] %s [version] %s",
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index ea1c401..d025c33 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -36,15 +36,26 @@ import java.util.List;
public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
-
+ private static volatile WSO2CEPHealthStatisticsPublisher wso2CEPHealthStatisticsPublisher;
private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats";
private static final String VERSION = "1.0.0";
private static final String CEP_THRIFT_CLIENT_NAME = "cep";
- public WSO2CEPHealthStatisticsPublisher() {
+ private WSO2CEPHealthStatisticsPublisher() {
super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
}
+ public static WSO2CEPHealthStatisticsPublisher getInstance() {
+ if (wso2CEPHealthStatisticsPublisher == null) {
+ synchronized (WSO2CEPHealthStatisticsPublisher.class) {
+ if (wso2CEPHealthStatisticsPublisher == null) {
+ wso2CEPHealthStatisticsPublisher = new WSO2CEPHealthStatisticsPublisher();
+ }
+ }
+ }
+ return wso2CEPHealthStatisticsPublisher;
+ }
+
private static StreamDefinition createStreamDefinition() {
try {
// Create stream definition
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 24465c7..8c9189b 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -38,15 +38,26 @@ import java.util.List;
*/
public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
-
+ private static volatile WSO2CEPInFlightRequestPublisher wso2CEPInFlightRequestPublisher;
private static final String DATA_STREAM_NAME = "in_flight_requests";
private static final String VERSION = "1.0.0";
private static final String CEP_THRIFT_CLIENT_NAME = "cep";
- public WSO2CEPInFlightRequestPublisher() {
+ private WSO2CEPInFlightRequestPublisher() {
super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
}
+ public static WSO2CEPInFlightRequestPublisher getInstance() {
+ if (wso2CEPInFlightRequestPublisher == null) {
+ synchronized ( WSO2CEPInFlightRequestPublisher.class) {
+ if (wso2CEPInFlightRequestPublisher == null) {
+ wso2CEPInFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher();
+ }
+ }
+ }
+ return wso2CEPInFlightRequestPublisher;
+ }
+
private static StreamDefinition createStreamDefinition() {
try {
// Create stream definition