You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2013/11/27 19:55:21 UTC
[24/26] git commit: auto-scaler refactoring v0.1
auto-scaler refactoring v0.1
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/21aadd0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/21aadd0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/21aadd0a
Branch: refs/heads/master
Commit: 21aadd0a108c4a6f269a3b0eabfc0b1a2d7f2db1
Parents: ffaaeee
Author: Nirmal Fernando <ni...@apache.org>
Authored: Thu Nov 28 00:14:24 2013 +0530
Committer: Nirmal Fernando <ni...@apache.org>
Committed: Thu Nov 28 00:14:24 2013 +0530
----------------------------------------------------------------------
.../stratos/autoscaler/ClusterContext.java | 54 ++++-
.../stratos/autoscaler/ClusterMonitor.java | 154 +++++++++++++
.../stratos/autoscaler/PartitionContext.java | 77 +++++++
.../algorithm/AutoscaleAlgorithm.java | 15 +-
.../autoscaler/algorithm/OneAfterAnother.java | 4 +-
.../PartitionGroupOneAfterAnother.java | 4 +-
.../autoscaler/algorithm/RoundRobin.java | 4 +-
.../cloud/controller/CloudControllerClient.java | 88 +++++---
.../autoscaler/exception/SpawningException.java | 12 +-
.../exception/TerminationException.java | 4 +
.../internal/AutoscalerServerComponent.java | 26 ++-
.../health/HealthEventMessageDelegator.java | 54 ++---
.../autoscaler/policy/PolicyManager.java | 2 +-
.../deployers/DeploymentPolicyDeployer.java | 2 +-
.../deployers/DeploymentPolicyReader.java | 6 +-
.../rule/AutoscalerRuleEvaluator.java | 133 ++++++-----
.../autoscaler/rule/ExecutorTaskScheduler.java | 58 ++---
.../processors/AutoscalerTopologyReceiver.java | 223 +++++++++++++++++++
.../topology/processors/TopologyReceiver.java | 80 +++++++
.../stratos/autoscaler/util/AutoscalerUtil.java | 63 +++---
.../stratos/autoscaler/TestKnowledgeBase.java | 62 +++++-
.../src/test/resources/autoscaler.drl | 124 +++++++++++
.../test/resources/minimum-autoscaler-rule.drl | 62 ++++++
.../resources/test-minimum-autoscaler-rule.drl | 60 +++++
24 files changed, 1163 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
index 10e09f7..ce3249b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
@@ -20,9 +20,15 @@
package org.apache.stratos.autoscaler;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
/**
* Defines cluster context properties.
*/
@@ -39,9 +45,14 @@ public class ClusterContext {
private float requestsInFlightGradient;
private int memberCount;
+
+ private StatefulKnowledgeSession ksession;
+ private FactHandle facthandle;
//This map will keep number of currently spawned instance count against partitionId
private Map<String, Integer> partitionCountMap;
+
+ private List<Partition> partitionsOfThisCluster;
private int currentPartitionIndex;
private int currentPartitionGroupIndex;
@@ -49,13 +60,22 @@ public class ClusterContext {
private Properties properties;
private Map<String, MemberContext> memberContextMap;
+ private DeploymentPolicy deploymentPolicy;
- public ClusterContext(String clusterId, String serviceId) {
+ public ClusterContext(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy) {
this.clusterId = clusterId;
this.serviceId = serviceId;
+ this.setDeploymentPolicy(deploymentPolicy);
+ if (deploymentPolicy != null) {
+ this.setPartitionsOfThisCluster(deploymentPolicy.getAllPartitions());
+ }
memberContextMap = new HashMap<String, MemberContext>();
partitionCountMap = new HashMap<String, Integer>();
+
+ for (Partition partition : partitionsOfThisCluster) {
+ this.addPartitionCount(partition.getId(), 0);
+ }
memberCount = 0;
}
@@ -192,4 +212,36 @@ public class ClusterContext {
public void setCurrentPartitionGroupIndex(int currentPartitionGroupIndex) {
this.currentPartitionGroupIndex = currentPartitionGroupIndex;
}
+
+ public List<Partition> getAllPartitions() {
+ return partitionsOfThisCluster;
+ }
+
+ public void setPartitionsOfThisCluster(List<Partition> partitionsOfThisCluster) {
+ this.partitionsOfThisCluster = partitionsOfThisCluster;
+ }
+
+ public StatefulKnowledgeSession getKsession() {
+ return ksession;
+ }
+
+ public void setKsession(StatefulKnowledgeSession ksession) {
+ this.ksession = ksession;
+ }
+
+ public FactHandle getFacthandle() {
+ return facthandle;
+ }
+
+ public void setFacthandle(FactHandle facthandle) {
+ this.facthandle = facthandle;
+ }
+
+ public DeploymentPolicy getDeploymentPolicy() {
+ return deploymentPolicy;
+ }
+
+ public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
+ this.deploymentPolicy = deploymentPolicy;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java
new file mode 100644
index 0000000..f795baa
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ * @author nirmal
+ *
+ */
+public class ClusterMonitor implements Runnable{
+
+ private String clusterId;
+ private ClusterContext clusterCtxt;
+ private List<MemberContext> memberCtxt;
+ private Map<String, PartitionContext> partitionCtxts;
+ private StatefulKnowledgeSession ksession;
+ private boolean isDestroyed;
+
+ private FactHandle facthandle;
+
+ public ClusterMonitor(String clusterId, ClusterContext ctxt, StatefulKnowledgeSession ksession) {
+ this.clusterId = clusterId;
+ this.clusterCtxt = ctxt;
+ this.ksession = ksession;
+ partitionCtxts = new ConcurrentHashMap<String, PartitionContext>();
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public ClusterContext getClusterCtxt() {
+ return clusterCtxt;
+ }
+
+ public void setClusterCtxt(ClusterContext clusterCtxt) {
+ this.clusterCtxt = clusterCtxt;
+ }
+
+ public List<MemberContext> getMemberCtxt() {
+ return memberCtxt;
+ }
+
+ public void setMemberCtxt(List<MemberContext> memberCtxt) {
+ this.memberCtxt = memberCtxt;
+ }
+
+ public Map<String, PartitionContext> getPartitionCtxt() {
+ return partitionCtxts;
+ }
+
+ public void setPartitionCtxt(Map<String, PartitionContext> partitionCtxt) {
+ this.partitionCtxts = partitionCtxt;
+ }
+
+ public void addPartitionCtxt(PartitionContext ctxt) {
+ this.partitionCtxts.put(ctxt.getPartitionId(), ctxt);
+ }
+
+ public PartitionContext getPartitionCtxt(String id) {
+ return this.partitionCtxts.get(id);
+ }
+
+ public StatefulKnowledgeSession getKsession() {
+ return ksession;
+ }
+
+ public void setKsession(StatefulKnowledgeSession ksession) {
+ this.ksession = ksession;
+ }
+
+ public FactHandle getFacthandle() {
+ return facthandle;
+ }
+
+ public void setFacthandle(FactHandle facthandle) {
+ this.facthandle = facthandle;
+ }
+
+ @Override
+ public void run() {
+
+ while (!isDestroyed()) {
+ minInstanceCountCheck();
+ // TODO scale
+ try {
+ // TODO make this configurable
+ Thread.sleep(30000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+
+ private void minInstanceCountCheck() {
+ if(clusterCtxt != null ) {
+ ksession.setGlobal("clusterId", clusterId);
+ //TODO make this concurrent
+ for (Partition partition : clusterCtxt.getAllPartitions()) {
+ String id = partition.getId();
+ PartitionContext ctxt = partitionCtxts.get(id);
+ if(ctxt == null) {
+ ctxt = new PartitionContext(partition);
+ }
+ ctxt.setMinimumMemberCount(partition.getPartitionMembersMin());
+
+ AutoscalerRuleEvaluator.evaluate(ksession, facthandle, ctxt);
+ }
+ }
+ }
+
+ public void destroy() {
+ ksession.dispose();
+ setDestroyed(true);
+ }
+
+ public boolean isDestroyed() {
+ return isDestroyed;
+ }
+
+ public void setDestroyed(boolean isDestroyed) {
+ this.isDestroyed = isDestroyed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java
new file mode 100644
index 0000000..3579bcf
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.stratos.messaging.domain.policy.Partition;
+
+/**
+ * @author nirmal
+ *
+ */
+public class PartitionContext {
+
+ private String partitionId;
+ private Partition partition;
+ private int currentMemberCount = 0;
+ private int minimumMemberCount = 0;
+ private List<String> memberIds = new ArrayList<String>();
+
+ public PartitionContext(Partition partition) {
+ this.setPartition(partition);
+ this.partitionId = partition.getId();
+ }
+
+ public String getPartitionId() {
+ return partitionId;
+ }
+ public void setPartitionId(String partitionId) {
+ this.partitionId = partitionId;
+ }
+ public int getCurrentMemberCount() {
+ return currentMemberCount;
+ }
+ public void incrementCurrentMemberCount(int count) {
+ this.currentMemberCount += count;
+ }
+ public List<String> getMemberIds() {
+ return memberIds;
+ }
+ public void setMemberIds(List<String> memberIds) {
+ this.memberIds = memberIds;
+ }
+
+ public int getMinimumMemberCount() {
+ return minimumMemberCount;
+ }
+
+ public void setMinimumMemberCount(int minimumMemberCount) {
+ this.minimumMemberCount = minimumMemberCount;
+ }
+
+ public Partition getPartition() {
+ return partition;
+ }
+
+ public void setPartition(Partition partition) {
+ this.partition = partition;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
index aacc6bf..3a3fef5 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
@@ -19,16 +19,19 @@
package org.apache.stratos.autoscaler.algorithm;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
/**
*
*/
public interface AutoscaleAlgorithm {
-public boolean scaleUpPartitionAvailable(String clusterId);
-public boolean scaleDownPartitionAvailable(String clusterId);
-public Partition getNextScaleUpPartition(PartitionGroup partition,String clusterId);
-public Partition getNextScaleDownPartition(PartitionGroup partition,String clusterId);
+ public boolean scaleUpPartitionAvailable(String clusterId);
+
+ public boolean scaleDownPartitionAvailable(String clusterId);
+
+ public Partition getNextScaleUpPartition(PartitionGroup partition, String clusterId);
+
+ public Partition getNextScaleDownPartition(PartitionGroup partition, String clusterId);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
index 3c06c13..309c2c9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
@@ -25,8 +25,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
/**
* Completes partitions in the order defined in autoscaler policy, go to next if current one reached the max limit
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
index dd47678..c95d621 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
@@ -27,8 +27,8 @@ import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.ClusterContext;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
/**
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
index d75f11a..47e72d4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
@@ -23,8 +23,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
index 00ee1cb..2536007 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
@@ -25,9 +25,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.Constants;
import org.apache.stratos.autoscaler.exception.SpawningException;
import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.model.Partition;
import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceIllegalArgumentExceptionException;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidCartridgeTypeExceptionException;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException;
import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException;
+import org.apache.stratos.messaging.domain.policy.Partition;
import java.rmi.RemoteException;
@@ -38,9 +42,24 @@ import java.rmi.RemoteException;
public class CloudControllerClient {
private static final Log log = LogFactory.getLog(CloudControllerClient.class);
- private CloudControllerServiceStub stub;
+ private static CloudControllerServiceStub stub;
+ private static CloudControllerClient instance;
- public CloudControllerClient(){
+ public static CloudControllerClient getInstance() {
+
+ if (instance == null) {
+ synchronized (CloudControllerClient.class) {
+
+ if(instance == null) {
+ instance = new CloudControllerClient();
+ }
+ }
+ }
+
+ return instance;
+ }
+
+ private CloudControllerClient(){
try {
XMLConfiguration conf = ConfUtil.getInstance().getConfiguration();
int port = conf.getInt("autoscaler.cloudController.port", Constants.CLOUD_CONTROLLER_DEFAULT_PORT);
@@ -58,53 +77,54 @@ public class CloudControllerClient {
log.info("Calling CC for spawning instances in cluster " + clusterId);
log.info("Member count to be increased: " + memberCountToBeIncreased);
- org.apache.stratos.messaging.domain.topology.xsd.Partition partitionTopology = new
- org.apache.stratos.messaging.domain.topology.xsd.Partition();
- partitionTopology.setId(partition.getId());
-
- try {
- for(int i =0; i< memberCountToBeIncreased; i++){
- stub.startInstance(clusterId, partitionTopology);
- }
- } catch (RemoteException e) {
- log.error("Error occurred in cloud controller side while spawning instance");
+ for(int i =0; i< memberCountToBeIncreased; i++){
+ spawnAnInstance(partition, clusterId);
}
+
}
- public void spawnAnInstance(Partition partition, String clusterId) throws SpawningException {
+ public static void spawnAnInstance(Partition partition, String clusterId) throws SpawningException {
- log.info("Calling CC for spawning an instance in cluster " + clusterId);
- org.apache.stratos.messaging.domain.topology.xsd.Partition partitionTopology = new
- org.apache.stratos.messaging.domain.topology.xsd.Partition();
+ org.apache.stratos.messaging.domain.policy.xsd.Partition partitionTopology = new
+ org.apache.stratos.messaging.domain.policy.xsd.Partition();
partitionTopology.setId(partition.getId());
- /*locationScope.setCloud(partition.getIaas());
- locationScope.setRegion(partition.getZone());*/
-
+
try {
stub.startInstance(clusterId, partitionTopology);
+ } catch (CloudControllerServiceIllegalArgumentExceptionException e) {
+ log.error(e.getMessage());
+ throw new SpawningException(e);
+ } catch (CloudControllerServiceUnregisteredCartridgeExceptionException e) {
+ log.error(e.getMessage());
+ throw new SpawningException(e);
} catch (RemoteException e) {
-
- log.error("Error occurred in cloud controller side while spawning instance");
-
+ String msg = "Error occurred in cloud controller side while spawning instance";
+ log.error(msg);
+ throw new SpawningException(msg, e);
}
}
- public void terminate(Partition partition, String clusterId) throws TerminationException {
+ public void terminate(String memberId) throws TerminationException {
//call CC terminate method
- log.info("Calling CC for terminating an instance in cluster " + clusterId);
- org.apache.stratos.messaging.domain.topology.xsd.Partition partitionTopology = new
- org.apache.stratos.messaging.domain.topology.xsd.Partition();
- partitionTopology.setId(partition.getId());
- /*locationScope.setCloud(partition.getIaas());
- locationScope.setRegion(partition.getZone());*/
+ log.info("Calling CC for terminating member with id: " + memberId);
try {
- stub.terminateInstance(clusterId, partitionTopology);
+ stub.terminateInstance(memberId);
} catch (RemoteException e) {
-
- log.error("Error occurred in cloud controller side while terminating instance");
-
+ String msg = "Error occurred in cloud controller side while terminating instance";
+ log.error(msg, e);
+ throw new TerminationException(msg, e);
+
+ } catch (CloudControllerServiceIllegalArgumentExceptionException e) {
+ log.error(e.getMessage());
+ throw new TerminationException(e);
+ } catch (CloudControllerServiceInvalidMemberExceptionException e) {
+ log.error(e.getMessage());
+ throw new TerminationException(e);
+ } catch (CloudControllerServiceInvalidCartridgeTypeExceptionException e) {
+ log.error(e.getMessage());
+ throw new TerminationException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
index bf46354..6b228b9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
@@ -1,15 +1,19 @@
package org.apache.stratos.autoscaler.exception;
-import java.rmi.RemoteException;
-
/**
*
*/
public class SpawningException extends Exception {
- public SpawningException(String exception, RemoteException message){
- super(exception, message);
+ private static final long serialVersionUID = 4761501174753405374L;
+
+
+ public SpawningException(String message, Exception exception){
+ super(message, exception);
}
+ public SpawningException(Exception exception){
+ super(exception);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
index efb3f21..8c7ede7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
@@ -7,4 +7,8 @@ public class TerminationException extends Throwable {
public TerminationException(String s, Exception e) {
super(s, e);
}
+
+ public TerminationException(Exception e) {
+ super(e);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 35e79c5..4969670 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageDelegator;
import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageReceiver;
import org.apache.stratos.autoscaler.rule.ExecutorTaskScheduler;
+import org.apache.stratos.autoscaler.topology.processors.AutoscalerTopologyReceiver;
+import org.apache.stratos.autoscaler.topology.processors.TopologyReceiver;
import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageReceiver;
@@ -42,18 +44,20 @@ public class AutoscalerServerComponent {
protected void activate(ComponentContext componentContext) throws Exception {
// Subscribe to all topics
- TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
- topologyTopicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
- Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber);
- topologyTopicSubscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Topology event message receiver thread started");
- }
-
- TopologyEventMessageDelegator tropologyEventMessageDelegator = new TopologyEventMessageDelegator();
- Thread tropologyDelegatorThread = new Thread(tropologyEventMessageDelegator);
- tropologyDelegatorThread.start();
+// TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+// topologyTopicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
+// Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber);
+// topologyTopicSubscriberThread.start();
+// if (log.isDebugEnabled()) {
+// log.debug("Topology event message receiver thread started");
+// }
+//
+// TopologyEventMessageDelegator tropologyEventMessageDelegator = new TopologyEventMessageDelegator();
+// Thread tropologyDelegatorThread = new Thread(tropologyEventMessageDelegator);
+// tropologyDelegatorThread.start();
+ Thread th = new Thread(new AutoscalerTopologyReceiver());
+ th.start();
if (log.isDebugEnabled()) {
log.debug("Topology message processor thread started");
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
index 5057553..5c57135 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
@@ -19,6 +19,7 @@
package org.apache.stratos.autoscaler.message.receiver.health;
import com.google.gson.stream.JsonReader;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
@@ -27,11 +28,13 @@ import org.apache.stratos.autoscaler.Constants;
import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import javax.jms.TextMessage;
+
import java.io.BufferedReader;
import java.io.StringReader;
@@ -61,31 +64,32 @@ public class HealthEventMessageDelegator implements Runnable {
log.info(clusterId);
log.info(value);
log.info(eventName);
- for (Service service : TopologyManager.getTopology().getServices()){
-
- if(service.clusterExists(clusterId)){
-
- if(!AutoscalerContext.getInstance().clusterExists(clusterId)){
-
- Cluster cluster = service.getCluster(clusterId);
- AutoscalePolicy autoscalePolicy = PolicyManager.getInstance().getAutoscalePolicy(cluster.getAutoscalePolicyName());
-
- ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName());
-
- LoadThresholds loadThresholds = autoscalePolicy.getLoadThresholds();
- float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
- float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
- float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative();
-
- clusterContext.setAverageRequestsInFlight(averageLimit);
- clusterContext.setRequestsInFlightGradient(gradientLimit);
- clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
-
- AutoscalerContext.getInstance().addClusterContext(clusterContext);
- }
- break;
- }
- }
+// for (Service service : TopologyManager.getTopology().getServices()){
+//
+// if(service.clusterExists(clusterId)){
+//
+// if(!AutoscalerContext.getInstance().clusterExists(clusterId)){
+//
+// Cluster cluster = service.getCluster(clusterId);
+// AutoscalePolicy autoscalePolicy = PolicyManager.getInstance().getAutoscalePolicy(cluster.getAutoscalePolicyName());
+// DeploymentPolicy deploymentPolicy = PolicyManager.getInstance().getDeploymentPolicy(cluster.getDeploymentPolicyName());
+//
+// ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName(), deploymentPolicy.getAllPartitions());
+//
+// LoadThresholds loadThresholds = autoscalePolicy.getLoadThresholds();
+// float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
+// float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
+// float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative();
+//
+// clusterContext.setAverageRequestsInFlight(averageLimit);
+// clusterContext.setRequestsInFlightGradient(gradientLimit);
+// clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+//
+// AutoscalerContext.getInstance().addClusterContext(clusterContext);
+// }
+// break;
+// }
+// }
if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){
AutoscalerContext.getInstance().getClusterContext(clusterId).setAverageRequestsInFlight(value);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
index 093d0b8..2630f98 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
@@ -29,7 +29,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
index d74a384..a219088 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
@@ -29,7 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.policy.InvalidPolicyException;
import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
index ab8f106..603742b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
@@ -29,9 +29,9 @@ import org.apache.axis2.deployment.DeploymentException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.policy.InvalidPolicyException;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
index 538fb22..a360c52 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
@@ -20,13 +20,18 @@
package org.apache.stratos.autoscaler.rule;
import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.ClusterContext;
+import org.apache.stratos.autoscaler.ClusterMonitor;
+import org.apache.stratos.autoscaler.PartitionContext;
import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.Partition;
+import org.apache.stratos.messaging.domain.policy.Partition;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.drools.KnowledgeBase;
@@ -43,6 +48,7 @@ import org.apache.stratos.autoscaler.algorithm.RoundRobin;
import org.apache.stratos.autoscaler.util.AutoscalerUtil;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.autoscaler.algorithm.PartitionGroupOneAfterAnother;
+import org.drools.runtime.rule.FactHandle;
/**
* This class is responsible for evaluating the current details of topology, statistics, and health
@@ -54,50 +60,63 @@ public class AutoscalerRuleEvaluator {
private static AutoscalerRuleEvaluator instance = null;
private static final String DRL_FILE_NAME = "autoscaler.drl";
-
- private KnowledgeBase kbase;
- private StatefulKnowledgeSession ksession;
+ private Map<String, ClusterMonitor> monitors;
+ private static KnowledgeBase kbase;
private AutoscalerRuleEvaluator() {
try {
kbase = readKnowledgeBase();
+ setMonitors(new HashMap<String, ClusterMonitor>());
} catch (Exception e) {
log.error("Rule evaluate error", e);
}
}
- public void evaluate(Service service) throws Exception{
- try {
+ public static void evaluate(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
- for (Cluster cluster: service.getClusters()){
- //update cluster-context
- cluster.setDeploymentPolicyName("economy-deployment");
- AutoscalerUtil.updateClusterContext(cluster);
- }
+ if (handle == null) {
- ksession = kbase.newStatefulKnowledgeSession();
- ksession.setGlobal("$context", AutoscalerContext.getInstance());
- ksession.setGlobal("log", log);
- ksession.setGlobal("$manager", PolicyManager.getInstance());
- ksession.setGlobal("$topology", TopologyManager.getTopology());
- ksession.setGlobal("$evaluator", this);
- ksession.insert(service);
- ksession.fireAllRules();
- } catch (Exception e) {
- throw new Exception("Rule evaluate error", e);
- }
+ handle = ksession.insert(obj);
+ } else {
+ ksession.update(handle, obj);
+ }
+ ksession.fireAllRules();
+
+ }
+
+ public void addMonitor(ClusterMonitor monitor) {
+ monitors.put(monitor.getClusterId(), monitor);
}
- public boolean delegateSpawn(Partition partition, String clusterId) {
- CloudControllerClient cloudControllerClient = new CloudControllerClient();
+ public ClusterMonitor getMonitor(String clusterId) {
+ return monitors.get(clusterId);
+ }
+
+ public ClusterMonitor removeMonitor(String clusterId) {
+ return monitors.remove(clusterId);
+ }
+
+ public StatefulKnowledgeSession getStatefulSession() {
+ StatefulKnowledgeSession ksession;
+ ksession = kbase.newStatefulKnowledgeSession();
+// ksession.setGlobal("$partitions", ctxt.getPartitionsOfThisCluster());
+// ksession.setGlobal("log", log);
+// ksession.setGlobal("$manager", PolicyManager.getInstance());
+// ksession.setGlobal("$topology", TopologyManager.getTopology());
+// ksession.setGlobal("$evaluator", this);
+ return ksession;
+// ksession.insert(clusterCtxt);
+// ksession.fireAllRules();
+ }
+
+ public static boolean delegateSpawn(Partition partition, String clusterId) {
try {
- int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+// int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
- if(currentMemberCount < partition.getPartitionMembersMax()) {
- AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(1);
- cloudControllerClient.spawnAnInstance(partition, clusterId);
- }
+// if(currentMemberCount < partition.getPartitionMembersMax()) {
+// AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(1);
+ CloudControllerClient.spawnAnInstance(partition, clusterId);
} catch (Throwable e) {
log.error("Cannot spawn an instance", e);
@@ -106,15 +125,15 @@ public class AutoscalerRuleEvaluator {
}
public boolean delegateTerminate(Partition partition, String clusterId) {
- CloudControllerClient cloudControllerClient = new CloudControllerClient();
try {
- int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
- log.info("Current member count is " + currentMemberCount );
- if(currentMemberCount > partition.getPartitionMembersMin()) {
- AutoscalerContext.getInstance().getClusterContext(clusterId).decreaseMemberCount();
- cloudControllerClient.terminate(partition, clusterId);
- }
+// int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+// log.info("Current member count is " + currentMemberCount );
+// if(currentMemberCount > partition.getPartitionMembersMin()) {
+// AutoscalerContext.getInstance().getClusterContext(clusterId).decreaseMemberCount();
+ //FIXME
+// cloudControllerClient.terminate(partition, clusterId);
+// }
return true;
} catch (Throwable e) {
log.error("Cannot terminate instance", e);
@@ -122,22 +141,22 @@ public class AutoscalerRuleEvaluator {
return false;
}
- public boolean delegateSpawn(Partition partition, String clusterId, int memberCountToBeIncreased) {
- CloudControllerClient cloudControllerClient = new CloudControllerClient();
- try {
- int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
- log.info("Current member count is " + currentMemberCount );
-
- if(currentMemberCount < partition.getPartitionMembersMax()) {
- AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(memberCountToBeIncreased);
- cloudControllerClient.spawnInstances(partition, clusterId, memberCountToBeIncreased);
- }
- return true;
- } catch (Throwable e) {
- log.error("Cannot spawn an instance", e);
- }
- return false;
- }
+// public boolean delegateSpawn(Partition partition, String clusterId, int memberCountToBeIncreased) {
+// CloudControllerClient cloudControllerClient = new CloudControllerClient();
+// try {
+// int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+// log.info("Current member count is " + currentMemberCount );
+//
+// if(currentMemberCount < partition.getPartitionMembersMax()) {
+// AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(memberCountToBeIncreased);
+// cloudControllerClient.spawnInstances(partition, clusterId, memberCountToBeIncreased);
+// }
+// return true;
+// } catch (Throwable e) {
+// log.error("Cannot spawn an instance", e);
+// }
+// return false;
+// }
public static synchronized AutoscalerRuleEvaluator getInstance() {
if (instance == null) {
@@ -184,4 +203,14 @@ public class AutoscalerRuleEvaluator {
{
return new PartitionGroupOneAfterAnother().getNextScaleDownPartition(clusterID);
}
+
+
+ public Map<String, ClusterMonitor> getMonitors() {
+ return monitors;
+ }
+
+
+ public void setMonitors(Map<String, ClusterMonitor> monitors) {
+ this.monitors = monitors;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
index 5722a57..fe79a59 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
@@ -55,34 +55,36 @@ public class ExecutorTaskScheduler implements Runnable {
final Runnable rulesEvaluator = new Runnable() {
public void run() {
- try {
- for (Service service : TopologyManager.getTopology().getServices()) {
-
- AutoscalerRuleEvaluator.getInstance().evaluate(service);
- }
-
- // Remove cluster context if its already removed from Topology
- for (String clusterContextId : AutoscalerContext.getInstance().getClusterContexes().keySet()) {
- boolean clusterAvailable = false;
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.getClusterId().equals(clusterContextId)) {
-
- clusterAvailable = true;
- }
- }
- }
-
- if (!clusterAvailable) {
- AutoscalerContext.getInstance().removeClusterContext(clusterContextId);
- }
- }
-
- } catch (Exception e) {
- log.error("Error", e);
- log.debug("Shutting down rule scheduler");
- ex.shutdownNow();
- }
+// try {
+// for (Service service : TopologyManager.getTopology().getServices()) {
+//
+// AutoscalerRuleEvaluator.getInstance().evaluate(service);
+// }
+//
+// // Remove cluster context if its already removed from Topology
+// for (String clusterContextId : AutoscalerContext.getInstance().getClusterContexes().keySet()) {
+// boolean clusterAvailable = false;
+// for (Service service : TopologyManager.getTopology().getServices()) {
+// for (Cluster cluster : service.getClusters()) {
+// if (cluster.getClusterId().equals(clusterContextId)) {
+//
+// clusterAvailable = true;
+// }
+// }
+// }
+//
+// if (!clusterAvailable) {
+// AutoscalerContext.getInstance().removeClusterContext(clusterContextId);
+// }
+// }
+//
+// } catch (Exception e) {
+// String msg = "Error while evaluating rules.";
+// log.error(msg, e);
+// throw new RuntimeException(msg, e);
+//// log.debug("Shutting down rule scheduler");
+//// ex.shutdownNow();
+// }
}
};
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
new file mode 100644
index 0000000..ca79578
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.topology.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.ClusterContext;
+import org.apache.stratos.autoscaler.ClusterMonitor;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
+import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.Collection;
+
+/**
+ * Load balancer topology receiver.
+ */
+public class AutoscalerTopologyReceiver implements Runnable {
+
+ private static final Log log = LogFactory.getLog(AutoscalerTopologyReceiver.class);
+
+ private TopologyReceiver topologyReceiver;
+ private boolean terminated;
+
+ public AutoscalerTopologyReceiver() {
+ this.topologyReceiver = new TopologyReceiver(createMessageDelegator());
+ }
+
+ @Override
+ public void run() {
+ Thread thread = new Thread(topologyReceiver);
+ thread.start();
+ if(log.isInfoEnabled()) {
+ log.info("Load balancer topology receiver thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated);
+ if(log.isInfoEnabled()) {
+ log.info("Load balancer topology receiver thread terminated");
+ }
+ }
+
+ private TopologyEventMessageDelegator createMessageDelegator() {
+ TopologyEventProcessorChain processorChain = createEventProcessorChain();
+ final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
+ messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ TopologyManager.acquireReadLock();
+ for(Service service : TopologyManager.getTopology().getServices()) {
+ for(Cluster cluster : service.getClusters()) {
+ addClusterToContext(cluster);
+ }
+ }
+ }
+ finally {
+ TopologyManager.releaseReadLock();
+ }
+ // Complete topology is only consumed once, remove listener
+ messageDelegator.removeCompleteTopologyEventListener(this);
+ }
+
+ });
+ return messageDelegator;
+ }
+
+ private TopologyEventProcessorChain createEventProcessorChain() {
+ // Listen to topology events that affect clusters
+ TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+ processorChain.addEventListener(new ClusterCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ ClusterCreatedEvent e = (ClusterCreatedEvent) event;
+ TopologyManager.acquireReadLock();
+ Service service = TopologyManager.getTopology().getService(e.getServiceName());
+ Cluster cluster = service.getCluster(e.getClusterId());
+ addClusterToContext(cluster);
+ }
+ finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ });
+
+ processorChain.addEventListener(new ClusterRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ ClusterRemovedEvent e = (ClusterRemovedEvent) event;
+ TopologyManager.acquireReadLock();
+
+ removeClusterFromContext(e.getClusterId());
+ }
+ finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ });
+
+ processorChain.addEventListener(new MemberActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+// try {
+// TopologyManager.acquireReadLock();
+//
+// // Add cluster to the context when its first member is activated
+// MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent)event;
+// Cluster cluster = findCluster(memberActivatedEvent.getClusterId());
+// if(cluster == null) {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster not found in topology: [cluster] %s", memberActivatedEvent.getClusterId()));
+// }
+// }
+// addClusterToContext(cluster);
+// }
+// finally {
+// TopologyManager.releaseReadLock();
+// }
+ }
+ });
+ processorChain.addEventListener(new ServiceRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+// try {
+// TopologyManager.acquireReadLock();
+//
+// // Remove all clusters of given service from context
+// ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
+// for(Service service : TopologyManager.getTopology().getServices()) {
+// for(Cluster cluster : service.getClusters()) {
+// removeClusterFromContext(cluster.getHostName());
+// }
+// }
+// }
+// finally {
+// TopologyManager.releaseReadLock();
+// }
+ }
+ });
+ return processorChain;
+ }
+
+ private void addClusterToContext(Cluster cluster) {
+ ClusterContext ctxt = AutoscalerUtil.getClusterContext(cluster);
+ AutoscalerRuleEvaluator ruleCtxt = AutoscalerRuleEvaluator.getInstance();
+ ClusterMonitor monitor =
+ new ClusterMonitor(cluster.getClusterId(), ctxt,
+ ruleCtxt.getStatefulSession());
+ Thread th = new Thread(monitor);
+ th.start();
+ AutoscalerRuleEvaluator.getInstance().addMonitor(monitor);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster monitor has been added: [cluster] %s",
+ cluster.getClusterId()));
+ }
+ }
+
+ private void removeClusterFromContext(String clusterId) {
+ ClusterMonitor monitor = AutoscalerRuleEvaluator.getInstance().removeMonitor(clusterId);
+ monitor.destroy();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId));
+ }
+ }
+
+ private Cluster findCluster(String clusterId) {
+ if(clusterId == null) {
+ return null;
+ }
+
+ Collection<Service> services = TopologyManager.getTopology().getServices();
+ for (Service service : services) {
+ for (Cluster cluster : service.getClusters()) {
+ if (clusterId.equals(cluster.getClusterId())) {
+ return cluster;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Terminate load balancer topology receiver thread.
+ */
+ public void terminate() {
+ topologyReceiver.terminate();
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java
new file mode 100644
index 0000000..09c4a30
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.topology.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageReceiver;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A thread for receiving topology information from message broker.
+ */
+public class TopologyReceiver implements Runnable {
+ private static final Log log = LogFactory.getLog(TopologyReceiver.class);
+ private TopologyEventMessageDelegator messageDelegator;
+ private TopicSubscriber topicSubscriber;
+ private boolean terminated;
+
+ public TopologyReceiver() {
+ this.messageDelegator = new TopologyEventMessageDelegator();
+ }
+
+ public TopologyReceiver(TopologyEventMessageDelegator messageDelegator) {
+ this.messageDelegator = messageDelegator;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Start topic subscriber thread
+ topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+ topicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
+ Thread subscriberThread = new Thread(topicSubscriber);
+ subscriberThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Topology event message receiver thread started");
+ }
+
+ // Start topology event message delegator thread
+ Thread receiverThread = new Thread(messageDelegator);
+ receiverThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Topology event message delegator thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated);
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Topology receiver failed", e);
+ }
+ }
+ }
+
+ public void terminate() {
+ topicSubscriber.terminate();
+ messageDelegator.terminate();
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index 16143f9..3b7cef8 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -19,14 +19,11 @@
package org.apache.stratos.autoscaler.util;
-import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.ClusterContext;
import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
import org.apache.stratos.messaging.domain.topology.Cluster;
@@ -44,40 +41,40 @@ public class AutoscalerUtil {
* @param cluster
* @return ClusterContext - Updated ClusterContext
*/
- public static ClusterContext updateClusterContext(Cluster cluster) {
- AutoscalerContext context = AutoscalerContext.getInstance();
- ClusterContext clusterContext = context.getClusterContext(cluster.getClusterId());
- if (null == clusterContext) {
+ public static ClusterContext getClusterContext(Cluster cluster) {
+ // FIXME fix the following code to correctly update
+ // AutoscalerContext context = AutoscalerContext.getInstance();
+ if (null == cluster) {
+ return null;
+ }
- clusterContext = new ClusterContext(cluster.getClusterId(), cluster.getServiceName());
- AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(cluster.getAutoscalePolicyName());
+ AutoscalePolicy policy =
+ PolicyManager.getInstance()
+ .getAutoscalePolicy(cluster.getAutoscalePolicyName());
+ DeploymentPolicy deploymentPolicy =
+ PolicyManager.getInstance()
+ .getDeploymentPolicy(cluster.getDeploymentPolicyName());
- if(policy!=null){
+ ClusterContext clusterContext =
+ new ClusterContext(cluster.getClusterId(),
+ cluster.getServiceName(),
+ deploymentPolicy);
- //get values from policy
- LoadThresholds loadThresholds = policy.getLoadThresholds();
- float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
- float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
- float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative();
+ if (policy != null) {
+ // get values from policy
+ LoadThresholds loadThresholds = policy.getLoadThresholds();
+ float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
+ float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
+ float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative();
- clusterContext.setRequestsInFlightGradient(gradientLimit);
- clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
- clusterContext.setAverageRequestsInFlight(averageLimit);
- DeploymentPolicy deploymentPolicy = PolicyManager.getInstance().getDeploymentPolicy(cluster.getDeploymentPolicyName());
- if(deploymentPolicy!=null){
- for(PartitionGroup group :deploymentPolicy.getPartitionGroups()){
- for (Partition partition : group.getPartitions()) {
- clusterContext.addPartitionCount(partition.getId(), 0);
- }
- }
- }
-
- }
+ clusterContext.setRequestsInFlightGradient(gradientLimit);
+ clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+ clusterContext.setAverageRequestsInFlight(averageLimit);
- context.addClusterContext(clusterContext);
- }
- return clusterContext;
- }
+ }
+
+ return clusterContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java b/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
index b719995..cd6476c 100644
--- a/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
+++ b/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
@@ -19,19 +19,34 @@
package org.apache.stratos.autoscaler;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.drools.runtime.rule.FactHandle;
+import org.drools.KnowledgeBase;
+import org.drools.KnowledgeBaseFactory;
import org.drools.builder.*;
import org.drools.io.Resource;
import org.drools.io.ResourceFactory;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.StatelessKnowledgeSession;
+import org.junit.Before;
import org.junit.Test;
public class TestKnowledgeBase {
private static final Log log = LogFactory.getLog(TestKnowledgeBase.class);
- private String droolsFilePath = "../../products/autoscaler/modules/distribution/src/main/conf/autoscaler.drl";
+ private String droolsFilePath = "src/test/resources/test-minimum-autoscaler-rule.drl";
+ private KnowledgeBase kbase;
+ private StatefulKnowledgeSession ksession;
+ private StatelessKnowledgeSession ksession1;
- @Test
- public void testKnowledgeBase() {
+ @Before
+ public void setUp() {
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
Resource resource = ResourceFactory.newFileResource(droolsFilePath);
kbuilder.add(resource, ResourceType.DRL);
@@ -46,5 +61,46 @@ public class TestKnowledgeBase {
}
throw new IllegalArgumentException(String.format("Could not parse drools file: %s", droolsFilePath));
}
+
+ kbase = KnowledgeBaseFactory.newKnowledgeBase();
+ kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
+ }
+
+ @Test
+ public void testMinimumRule() {
+ if(kbase == null) {
+ throw new IllegalArgumentException("Knowledge base is null.");
+ }
+
+// ksession1 = kbase.newStatelessKnowledgeSession();
+ ksession = kbase.newStatefulKnowledgeSession();
+ List<String> p = new ArrayList<String>();
+ p.add("aa");
+ p.add("bb");
+// p.setId("pp");
+// ksession.setGlobal("pa", p);
+// ksession.setGlobal("log", log);
+// ksession.setGlobal("$manager", PolicyManager.getInstance());
+// ksession.setGlobal("$topology", TopologyManager.getTopology());
+// ksession.setGlobal("$evaluator", this);
+// ksession1.execute(p);
+// FactHandle handle = ksession.insert(p);
+ ksession.insert(p);
+ ksession.fireAllRules();
+// p = new Partition();
+// p.setId("3");
+// ksession.update(handle, p);
+// ksession.fireAllRules();
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+// System.err.println(p.getId());
+// ksession1.execute(p);
+// ksession.insert(p);
+// ksession.execute(p);
+
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl b/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl
new file mode 100644
index 0000000..cf9982f
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.rule;
+
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.ClusterContext;
+import org.apache.stratos.autoscaler.Constants;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.policy.model.RequestsInFlight;
+import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.algorithm.AutoscaleAlgorithm;
+import org.apache.stratos.autoscaler.algorithm.OneAfterAnother;
+import org.apache.stratos.autoscaler.algorithm.RoundRobin;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.commons.logging.Log;
+
+global org.apache.stratos.autoscaler.policy.PolicyManager $manager;
+global org.apache.stratos.autoscaler.AutoscalerContext $context;
+global org.apache.commons.logging.Log log;
+global org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator $evaluator;
+global org.apache.stratos.messaging.domain.topology.Topology $topology;
+
+
+rule "Minimum Rule"
+dialect "mvel"
+ when
+ $service : Service ()
+ $cluster : Cluster () from $service.getClusters()
+ $deploymentPolicy : DeploymentPolicy(id == $cluster.getDeploymentPolicyName() ) from $manager.getDeploymentPolicyList()
+ $partitionGroup : PartitionGroup () from $deploymentPolicy.getPartitionGroups()
+ $partition : Partition () from $partitionGroup.getPartitions()
+ $clusterContext : ClusterContext() from $context.getClusterContext($cluster.getClusterId())
+ eval($clusterContext.getMemberCount($partition.getId()) < $partition.getPartitionMembersMin())
+
+
+ then
+ int memberCountToBeIncreased = 1 ;
+ if($evaluator.delegateSpawn($partition,$cluster.getClusterId(), memberCountToBeIncreased)){
+ $clusterContext.increaseMemberCountInPartitionBy($partition.getId(), memberCountToBeIncreased);
+ }
+end
+
+rule "Scaler-up Rule"
+dialect "mvel"
+ when
+ $service : Service ()
+ $cluster : Cluster () from $service.getClusters()
+ $autoScalingPolicy : AutoscalePolicy(id == $cluster.getAutoscalePolicyName() ) from $manager.getAutoscalePolicyList()
+ $deploymentPolicy : DeploymentPolicy(id == $cluster.getDeploymentPolicyName() ) from $manager.getDeploymentPolicyList()
+ $partitionGroup : PartitionGroup () from $deploymentPolicy.getPartitionGroups()
+ $clusterContext : ClusterContext() from $context.getClusterContext($cluster.getClusterId())
+ $loadThresholds :LoadThresholds() from $autoScalingPolicy.getLoadThresholds()
+
+ autoscaleAlgorithm : AutoscaleAlgorithm() from $evaluator.getAutoscaleAlgorithm($partitionGroup.getPartitionAlgo())
+ lbStatAverage : Float() from $clusterContext.getAverageRequestsInFlight()
+ lbStatGradient : Float() from $clusterContext.getRequestsInFlightGradient()
+ lbStatSecondDerivative : Float() from $clusterContext.getRequestsInFlightSecondDerivative()
+ averageLimit : Float() from $loadThresholds.getRequestsInFlight().getAverage()
+ gradientLimit : Float() from $loadThresholds.getRequestsInFlight().getGradient()
+ secondDerivative : Float() from $loadThresholds.getRequestsInFlight().getSecondDerivative()
+ partition : Partition() from autoscaleAlgorithm.getNextScaleUpPartition($partitionGroup, $cluster.getClusterId())
+ eval (lbStatAverage > averageLimit && lbStatGradient > gradientLimit)
+ then
+ int numberOfInstancesToBeSpawned = (lbStatSecondDerivative > secondDerivative) ? 2 : 1; // take from a config
+ $evaluator.delegateSpawn(partition,$cluster.getClusterId(), numberOfInstancesToBeSpawned);
+ $clusterContext.setRequestsInFlightGradient(gradientLimit);
+ $clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+ $clusterContext.setAverageRequestsInFlight(averageLimit);
+end
+
+rule "Scaler-down Rule"
+dialect "mvel"
+ when
+ $service : Service ()
+ $cluster : Cluster () from $service.getClusters()
+ $autoScalingPolicy : AutoscalePolicy(id == $cluster.getAutoscalePolicyName() ) from $manager.getAutoscalePolicyList()
+ $deploymentPolicy : DeploymentPolicy(id == $cluster.getDeploymentPolicyName() ) from $manager.getDeploymentPolicyList()
+ $partitionGroup : PartitionGroup () from $deploymentPolicy.getPartitionGroups()
+ $clusterContext : ClusterContext() from $context.getClusterContext($cluster.getClusterId())
+ $loadThresholds :LoadThresholds() from $autoScalingPolicy.getLoadThresholds()
+
+ autoscaleAlgorithm : AutoscaleAlgorithm() from $evaluator.getAutoscaleAlgorithm($partitionGroup.getPartitionAlgo())
+ lbStatAverage : Float() from $clusterContext.getAverageRequestsInFlight()
+ lbStatGradient : Float() from $clusterContext.getRequestsInFlightGradient()
+ lbStatSecondDerivative : Float() from $clusterContext.getRequestsInFlightSecondDerivative()
+ averageLimit : Float() from $loadThresholds.getRequestsInFlight().getAverage()
+ gradientLimit : Float() from $loadThresholds.getRequestsInFlight().getGradient()
+ secondDerivative : Float() from $loadThresholds.getRequestsInFlight().getSecondDerivative()
+ scaleDownSlowerMarginOfGradient : Float() from $loadThresholds.getRequestsInFlight().getScaleDownMarginOfGradient()
+ scaleDownSlowerMarginOfSecondDerivative : Float() from $loadThresholds.getRequestsInFlight().getScaleDownMarginOfSecondDerivative()
+ partition : Partition() from autoscaleAlgorithm.getNextScaleDownPartition($partitionGroup, $cluster.getClusterId())
+ eval(lbStatAverage < averageLimit && lbStatGradient < gradientLimit - scaleDownSlowerMarginOfSecondDerivative
+ && lbStatSecondDerivative < secondDerivative - scaleDownSlowerMarginOfSecondDerivative)
+ then
+ $evaluator.delegateTerminate(partition,$cluster.getClusterId());
+ $clusterContext.setRequestsInFlightGradient(gradientLimit);
+ $clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+ $clusterContext.setAverageRequestsInFlight(averageLimit);
+end