You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/24 18:36:25 UTC

[02/12] stratos git commit: Renaming iaas classes and moving them to new packages

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsGenerator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsGenerator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsGenerator.java
new file mode 100644
index 0000000..2766928
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsGenerator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.iaases.mock.service.statistics.generator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.iaases.mock.service.config.MockIaasConfig;
+import org.apache.stratos.common.threading.StratosThreadPool;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Mock health statistics generator.
+ */
+public class MockHealthStatisticsGenerator {
+
+    private static final Log log = LogFactory.getLog(MockHealthStatisticsGenerator.class);
+
+    private static volatile MockHealthStatisticsGenerator instance;
+    private static final ScheduledExecutorService scheduledExecutorService =
+            StratosThreadPool.getScheduledExecutorService("MOCK_STATISTICS_GENERATOR_EXECUTOR_SERVICE", 10);
+
+    private boolean scheduled;
+    // Map<ServiceName, List<ScheduledFuture>>
+    private Map<String, Map<String, ScheduledFuture>> serviceNameToTaskListMap;
+
+    public static MockHealthStatisticsGenerator getInstance() {
+        if (instance == null) {
+            synchronized (MockHealthStatisticsGenerator.class) {
+                if (instance == null) {
+                    instance = new MockHealthStatisticsGenerator();
+                }
+            }
+        }
+        return instance;
+    }
+
+    private MockHealthStatisticsGenerator() {
+        serviceNameToTaskListMap = new ConcurrentHashMap<String, Map<String, ScheduledFuture>>();
+    }
+
+    /**
+     * Schedule statistics updater tasks for the given service/cartridge type.
+     *
+     * @param serviceName
+     */
+    public void scheduleStatisticsUpdaterTasks(String serviceName) {
+        synchronized (MockHealthStatisticsGenerator.class) {
+            if (!statisticsUpdaterTasksScheduled(serviceName)) {
+                List<MockHealthStatisticsPattern> statisticsPatterns = MockIaasConfig.getInstance().
+                        getMockHealthStatisticsConfig().getStatisticsPatterns();
+
+                Map<String, ScheduledFuture> taskList = serviceNameToTaskListMap.get(serviceName);
+                if (taskList == null) {
+                    taskList = new ConcurrentHashMap<String, ScheduledFuture>();
+                    serviceNameToTaskListMap.put(serviceName, taskList);
+                }
+
+                for (MockHealthStatisticsPattern statisticsPattern : statisticsPatterns) {
+                    if (statisticsPattern.getCartridgeType().equals(serviceName) &&
+                            (statisticsPattern.getSampleDuration() > 0)) {
+                        MockHealthStatisticsUpdater runnable = new MockHealthStatisticsUpdater(statisticsPattern);
+                        ScheduledFuture<?> task = scheduledExecutorService.scheduleAtFixedRate(runnable, 0,
+                                statisticsPattern.getSampleDuration(), TimeUnit.SECONDS);
+                        taskList.put(statisticsPattern.getFactor().toString(), task);
+                    }
+                }
+
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Mock statistics updaters scheduled: [service-name] %s", serviceName));
+                }
+            }
+        }
+    }
+
+    /**
+     * Stop statistics updater tasks of the given service/cartridge type.
+     *
+     * @param serviceName
+     */
+    public void stopStatisticsUpdaterTasks(String serviceName) {
+        synchronized (MockHealthStatisticsGenerator.class) {
+            Map<String, ScheduledFuture> taskMap = serviceNameToTaskListMap.get(serviceName);
+            if ((taskMap != null) && (taskMap.size() > 0)) {
+                Iterator<String> factorIterator = taskMap.keySet().iterator();
+                while(factorIterator.hasNext()) {
+                    String factor = factorIterator.next();
+                    stopStatisticsUpdaterTask(serviceName, factor);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stop statistics updater task of a service/cartridge type, factor.
+     * @param serviceName
+     * @param factor
+     */
+    public void stopStatisticsUpdaterTask(String serviceName, String factor) {
+        Map<String, ScheduledFuture> factorToTaskMap = serviceNameToTaskListMap.get(serviceName);
+        if(factorToTaskMap != null) {
+            ScheduledFuture task = factorToTaskMap.get(factor);
+            if(task != null) {
+                task.cancel(true);
+                factorToTaskMap.remove(factor);
+
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Mock statistics updater task stopped: [service-name] %s" +
+                            " [factor] %s", serviceName, factor));
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns true if there are statistics updater tasks scheduled for the given service/cartridge type
+     * else returns false.
+     * @param serviceName
+     * @return
+     */
+    public boolean statisticsUpdaterTasksScheduled(String serviceName) {
+        Map<String, ScheduledFuture> tasks = serviceNameToTaskListMap.get(serviceName);
+        return ((tasks != null) && (tasks.size() > 0));
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsPattern.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsPattern.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsPattern.java
new file mode 100644
index 0000000..d7db919
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsPattern.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.iaases.mock.service.statistics.generator;
+
+import org.apache.stratos.cloud.controller.iaases.mock.service.MockAutoscalingFactor;
+import org.apache.stratos.cloud.controller.iaases.mock.service.exceptions.ContinueLastSampleValueException;
+import org.apache.stratos.cloud.controller.iaases.mock.service.exceptions.NoSampleValuesFoundException;
+import org.apache.stratos.cloud.controller.iaases.mock.service.exceptions.StopStatisticsPublishingException;
+import org.apache.stratos.cloud.controller.iaases.mock.service.statistics.StatisticsPatternMode;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Mock health statistics pattern definition.
+ */
+public class MockHealthStatisticsPattern {
+
+    private String cartridgeType;
+    private MockAutoscalingFactor factor;
+    private StatisticsPatternMode mode;
+    private List<Integer> sampleValues;
+    private int sampleDuration;
+    private Iterator sampleValuesIterator;
+
+    public MockHealthStatisticsPattern(String cartridgeType, MockAutoscalingFactor factor, StatisticsPatternMode mode, List<Integer> sampleValues,
+                                       int sampleDuration) {
+        this.cartridgeType = cartridgeType;
+        this.factor = factor;
+        this.mode = mode;
+        this.sampleValues = sampleValues;
+        this.sampleValuesIterator = this.sampleValues.iterator();
+        this.sampleDuration = sampleDuration;
+    }
+
+    public String getCartridgeType() {
+        return cartridgeType;
+    }
+
+    /**
+     * Returns autoscaling factor
+     * @return
+     */
+    public MockAutoscalingFactor getFactor() {
+        return factor;
+    }
+
+    /**
+     * Returns statistics pattern mode
+     * @return
+     */
+    public StatisticsPatternMode getMode() {
+        return mode;
+    }
+
+    /**
+     * Returns next sample value
+     * @return
+     */
+    public int getNextSample() throws NoSampleValuesFoundException, StopStatisticsPublishingException,
+            ContinueLastSampleValueException {
+        if((sampleValues == null) || (sampleValues.size() < 1)) {
+            throw new NoSampleValuesFoundException();
+        }
+
+        if(!sampleValuesIterator.hasNext()) {
+            // Iterator has come to the end of the list
+            if(getMode() == StatisticsPatternMode.Loop) {
+                // Looping: reset the iterator
+                sampleValuesIterator = sampleValues.iterator();
+                return Integer.parseInt(sampleValuesIterator.next().toString());
+            } else if(getMode() == StatisticsPatternMode.Continue) {
+                // Continue: return the last value
+                int lastSampleValue = Integer.parseInt(sampleValues.get(sampleValues.size() - 1).toString());
+                throw new ContinueLastSampleValueException(lastSampleValue);
+            } else if(getMode() == StatisticsPatternMode.Stop) {
+                throw new StopStatisticsPublishingException();
+            } else {
+                throw new RuntimeException("An unknown statistics pattern mode found");
+            }
+        } else {
+            return Integer.parseInt(sampleValuesIterator.next().toString());
+        }
+    }
+
+    /**
+     * Returns sample duration in seconds
+     * @return
+     */
+    public int getSampleDuration() {
+        return sampleDuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsUpdater.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsUpdater.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsUpdater.java
new file mode 100644
index 0000000..083aa21
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/generator/MockHealthStatisticsUpdater.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.iaases.mock.service.statistics.generator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.iaases.mock.service.exceptions.ContinueLastSampleValueException;
+import org.apache.stratos.cloud.controller.iaases.mock.service.exceptions.NoSampleValuesFoundException;
+import org.apache.stratos.cloud.controller.iaases.mock.service.exceptions.StopStatisticsPublishingException;
+import org.apache.stratos.cloud.controller.iaases.mock.service.statistics.MockHealthStatistics;
+
+/**
+ * Update health statistics according to the given sample pattern, for each pattern there will be
+ * one updater runnable created.
+ */
+public class MockHealthStatisticsUpdater implements Runnable {
+
+    private static final Log log = LogFactory.getLog(MockHealthStatisticsUpdater.class);
+
+    private MockHealthStatisticsPattern statisticsPattern;
+
+    public MockHealthStatisticsUpdater(MockHealthStatisticsPattern statisticsPattern) {
+        this.statisticsPattern = statisticsPattern;
+    }
+
+    @Override
+    public void run() {
+        try {
+            int nextSample = statisticsPattern.getNextSample();
+            MockHealthStatistics.getInstance().addStatistics(statisticsPattern.getCartridgeType(),
+                    statisticsPattern.getFactor(), nextSample);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Mock statistics updated: [cartridge-type] %s [factor] %s [value] %d",
+                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString(), nextSample));
+            }
+        } catch (NoSampleValuesFoundException ignore) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("No sample values found for: [cartridge-type] %s [factor] %s",
+                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString()));
+            }
+        } catch (ContinueLastSampleValueException e) {
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Continuing last sample value: [cartridge-type] %s [factor] %s [value] %d",
+                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString(),
+                        e.getLastSampleValue()));
+            }
+            // Stop statistics updater task
+            MockHealthStatisticsGenerator.getInstance().stopStatisticsUpdaterTask(statisticsPattern.getCartridgeType(),
+                    statisticsPattern.getFactor().toString());
+        } catch (StopStatisticsPublishingException action) {
+            // Remove statistics
+            MockHealthStatistics.getInstance().removeStatistics(statisticsPattern.getCartridgeType(),
+                    statisticsPattern.getFactor());
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Statistics removed: [cartridge-type] %s [factor] %s",
+                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString()));
+            }
+            // Stop statistics updater task
+            MockHealthStatisticsGenerator.getInstance().stopStatisticsUpdaterTask(statisticsPattern.getCartridgeType(),
+                    statisticsPattern.getFactor().toString());
+        } catch (Exception e) {
+            log.error("Could not update mock statistics", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsNotifier.java
new file mode 100644
index 0000000..9aef223
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsNotifier.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.iaases.mock.service.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.iaases.mock.service.MockAutoscalingFactor;
+import org.apache.stratos.cloud.controller.iaases.mock.service.MockMemberContext;
+import org.apache.stratos.cloud.controller.iaases.mock.service.exceptions.NoStatisticsFoundException;
+import org.apache.stratos.cloud.controller.iaases.mock.service.statistics.MockHealthStatistics;
+
+/**
+ * Health statistics notifier thread for publishing statistics periodically to CEP.
+ */
+public class MockHealthStatisticsNotifier implements Runnable {
+    private static final Log log = LogFactory.getLog(MockHealthStatisticsNotifier.class);
+
+    public static final String MEMORY_CONSUMPTION = "memory_consumption";
+    public static final String LOAD_AVERAGE = "load_average";
+
+    private final MockMemberContext mockMemberContext;
+    private final MockHealthStatisticsPublisher statsPublisher;
+
+    public MockHealthStatisticsNotifier(MockMemberContext mockMemberContext) {
+        this.mockMemberContext = mockMemberContext;
+        this.statsPublisher = new MockHealthStatisticsPublisher();
+        this.statsPublisher.setEnabled(true);
+    }
+
+    @Override
+    public void run() {
+        if (!statsPublisher.isEnabled()) {
+            if (log.isWarnEnabled()) {
+                log.warn("Statistics publisher is disabled");
+            }
+            return;
+        }
+
+        try {
+            double memoryConsumption = MockHealthStatistics.getInstance().getStatistics(
+                    mockMemberContext.getServiceName(), MockAutoscalingFactor.MemoryConsumption);
+
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Publishing memory consumption: [member-id] %s [value] %f",
+                        mockMemberContext.getMemberId(), memoryConsumption));
+            }
+            statsPublisher.publish(
+                    mockMemberContext.getClusterId(),
+                    mockMemberContext.getClusterInstanceId(),
+                    mockMemberContext.getNetworkPartitionId(),
+                    mockMemberContext.getMemberId(),
+                    mockMemberContext.getPartitionId(),
+                    MEMORY_CONSUMPTION,
+                    memoryConsumption
+            );
+        } catch (NoStatisticsFoundException ignore) {
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Could not publish health statistics", e);
+            }
+        }
+
+
+        try {
+            double loadAvereage = MockHealthStatistics.getInstance().getStatistics(
+                    mockMemberContext.getServiceName(), MockAutoscalingFactor.LoadAverage);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Publishing load average: [member-id] %s [value] %f",
+                        mockMemberContext.getMemberId(), loadAvereage));
+            }
+            statsPublisher.publish(
+                    mockMemberContext.getClusterId(),
+                    mockMemberContext.getClusterInstanceId(),
+                    mockMemberContext.getNetworkPartitionId(),
+                    mockMemberContext.getMemberId(),
+                    mockMemberContext.getPartitionId(),
+                    LOAD_AVERAGE,
+                    loadAvereage
+            );
+        } catch (NoStatisticsFoundException ignore) {
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Could not publish health statistics", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsPublisher.java
new file mode 100644
index 0000000..b2a5b27
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/service/statistics/publisher/MockHealthStatisticsPublisher.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.iaases.mock.service.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.statistics.publisher.WSO2CEPStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Health statistics publisher for publishing statistics to CEP.
+ */
+public class MockHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher {
+    private static final Log log = LogFactory.getLog(MockHealthStatisticsPublisher.class);
+
+    private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats";
+    private static final String VERSION = "1.0.0";
+
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("agent health stats");
+            streamDefinition.setDescription("agent health stats");
+            // Payload definition
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+            payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
+            payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
+            payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
+            payloadData.add(new Attribute("member_id", AttributeType.STRING));
+            payloadData.add(new Attribute("partition_id", AttributeType.STRING));
+            payloadData.add(new Attribute("health_description", AttributeType.STRING));
+            payloadData.add(new Attribute("value", AttributeType.DOUBLE));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    public MockHealthStatisticsPublisher() {
+        super(createStreamDefinition());
+    }
+
+    /**
+     * Publish health statistics to cep.
+     * @param clusterId
+     * @param networkPartitionId
+     * @param memberId
+     * @param partitionId
+     * @param health
+     * @param value
+     */
+    public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) {
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s [partition] %s [member] %s [health] %s [value] %f",
+                    clusterId, networkPartitionId, partitionId, memberId, health, value));
+        }
+        List<Object> payload = new ArrayList<Object>();
+        // Payload values
+        payload.add(clusterId);
+        payload.add(clusterInstanceId);
+        payload.add(networkPartitionId);
+        payload.add(memberId);
+        payload.add(partitionId);
+        payload.add(health);
+        payload.add(value);
+        super.publish(payload.toArray());
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/MockHealthStatistics.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/MockHealthStatistics.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/MockHealthStatistics.java
deleted file mode 100644
index baf3b3e..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/MockHealthStatistics.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.iaases.mock.statistics;
-
-import org.apache.stratos.cloud.controller.iaases.mock.MockAutoscalingFactor;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.NoStatisticsFoundException;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Mock health statistics singleton class.
- */
-public class MockHealthStatistics {
-    private final static int DEFAULT_MEMORY_CONSUMPTION = 20;
-    private final static int DEFAULT_LOAD_AVERAGE = 20;
-    private final static int DEFAULT_REQUESTS_IN_FLIGHT = 1;
-
-    private static volatile MockHealthStatistics instance;
-
-    private Map<String, Map<String, Integer>> statisticsMap;
-
-    private MockHealthStatistics() {
-        statisticsMap = new ConcurrentHashMap<String, Map<String, Integer>>();
-    }
-
-    public static MockHealthStatistics getInstance() {
-        if (instance == null) {
-            synchronized (MockHealthStatistics.class) {
-                if (instance == null) {
-                    instance = new MockHealthStatistics();
-                }
-            }
-        }
-        return instance;
-    }
-
-    /**
-     * Add statistics value for a cartridge type, autoscaling factor
-     * @param cartridgeType
-     * @param autoscalingFactor
-     * @param value
-     */
-    public void addStatistics(String cartridgeType, MockAutoscalingFactor autoscalingFactor, Integer value) {
-        Map<String, Integer> factorValueMap = statisticsMap.get(cartridgeType);
-        if(factorValueMap == null) {
-            synchronized (MockHealthStatistics.class) {
-                if(factorValueMap == null) {
-                    factorValueMap = new ConcurrentHashMap<String, Integer>();
-                    statisticsMap.put(cartridgeType, factorValueMap);
-                }
-            }
-        }
-        factorValueMap.put(autoscalingFactor.toString(), value);
-    }
-
-    /**
-     * Returns current statistics of the given cartridge type, autoscaling factor
-     * @param cartridgeType
-     * @param autoscalingFactor
-     * @return
-     */
-    public int getStatistics(String cartridgeType, MockAutoscalingFactor autoscalingFactor) throws NoStatisticsFoundException {
-        Map<String, Integer> factorValueMap = statisticsMap.get(cartridgeType);
-        if(factorValueMap != null) {
-            if(factorValueMap.containsKey(autoscalingFactor.toString())) {
-                return factorValueMap.get(autoscalingFactor.toString());
-            } else {
-                throw new NoStatisticsFoundException();
-            }
-        }
-        // No statistics patterns found, return default
-        return findDefault(autoscalingFactor);
-    }
-
-    /**
-     * Remove statistics found for the cartridge type, autoscaling factor
-     * @param cartridgeType
-     * @param autoscalingFactor
-     */
-    public void removeStatistics(String cartridgeType, MockAutoscalingFactor autoscalingFactor) {
-        Map<String, Integer> factorValueMap = statisticsMap.get(cartridgeType);
-        if(factorValueMap != null) {
-            if(factorValueMap.containsKey(autoscalingFactor.toString())) {
-                factorValueMap.remove(autoscalingFactor.toString());
-            }
-        }
-    }
-
-    /**
-     * Find default statistics value of the given autoscaling factor
-     * @param autoscalingFactor
-     * @return
-     */
-    private int findDefault(MockAutoscalingFactor autoscalingFactor) {
-        if(autoscalingFactor == MockAutoscalingFactor.MemoryConsumption) {
-            return DEFAULT_MEMORY_CONSUMPTION;
-        } else if(autoscalingFactor == MockAutoscalingFactor.LoadAverage) {
-            return DEFAULT_LOAD_AVERAGE;
-        } else if(autoscalingFactor == MockAutoscalingFactor.RequestInFlight) {
-            return DEFAULT_REQUESTS_IN_FLIGHT;
-        }
-        throw new RuntimeException("An unknown autoscaling factor found: " + autoscalingFactor);
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/StatisticsPatternMode.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/StatisticsPatternMode.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/StatisticsPatternMode.java
deleted file mode 100644
index 23a80d6..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/StatisticsPatternMode.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.iaases.mock.statistics;
-
-/**
- * Statistics pattern mode.
- */
-public enum StatisticsPatternMode {
-    Loop, Continue, Stop
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsGenerator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsGenerator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsGenerator.java
deleted file mode 100644
index be3e474..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsGenerator.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.iaases.mock.statistics.generator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.iaases.mock.config.MockIaasConfig;
-import org.apache.stratos.common.threading.StratosThreadPool;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Mock health statistics generator.
- */
-public class MockHealthStatisticsGenerator {
-
-    private static final Log log = LogFactory.getLog(MockHealthStatisticsGenerator.class);
-
-    private static volatile MockHealthStatisticsGenerator instance;
-    private static final ScheduledExecutorService scheduledExecutorService =
-            StratosThreadPool.getScheduledExecutorService("MOCK_STATISTICS_GENERATOR_EXECUTOR_SERVICE", 10);
-
-    private boolean scheduled;
-    // Map<ServiceName, List<ScheduledFuture>>
-    private Map<String, Map<String, ScheduledFuture>> serviceNameToTaskListMap;
-
-    public static MockHealthStatisticsGenerator getInstance() {
-        if (instance == null) {
-            synchronized (MockHealthStatisticsGenerator.class) {
-                if (instance == null) {
-                    instance = new MockHealthStatisticsGenerator();
-                }
-            }
-        }
-        return instance;
-    }
-
-    private MockHealthStatisticsGenerator() {
-        serviceNameToTaskListMap = new ConcurrentHashMap<String, Map<String, ScheduledFuture>>();
-    }
-
-    /**
-     * Schedule statistics updater tasks for the given service/cartridge type.
-     *
-     * @param serviceName
-     */
-    public void scheduleStatisticsUpdaterTasks(String serviceName) {
-        synchronized (MockHealthStatisticsGenerator.class) {
-            if (!statisticsUpdaterTasksScheduled(serviceName)) {
-                List<MockHealthStatisticsPattern> statisticsPatterns = MockIaasConfig.getInstance().
-                        getMockHealthStatisticsConfig().getStatisticsPatterns();
-
-                Map<String, ScheduledFuture> taskList = serviceNameToTaskListMap.get(serviceName);
-                if (taskList == null) {
-                    taskList = new ConcurrentHashMap<String, ScheduledFuture>();
-                    serviceNameToTaskListMap.put(serviceName, taskList);
-                }
-
-                for (MockHealthStatisticsPattern statisticsPattern : statisticsPatterns) {
-                    if (statisticsPattern.getCartridgeType().equals(serviceName) &&
-                            (statisticsPattern.getSampleDuration() > 0)) {
-                        MockHealthStatisticsUpdater runnable = new MockHealthStatisticsUpdater(statisticsPattern);
-                        ScheduledFuture<?> task = scheduledExecutorService.scheduleAtFixedRate(runnable, 0,
-                                statisticsPattern.getSampleDuration(), TimeUnit.SECONDS);
-                        taskList.put(statisticsPattern.getFactor().toString(), task);
-                    }
-                }
-
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Mock statistics updaters scheduled: [service-name] %s", serviceName));
-                }
-            }
-        }
-    }
-
-    /**
-     * Stop statistics updater tasks of the given service/cartridge type.
-     *
-     * @param serviceName
-     */
-    public void stopStatisticsUpdaterTasks(String serviceName) {
-        synchronized (MockHealthStatisticsGenerator.class) {
-            Map<String, ScheduledFuture> taskMap = serviceNameToTaskListMap.get(serviceName);
-            if ((taskMap != null) && (taskMap.size() > 0)) {
-                Iterator<String> factorIterator = taskMap.keySet().iterator();
-                while(factorIterator.hasNext()) {
-                    String factor = factorIterator.next();
-                    stopStatisticsUpdaterTask(serviceName, factor);
-                }
-            }
-        }
-    }
-
-    /**
-     * Stop statistics updater task of a service/cartridge type, factor.
-     * @param serviceName
-     * @param factor
-     */
-    public void stopStatisticsUpdaterTask(String serviceName, String factor) {
-        Map<String, ScheduledFuture> factorToTaskMap = serviceNameToTaskListMap.get(serviceName);
-        if(factorToTaskMap != null) {
-            ScheduledFuture task = factorToTaskMap.get(factor);
-            if(task != null) {
-                task.cancel(true);
-                factorToTaskMap.remove(factor);
-
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Mock statistics updater task stopped: [service-name] %s" +
-                            " [factor] %s", serviceName, factor));
-                }
-            }
-        }
-    }
-
-    /**
-     * Returns true if there are statistics updater tasks scheduled for the given service/cartridge type
-     * else returns false.
-     * @param serviceName
-     * @return
-     */
-    public boolean statisticsUpdaterTasksScheduled(String serviceName) {
-        Map<String, ScheduledFuture> tasks = serviceNameToTaskListMap.get(serviceName);
-        return ((tasks != null) && (tasks.size() > 0));
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsPattern.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsPattern.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsPattern.java
deleted file mode 100644
index f59df2d..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsPattern.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.iaases.mock.statistics.generator;
-
-import org.apache.stratos.cloud.controller.iaases.mock.MockAutoscalingFactor;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.ContinueLastSampleValueException;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.NoSampleValuesFoundException;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.StopStatisticsPublishingException;
-import org.apache.stratos.cloud.controller.iaases.mock.statistics.StatisticsPatternMode;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Mock health statistics pattern definition.
- */
-public class MockHealthStatisticsPattern {
-
-    private String cartridgeType;
-    private MockAutoscalingFactor factor;
-    private StatisticsPatternMode mode;
-    private List<Integer> sampleValues;
-    private int sampleDuration;
-    private Iterator sampleValuesIterator;
-
-    public MockHealthStatisticsPattern(String cartridgeType, MockAutoscalingFactor factor, StatisticsPatternMode mode, List<Integer> sampleValues,
-                                       int sampleDuration) {
-        this.cartridgeType = cartridgeType;
-        this.factor = factor;
-        this.mode = mode;
-        this.sampleValues = sampleValues;
-        this.sampleValuesIterator = this.sampleValues.iterator();
-        this.sampleDuration = sampleDuration;
-    }
-
-    public String getCartridgeType() {
-        return cartridgeType;
-    }
-
-    /**
-     * Returns autoscaling factor
-     * @return
-     */
-    public MockAutoscalingFactor getFactor() {
-        return factor;
-    }
-
-    /**
-     * Returns statistics pattern mode
-     * @return
-     */
-    public StatisticsPatternMode getMode() {
-        return mode;
-    }
-
-    /**
-     * Returns next sample value
-     * @return
-     */
-    public int getNextSample() throws NoSampleValuesFoundException, StopStatisticsPublishingException,
-            ContinueLastSampleValueException {
-        if((sampleValues == null) || (sampleValues.size() < 1)) {
-            throw new NoSampleValuesFoundException();
-        }
-
-        if(!sampleValuesIterator.hasNext()) {
-            // Iterator has come to the end of the list
-            if(getMode() == StatisticsPatternMode.Loop) {
-                // Looping: reset the iterator
-                sampleValuesIterator = sampleValues.iterator();
-                return Integer.parseInt(sampleValuesIterator.next().toString());
-            } else if(getMode() == StatisticsPatternMode.Continue) {
-                // Continue: return the last value
-                int lastSampleValue = Integer.parseInt(sampleValues.get(sampleValues.size() - 1).toString());
-                throw new ContinueLastSampleValueException(lastSampleValue);
-            } else if(getMode() == StatisticsPatternMode.Stop) {
-                throw new StopStatisticsPublishingException();
-            } else {
-                throw new RuntimeException("An unknown statistics pattern mode found");
-            }
-        } else {
-            return Integer.parseInt(sampleValuesIterator.next().toString());
-        }
-    }
-
-    /**
-     * Returns sample duration in seconds
-     * @return
-     */
-    public int getSampleDuration() {
-        return sampleDuration;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsUpdater.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsUpdater.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsUpdater.java
deleted file mode 100644
index 6e55725..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/generator/MockHealthStatisticsUpdater.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.iaases.mock.statistics.generator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.ContinueLastSampleValueException;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.NoSampleValuesFoundException;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.StopStatisticsPublishingException;
-import org.apache.stratos.cloud.controller.iaases.mock.statistics.MockHealthStatistics;
-
-/**
- * Update health statistics according to the given sample pattern, for each pattern there will be
- * one updater runnable created.
- */
-public class MockHealthStatisticsUpdater implements Runnable {
-
-    private static final Log log = LogFactory.getLog(MockHealthStatisticsUpdater.class);
-
-    private MockHealthStatisticsPattern statisticsPattern;
-
-    public MockHealthStatisticsUpdater(MockHealthStatisticsPattern statisticsPattern) {
-        this.statisticsPattern = statisticsPattern;
-    }
-
-    @Override
-    public void run() {
-        try {
-            int nextSample = statisticsPattern.getNextSample();
-            MockHealthStatistics.getInstance().addStatistics(statisticsPattern.getCartridgeType(),
-                    statisticsPattern.getFactor(), nextSample);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Mock statistics updated: [cartridge-type] %s [factor] %s [value] %d",
-                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString(), nextSample));
-            }
-        } catch (NoSampleValuesFoundException ignore) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("No sample values found for: [cartridge-type] %s [factor] %s",
-                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString()));
-            }
-        } catch (ContinueLastSampleValueException e) {
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Continuing last sample value: [cartridge-type] %s [factor] %s [value] %d",
-                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString(),
-                        e.getLastSampleValue()));
-            }
-            // Stop statistics updater task
-            MockHealthStatisticsGenerator.getInstance().stopStatisticsUpdaterTask(statisticsPattern.getCartridgeType(),
-                    statisticsPattern.getFactor().toString());
-        } catch (StopStatisticsPublishingException action) {
-            // Remove statistics
-            MockHealthStatistics.getInstance().removeStatistics(statisticsPattern.getCartridgeType(),
-                    statisticsPattern.getFactor());
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Statistics removed: [cartridge-type] %s [factor] %s",
-                        statisticsPattern.getCartridgeType(), statisticsPattern.getFactor().toString()));
-            }
-            // Stop statistics updater task
-            MockHealthStatisticsGenerator.getInstance().stopStatisticsUpdaterTask(statisticsPattern.getCartridgeType(),
-                    statisticsPattern.getFactor().toString());
-        } catch (Exception e) {
-            log.error("Could not update mock statistics", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsNotifier.java
deleted file mode 100644
index ca345b4..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsNotifier.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.iaases.mock.statistics.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.iaases.mock.MockAutoscalingFactor;
-import org.apache.stratos.cloud.controller.iaases.mock.MockMemberContext;
-import org.apache.stratos.cloud.controller.iaases.mock.exceptions.NoStatisticsFoundException;
-import org.apache.stratos.cloud.controller.iaases.mock.statistics.MockHealthStatistics;
-
-/**
- * Health statistics notifier thread for publishing statistics periodically to CEP.
- */
-public class MockHealthStatisticsNotifier implements Runnable {
-    private static final Log log = LogFactory.getLog(MockHealthStatisticsNotifier.class);
-
-    public static final String MEMORY_CONSUMPTION = "memory_consumption";
-    public static final String LOAD_AVERAGE = "load_average";
-
-    private final MockMemberContext mockMemberContext;
-    private final MockHealthStatisticsPublisher statsPublisher;
-
-    public MockHealthStatisticsNotifier(MockMemberContext mockMemberContext) {
-        this.mockMemberContext = mockMemberContext;
-        this.statsPublisher = new MockHealthStatisticsPublisher();
-        this.statsPublisher.setEnabled(true);
-    }
-
-    @Override
-    public void run() {
-        if (!statsPublisher.isEnabled()) {
-            if (log.isWarnEnabled()) {
-                log.warn("Statistics publisher is disabled");
-            }
-            return;
-        }
-
-        try {
-            double memoryConsumption = MockHealthStatistics.getInstance().getStatistics(
-                    mockMemberContext.getServiceName(), MockAutoscalingFactor.MemoryConsumption);
-
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing memory consumption: [member-id] %s [value] %f",
-                        mockMemberContext.getMemberId(), memoryConsumption));
-            }
-            statsPublisher.publish(
-                    mockMemberContext.getClusterId(),
-                    mockMemberContext.getClusterInstanceId(),
-                    mockMemberContext.getNetworkPartitionId(),
-                    mockMemberContext.getMemberId(),
-                    mockMemberContext.getPartitionId(),
-                    MEMORY_CONSUMPTION,
-                    memoryConsumption
-            );
-        } catch (NoStatisticsFoundException ignore) {
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not publish health statistics", e);
-            }
-        }
-
-
-        try {
-            double loadAvereage = MockHealthStatistics.getInstance().getStatistics(
-                    mockMemberContext.getServiceName(), MockAutoscalingFactor.LoadAverage);
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing load average: [member-id] %s [value] %f",
-                        mockMemberContext.getMemberId(), loadAvereage));
-            }
-            statsPublisher.publish(
-                    mockMemberContext.getClusterId(),
-                    mockMemberContext.getClusterInstanceId(),
-                    mockMemberContext.getNetworkPartitionId(),
-                    mockMemberContext.getMemberId(),
-                    mockMemberContext.getPartitionId(),
-                    LOAD_AVERAGE,
-                    loadAvereage
-            );
-        } catch (NoStatisticsFoundException ignore) {
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not publish health statistics", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsPublisher.java
deleted file mode 100644
index 90c641d..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/statistics/publisher/MockHealthStatisticsPublisher.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.iaases.mock.statistics.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.statistics.publisher.WSO2CEPStatisticsPublisher;
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Health statistics publisher for publishing statistics to CEP.
- */
-public class MockHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher {
-    private static final Log log = LogFactory.getLog(MockHealthStatisticsPublisher.class);
-
-    private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats";
-    private static final String VERSION = "1.0.0";
-
-    private static StreamDefinition createStreamDefinition() {
-        try {
-            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
-            streamDefinition.setNickName("agent health stats");
-            streamDefinition.setDescription("agent health stats");
-            // Payload definition
-            List<Attribute> payloadData = new ArrayList<Attribute>();
-            payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
-            payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
-            payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
-            payloadData.add(new Attribute("member_id", AttributeType.STRING));
-            payloadData.add(new Attribute("partition_id", AttributeType.STRING));
-            payloadData.add(new Attribute("health_description", AttributeType.STRING));
-            payloadData.add(new Attribute("value", AttributeType.DOUBLE));
-            streamDefinition.setPayloadData(payloadData);
-            return streamDefinition;
-        } catch (Exception e) {
-            throw new RuntimeException("Could not create stream definition", e);
-        }
-    }
-
-    public MockHealthStatisticsPublisher() {
-        super(createStreamDefinition());
-    }
-
-    /**
-     * Publish health statistics to cep.
-     * @param clusterId
-     * @param networkPartitionId
-     * @param memberId
-     * @param partitionId
-     * @param health
-     * @param value
-     */
-    public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) {
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s [partition] %s [member] %s [health] %s [value] %f",
-                    clusterId, networkPartitionId, partitionId, memberId, health, value));
-        }
-        List<Object> payload = new ArrayList<Object>();
-        // Payload values
-        payload.add(clusterId);
-        payload.add(clusterInstanceId);
-        payload.add(networkPartitionId);
-        payload.add(memberId);
-        payload.add(partitionId);
-        payload.add(health);
-        payload.add(value);
-        super.publish(payload.toArray());
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e195f2f1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/openstack/OpenstackIaas.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/openstack/OpenstackIaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/openstack/OpenstackIaas.java
new file mode 100644
index 0000000..34daf2a
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/openstack/OpenstackIaas.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.iaases.openstack;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.domain.IaasProvider;
+import org.apache.stratos.cloud.controller.domain.NetworkInterface;
+import org.apache.stratos.cloud.controller.exception.CloudControllerException;
+import org.apache.stratos.cloud.controller.exception.InvalidHostException;
+import org.apache.stratos.cloud.controller.exception.InvalidRegionException;
+import org.apache.stratos.cloud.controller.exception.InvalidZoneException;
+import org.apache.stratos.cloud.controller.iaases.JcloudsIaas;
+import org.apache.stratos.cloud.controller.iaases.openstack.networking.NeutronNetworkingApi;
+import org.apache.stratos.cloud.controller.iaases.openstack.networking.NovaNetworkingApi;
+import org.apache.stratos.cloud.controller.iaases.openstack.networking.OpenstackNetworkingApi;
+import org.apache.stratos.cloud.controller.iaases.PartitionValidator;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.cloud.controller.util.ComputeServiceBuilderUtil;
+import org.jclouds.compute.ComputeService;
+import org.jclouds.compute.ComputeServiceContext;
+import org.jclouds.compute.domain.NodeMetadata;
+import org.jclouds.compute.domain.Template;
+import org.jclouds.compute.domain.TemplateBuilder;
+import org.jclouds.compute.options.TemplateOptions;
+import org.jclouds.openstack.nova.v2_0.NovaApi;
+import org.jclouds.openstack.nova.v2_0.compute.options.NovaTemplateOptions;
+import org.jclouds.openstack.nova.v2_0.domain.HostAggregate;
+import org.jclouds.openstack.nova.v2_0.domain.KeyPair;
+import org.jclouds.openstack.nova.v2_0.domain.Network;
+import org.jclouds.openstack.nova.v2_0.domain.Volume;
+import org.jclouds.openstack.nova.v2_0.domain.VolumeAttachment;
+import org.jclouds.openstack.nova.v2_0.domain.zonescoped.AvailabilityZone;
+import org.jclouds.openstack.nova.v2_0.extensions.AvailabilityZoneApi;
+import org.jclouds.openstack.nova.v2_0.extensions.HostAggregateApi;
+import org.jclouds.openstack.nova.v2_0.extensions.KeyPairApi;
+import org.jclouds.openstack.nova.v2_0.extensions.VolumeApi;
+import org.jclouds.openstack.nova.v2_0.extensions.VolumeAttachmentApi;
+import org.jclouds.openstack.nova.v2_0.options.CreateVolumeOptions;
+
+import com.google.common.base.Optional;
+
+public class OpenstackIaas extends JcloudsIaas {
+
+	private static final Log log = LogFactory.getLog(OpenstackIaas.class);
+	private static final String SUCCESSFUL_LOG_LINE = "A key-pair is created successfully in ";
+	private static final String FAILED_LOG_LINE = "Key-pair is unable to create in ";
+	
+	private OpenstackNetworkingApi openstackNetworkingApi;
+
+	public OpenstackIaas(IaasProvider iaasProvider) {
+		super(iaasProvider);
+		setOpenstackNetworkingApi(iaasProvider);
+	}
+	
+    private void setOpenstackNetworkingApi(IaasProvider iaasProvider) {
+        String openstackNetworkingProvider = iaasProvider.getProperty(CloudControllerConstants.OPENSTACK_NETWORKING_PROVIDER);
+        if (openstackNetworkingProvider != null && 
+                        openstackNetworkingProvider.equals(CloudControllerConstants.OPENSTACK_NEUTRON_NETWORKING)) {
+                if (log.isDebugEnabled()) {
+                        String msg = String.format("Openstack networking provider is %s. Trying to instanstiate %s", 
+                                        openstackNetworkingProvider, NeutronNetworkingApi.class.getName());
+                        log.debug(msg);
+                }
+                openstackNetworkingApi = new NeutronNetworkingApi(iaasProvider);
+        } else {
+                if (log.isDebugEnabled()) {
+                        String msg = String.format("Openstack networking provider is %s. Hence trying to instanstiate %s", 
+                                        openstackNetworkingProvider, NovaNetworkingApi.class.getName());
+                        log.debug(msg);
+                }
+                openstackNetworkingApi = new NovaNetworkingApi(iaasProvider);
+        }
+    }
+	
+	@Override
+	public void buildComputeServiceAndTemplate() {
+		// builds and sets Compute Service
+        ComputeService computeService = ComputeServiceBuilderUtil.buildDefaultComputeService(getIaasProvider());
+        getIaasProvider().setComputeService(computeService);
+
+		// builds and sets Template
+		buildTemplate();
+	}
+
+	public void buildTemplate() {
+		IaasProvider iaasProvider = getIaasProvider();
+		
+		if (iaasProvider.getComputeService() == null) {
+			throw new CloudControllerException(
+					"Compute service is null for IaaS provider: "
+							+ iaasProvider.getName());
+		}
+
+		TemplateBuilder templateBuilder = iaasProvider.getComputeService()
+				.templateBuilder();
+		templateBuilder.imageId(iaasProvider.getImage());
+        if(!(iaasProvider instanceof IaasProvider)) {
+           templateBuilder.locationId(iaasProvider.getType());
+        }
+        
+        // to avoid creation of template objects in each and every time, we
+        // create all at once!
+
+		String instanceType;
+
+		// set instance type
+		if (((instanceType = iaasProvider.getProperty(CloudControllerConstants.INSTANCE_TYPE)) != null)) {
+
+			templateBuilder.hardwareId(instanceType);
+		}
+
+		Template template = templateBuilder.build();
+
+		// In Openstack the call to IaaS should be blocking, in order to retrieve 
+		// IP addresses.
+		boolean blockUntilRunning = true;
+		if(iaasProvider.getProperty(CloudControllerConstants.BLOCK_UNTIL_RUNNING) != null) {
+			blockUntilRunning = Boolean.parseBoolean(iaasProvider.getProperty(
+					CloudControllerConstants.BLOCK_UNTIL_RUNNING));
+		}
+		template.getOptions().as(TemplateOptions.class)
+				.blockUntilRunning(blockUntilRunning);
+
+		// this is required in order to avoid creation of additional security
+		// groups by Jclouds.
+		template.getOptions().as(TemplateOptions.class)
+				.inboundPorts(new int[] {});
+
+		if (iaasProvider.getProperty(CloudControllerConstants.SECURITY_GROUPS) != null) {
+			template.getOptions()
+					.as(NovaTemplateOptions.class)
+					.securityGroupNames(
+							iaasProvider.getProperty(CloudControllerConstants.SECURITY_GROUPS).split(
+									CloudControllerConstants.ENTRY_SEPARATOR));
+		}
+
+		if (iaasProvider.getProperty(CloudControllerConstants.KEY_PAIR) != null) {
+			template.getOptions().as(NovaTemplateOptions.class)
+					.keyPairName(iaasProvider.getProperty(CloudControllerConstants.KEY_PAIR));
+		}
+		
+        if (iaasProvider.getNetworkInterfaces() != null) {
+            Set<Network> novaNetworksSet = new LinkedHashSet<Network>(iaasProvider.getNetworkInterfaces().length);
+            for (NetworkInterface ni:iaasProvider.getNetworkInterfaces()) {
+                novaNetworksSet.add(Network.builder().networkUuid(ni.getNetworkUuid()).fixedIp(ni.getFixedIp())
+                        .portUuid(ni.getPortUuid()).build());
+            }
+            template.getOptions().as(NovaTemplateOptions.class).novaNetworks(novaNetworksSet);
+        }
+		
+		if (iaasProvider.getProperty(CloudControllerConstants.AVAILABILITY_ZONE) != null) {
+			template.getOptions().as(NovaTemplateOptions.class)
+					.availabilityZone(iaasProvider.getProperty(CloudControllerConstants.AVAILABILITY_ZONE));
+		}
+		
+		//TODO
+//		if (iaas.getProperty(CloudControllerConstants.HOST) != null) {
+//            template.getOptions().as(NovaTemplateOptions.class)
+//                    .(CloudControllerConstants.HOST);
+//        }
+
+		// set Template
+		iaasProvider.setTemplate(template);
+	}
+
+    @Override
+	public void setDynamicPayload(byte[] payload) {
+		if (getIaasProvider().getTemplate() != null) {
+			getIaasProvider().getTemplate().getOptions().as(NovaTemplateOptions.class).userData(payload);
+		}
+	}
+
+	@Override
+	public synchronized boolean createKeyPairFromPublicKey(String region, String keyPairName,
+			String publicKey) {
+
+		IaasProvider iaasInfo = getIaasProvider();
+		
+		String openstackNovaMsg = " Openstack-nova. Region: " + region
+				+ " - Name: ";
+
+		ComputeServiceContext context = iaasInfo.getComputeService()
+				.getContext();
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+		KeyPairApi api = novaApi.getKeyPairExtensionForZone(region).get();
+
+		KeyPair keyPair = api.createWithPublicKey(keyPairName, publicKey);
+
+		if (keyPair != null) {
+
+			iaasInfo.getTemplate().getOptions().as(NovaTemplateOptions.class)
+					.keyPairName(keyPair.getName());
+
+			log.info(SUCCESSFUL_LOG_LINE + openstackNovaMsg + keyPair.getName());
+			return true;
+		}
+
+		log.error(FAILED_LOG_LINE + openstackNovaMsg);
+		return false;
+
+	}
+
+	@Override
+	public synchronized List<String> associateAddresses(NodeMetadata node) {
+		//TODO return the list of IP addresses once the topology changes is done
+		return openstackNetworkingApi.associateAddresses(node);
+	}
+	
+	@Override
+	public synchronized String associatePredefinedAddress (NodeMetadata node, String ip) {
+		return openstackNetworkingApi.associatePredefinedAddress(node, ip);
+	}	
+
+	@Override
+	public synchronized void releaseAddress(String ip) {
+		openstackNetworkingApi.releaseAddress(ip);
+	}
+
+    @Override
+    public boolean isValidRegion(String region) throws InvalidRegionException {
+    	IaasProvider iaasInfo = getIaasProvider();
+    	
+        // jclouds' zone = region in openstack
+        if (region == null || iaasInfo == null) {
+            String msg =
+                         "Region or IaaSProvider is null: region: " + region + " - IaaSProvider: " +
+                                 iaasInfo;
+            log.error(msg);
+            throw new InvalidRegionException(msg);
+        }
+        
+        ComputeServiceContext context = iaasInfo.getComputeService().getContext();
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+        Set<String> zones = novaApi.getConfiguredZones();
+        for (String configuredZone : zones) {
+            if (region.equalsIgnoreCase(configuredZone)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found a matching region: " + region);
+                }
+                return true;
+            }
+        }
+        
+        String msg = "Invalid region: " + region +" in the iaas: "+iaasInfo.getType();
+        log.error(msg);
+        throw new InvalidRegionException(msg);
+    }
+
+    @Override
+    public boolean isValidZone(String region, String zone) throws InvalidZoneException {
+    	IaasProvider iaasInfo = getIaasProvider();
+    	
+    	// jclouds availability zone = stratos zone
+    	if (region == null || zone == null || iaasInfo == null) {
+            String msg = "Host or Zone or IaaSProvider is null: region: " + region + " - zone: " +
+                    zone + " - IaaSProvider: " + iaasInfo;
+            log.error(msg);
+            throw new InvalidZoneException(msg);
+        }
+        ComputeServiceContext context = iaasInfo.getComputeService().getContext();
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+        Optional<? extends AvailabilityZoneApi> availabilityZoneApi = novaApi.getAvailabilityZoneApi(region);
+        for (AvailabilityZone z : availabilityZoneApi.get().list()) {
+			
+        	if (zone.equalsIgnoreCase(z.getName())) {
+        		if (log.isDebugEnabled()) {
+        			log.debug("Found a matching availability zone: " + zone);
+        		}
+        		return true;
+        	}
+		}
+        
+        String msg = "Invalid zone: " + zone +" in the region: "+region+ " and of the iaas: "+iaasInfo.getType();
+        log.error(msg);
+        throw new InvalidZoneException(msg);
+        
+    }
+
+    @Override
+    public boolean isValidHost(String zone, String host) throws InvalidHostException {
+    	IaasProvider iaasInfo = getIaasProvider();
+    	
+        if (host == null || zone == null || iaasInfo == null) {
+            String msg = String.format("Host or Zone or IaaSProvider is null: host: %s - zone: %s - IaaSProvider: %s", host, zone, iaasInfo);
+            log.error(msg);
+            throw new InvalidHostException(msg);
+        }
+        ComputeServiceContext context = iaasInfo.getComputeService().getContext();
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+        HostAggregateApi hostApi = novaApi.getHostAggregateExtensionForZone(zone).get();
+        for (HostAggregate hostAggregate : hostApi.list()) {
+            for (String configuredHost : hostAggregate.getHosts()) {
+                if (host.equalsIgnoreCase(configuredHost)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Found a matching host: " + host);
+                    }
+                    return true;
+                }
+            }
+        }
+        
+        String msg = String.format("Invalid host: %s in the zone: %s and of the iaas: %s", host, zone, iaasInfo.getType());
+        log.error(msg);
+        throw new InvalidHostException(msg);
+    }
+
+    @Override
+    public PartitionValidator getPartitionValidator() {
+        return new OpenstackPartitionValidator();
+    }
+
+	@Override
+	public String createVolume(int sizeGB, String snapshotId) {
+		IaasProvider iaasInfo = getIaasProvider();
+		
+		if (iaasInfo == null) {
+		    log.fatal(String.format("Cannot create a new volume with snapshot ID : %s", snapshotId));
+		    return null;
+		}
+		
+		String region = ComputeServiceBuilderUtil.extractRegion(iaasInfo);
+		String zone = ComputeServiceBuilderUtil.extractZone(iaasInfo);
+		
+        if (region == null) {
+        	log.fatal(String.format("Cannot create a new volume. Extracted region is null for Iaas : %s", iaasInfo));
+            return null;
+        }
+        ComputeServiceContext context = iaasInfo.getComputeService().getContext();
+
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+        VolumeApi volumeApi = novaApi.getVolumeExtensionForZone(region).get();
+        Volume volume;
+        if(StringUtils.isEmpty(snapshotId)){
+        	if(log.isDebugEnabled()){
+        		log.info("Creating a volume in the zone " + zone);
+        	}
+        	volume = volumeApi.create(sizeGB, CreateVolumeOptions.Builder.availabilityZone(zone));
+        }else{
+        	if(log.isDebugEnabled()){
+        		log.info("Creating a volume in the zone " + zone + " from the shanpshot " + snapshotId);
+        	}
+        	volume = volumeApi.create(sizeGB, CreateVolumeOptions.Builder.availabilityZone(zone).snapshotId(snapshotId));
+        }
+
+        if (volume == null) {
+            log.fatal(String.format("Volume creation was unsuccessful. [region] : %s [zone] : %s of Iaas : %s", region, zone, iaasInfo));
+            return null;
+        }
+
+        String volumeId = volume.getId();
+        /*
+        Volume.Status volumeStatus = this.getVolumeStatus(volumeApi, volumeId);
+
+        if(!(volumeStatus == Volume.Status.AVAILABLE || volumeStatus == Volume.Status.CREATING)){
+            log.error(String.format("Error while creating [volume id] %s. Volume status is %s", volumeId, volumeStatus));
+            return volumeId;
+        }
+        try {
+            if(!waitForStatus(volumeApi, volumeId, Volume.Status.AVAILABLE)){
+                log.error("Volume did not become AVAILABLE. Current status is " + volume.getStatus());
+            }
+        } catch (TimeoutException e) {
+            log.error("[Volume ID] " + volumeId + "did not become AVAILABLE within expected timeout");
+            return volumeId;
+        }
+        */
+		log.info(String.format("Successfully created a new volume [id]: %s in [region] : %s [zone] : %s of Iaas : %s [Volume ID]%s", volume.getId(), region, zone, iaasInfo, volume.getId()));
+		return volumeId;
+	}
+
+    private boolean waitForStatus(String volumeId, Volume.Status expectedStatus, int timeoutInMins) throws TimeoutException {
+        int timeout = 1000 * 60 * timeoutInMins;
+        long timout = System.currentTimeMillis() + timeout;
+
+        IaasProvider iaasInfo = getIaasProvider();
+        String region = ComputeServiceBuilderUtil.extractRegion(iaasInfo);
+        ComputeServiceContext context = iaasInfo.getComputeService().getContext();
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+        VolumeApi volumeApi = novaApi.getVolumeExtensionForZone(region).get();
+        Volume.Status volumeStatus = this.getVolumeStatus(volumeApi, volumeId);
+
+        while(volumeStatus != expectedStatus){
+            try {
+                if(log.isDebugEnabled()){
+                    log.debug(String.format("Volume %s is still NOT in %s. Current State=%s", volumeId, expectedStatus, volumeStatus));
+                }
+                if(volumeStatus == Volume.Status.ERROR){
+                    log.error("Volume " + volumeId + " is in state ERROR");
+                    return false;
+                }
+                Thread.sleep(1000);
+                volumeStatus = this.getVolumeStatus(volumeApi, volumeId);
+                if (System.currentTimeMillis()> timout) {
+                    throw new TimeoutException();
+                }
+            } catch (InterruptedException e) {
+                // Ignoring the exception
+            }
+        }
+        if(log.isDebugEnabled()){
+            log.debug(String.format("Volume %s status became %s", volumeId, expectedStatus));
+        }
+
+        return true;
+    }
+
+    @Override
+	public String attachVolume(String instanceId, String volumeId, String deviceName) {
+        IaasProvider iaasInfo = getIaasProvider();
+
+        if (StringUtils.isEmpty(volumeId)) {
+            log.error("Volume provided to attach can not be null");
+        }
+
+        if (StringUtils.isEmpty(instanceId)) {
+            log.error("Instance provided to attach can not be null");
+        }
+
+        ComputeServiceContext context = iaasInfo.getComputeService()
+                .getContext();
+        String region = ComputeServiceBuilderUtil.extractRegion(iaasInfo);
+        String device = deviceName == null ? "/dev/vdc" : deviceName;
+
+        if (region == null) {
+            log.fatal(String.format("Cannot attach the volume [id]: %s. Extracted region is null for Iaas : %s", volumeId, iaasInfo));
+            return null;
+        }
+
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+        VolumeApi volumeApi = novaApi.getVolumeExtensionForZone(region).get();
+        VolumeAttachmentApi volumeAttachmentApi = novaApi.getVolumeAttachmentExtensionForZone(region).get();
+
+        Volume.Status volumeStatus = this.getVolumeStatus(volumeApi, volumeId);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Volume " + volumeId + " is in state " + volumeStatus);
+        }
+
+        if (!(volumeStatus == Volume.Status.AVAILABLE || volumeStatus == Volume.Status.CREATING)) {
+            log.error(String.format("Volume %s can not be attached. Volume status is %s", volumeId, volumeStatus));
+            return null;
+        }
+
+        boolean volumeBecameAvailable = false, volumeBecameAttached = false;
+        try {
+            volumeBecameAvailable = waitForStatus(volumeId, Volume.Status.AVAILABLE, 5);
+        } catch (TimeoutException e) {
+            log.error("[Volume ID] " + volumeId + "did not become AVAILABLE within expected timeout");
+        }
+
+        VolumeAttachment attachment = null;
+        if (volumeBecameAvailable) {
+            attachment = volumeAttachmentApi.attachVolumeToServerAsDevice(volumeId, instanceId, device);
+
+            try {
+                volumeBecameAttached = waitForStatus(volumeId, Volume.Status.IN_USE, 2);
+            } catch (TimeoutException e) {
+                log.error("[Volume ID] " + volumeId + "did not become IN_USE within expected timeout");
+            }
+        }
+        try {
+            // waiting 5seconds till volumes are actually attached.
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        if (attachment == null) {
+			log.fatal(String.format("Volume [id]: %s attachment for instance [id]: %s was unsuccessful. [region] : %s of Iaas : %s", volumeId, instanceId, region, iaasInfo));
+			return null;
+		}
+
+        if(! volumeBecameAttached){
+           log.error(String.format("[Volume ID] %s attachment is called, but not yet became attached", volumeId));
+        }
+
+		log.info(String.format("Volume [id]: %s attachment for instance [id]: %s was successful [status]: Attaching. [region] : %s of Iaas : %s", volumeId, instanceId, region, iaasInfo));
+		return "Attaching";
+	}
+
+	@Override
+	public void detachVolume(String instanceId, String volumeId) {
+		IaasProvider iaasInfo = getIaasProvider();
+
+		ComputeServiceContext context = iaasInfo.getComputeService()
+				.getContext();
+		
+		String region = ComputeServiceBuilderUtil.extractRegion(iaasInfo);
+		
+		if(region == null) {
+			log.fatal(String.format("Cannot detach the volume [id]: %s from the instance [id]: %s. Extracted region is null for Iaas : %s", volumeId, instanceId, iaasInfo));
+			return;
+		}
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Starting to detach volume %s from the instance %s", volumeId, instanceId));
+        }
+
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+        VolumeAttachmentApi api = novaApi.getVolumeAttachmentExtensionForZone(region).get();
+        if (api.detachVolumeFromServer(volumeId, instanceId)) {
+        	log.info(String.format("Detachment of Volume [id]: %s from instance [id]: %s was successful. [region] : %s of Iaas : %s", volumeId, instanceId, region, iaasInfo));
+        }else{
+            log.error(String.format("Detachment of Volume [id]: %s from instance [id]: %s was unsuccessful. [volume Status] : %s", volumeId, instanceId, region, iaasInfo));
+        }
+        
+	}
+
+	@Override
+	public void deleteVolume(String volumeId) {
+		IaasProvider iaasInfo = getIaasProvider();
+
+		ComputeServiceContext context = iaasInfo.getComputeService()
+				.getContext();
+		
+		String region = ComputeServiceBuilderUtil.extractRegion(iaasInfo);
+		
+		if(region == null) {
+			log.fatal(String.format("Cannot delete the volume [id]: %s. Extracted region is null for Iaas : %s", volumeId, iaasInfo));
+			return;
+		}
+
+        NovaApi novaApi = context.unwrapApi(NovaApi.class);
+		VolumeApi api = novaApi.getVolumeExtensionForZone(region).get();
+        if (api.delete(volumeId)) {
+        	log.info(String.format("Deletion of Volume [id]: %s was successful. [region] : %s of Iaas : %s", volumeId, region, iaasInfo));
+        }
+	}
+
+    @Override
+    public String getIaasDevice(String device) {
+        return device;
+    }
+
+    private Volume.Status getVolumeStatus(VolumeApi volumeApi, String volumeId){
+        return volumeApi.get(volumeId).getStatus();
+    }
+}