You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/31 08:51:41 UTC
[08/10] removing application status and adding applications,
cluster status topic
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
index db5d777..92d61ba 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
@@ -16,48 +16,106 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
-public class ApplicationCreatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
+import java.util.Set;
+public class ApplicationCreatedMessageProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
private MessageProcessor nextProcessor;
@Override
public void setNext(MessageProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
-
}
@Override
public boolean process(String type, String message, Object object) {
+
+ Topology topology = (Topology) object;
+
if (ApplicationCreatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- ApplicationCreatedEvent event =
- (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+ if (!topology.isInitialized()) {
+ return false;
+ }
- if (log.isDebugEnabled()) {
- log.debug("Received ApplicationCreated Event in application status topic: " + event.toString());
+ ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+ if (event == null) {
+ log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
+ return false;
}
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+
+ TopologyUpdater.acquireWriteLockForApplications();
+ // since the Clusters will also get modified, acquire write locks for each Service Type
+ Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively();
+ if (clusterDataHolders != null) {
+ for (ClusterDataHolder clusterData : clusterDataHolders) {
+ TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
+ }
+ }
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ if (clusterDataHolders != null) {
+ for (ClusterDataHolder clusterData : clusterDataHolders) {
+ TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
+ }
+ }
+ TopologyUpdater.releaseWriteLockForApplications();
+ }
+
} else {
if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
} else {
- throw new RuntimeException(
- String.format("Failed to process group activated message " +
- "using available message processors: [type] %s [body] %s", type, message));
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
+
+ private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
+
+ // check if required properties are available
+ if (event.getApplication() == null) {
+ String errorMsg = "Application object of application created event is invalid";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ if (event.getApplication().getUniqueIdentifier() == null || event.getApplication().getUniqueIdentifier().isEmpty()) {
+ String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getUniqueIdentifier() + " ]";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ // check if an Application with same name exists in topology
+ if (topology.applicationExists(event.getApplication().getUniqueIdentifier())) {
+ log.warn("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] already exists in Topology");
+
+ } else {
+ // add application and the clusters to Topology
+ for(Cluster cluster: event.getClusterList()) {
+ topology.getService(cluster.getServiceName()).addCluster(cluster);
+ }
+ topology.addApplication(event.getApplication());
+ }
+
+ notifyEventListeners(event);
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
index d8f2aac..91eae8c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
@@ -20,10 +20,17 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
+/**
+ * This processor responsible to process the application Inactivation even and update the Topology.
+ */
public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
private static final Log log =
LogFactory.getLog(ApplicationInactivatedMessageProcessor.class);
@@ -34,30 +41,64 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
@Override
public void setNext(MessageProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
-
}
+
@Override
public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
if (ApplicationInactivatedEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
// Parse complete message and build event
- ApplicationInactivatedEvent event =
- (ApplicationInactivatedEvent) Util.jsonToObject(message, ApplicationInactivatedEvent.class);
+ ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
+ jsonToObject(message, ApplicationInactivatedEvent.class);
- if (log.isDebugEnabled()) {
- log.debug("Received ApplicationInActivatedEvent in application status topic: " + event.toString());
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
}
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+
} else {
if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
} else {
- throw new RuntimeException(
- String.format("Failed to process group activated message " +
- "using available message processors: [type] %s [body] %s", type, message));
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
}
+ return false;
+ } else {
+ // Apply changes to the topology
+ if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) {
+ log.error("Invalid State transfer from [ " + application.getStatus() +
+ " ] to [ " + ApplicationStatus.Inactive + " ]");
+ }
+ application.setStatus(ApplicationStatus.Inactive);
+
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
index a121ffb..8cd2182 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
@@ -20,10 +20,18 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
+import java.util.Set;
+
+/**
+ * This processor responsible to process the application Inactivation even and update the Topology.
+ */
public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
private static final Log log =
LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
@@ -34,30 +42,95 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
@Override
public void setNext(MessageProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
-
}
+
@Override
public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
if (ApplicationTerminatedEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
// Parse complete message and build event
- ApplicationTerminatedEvent event =
- (ApplicationTerminatedEvent) Util.jsonToObject(message, ApplicationTerminatedEvent.class);
+ ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util.
+ jsonToObject(message, ApplicationTerminatedEvent.class);
- if (log.isDebugEnabled()) {
- log.debug("Received ApplicationTerminatedEvent in application status topic: " + event.toString());
+ TopologyUpdater.acquireWriteLockForApplications();
+ Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
+ if (clusterDataHolders != null) {
+ for (ClusterDataHolder clusterData : clusterDataHolders) {
+ TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
+ }
}
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplications();
+ if (clusterDataHolders != null) {
+ for (ClusterDataHolder clusterData : clusterDataHolders) {
+ TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
+ }
+ }
+ }
+
} else {
if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
} else {
- throw new RuntimeException(
- String.format("Failed to process group activated message " +
- "using available message processors: [type] %s [body] %s", type, message));
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
+
+ private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) {
+
+ // check if required properties are available
+ if (event.getAppId() == null) {
+ String errorMsg = "Application Id of application removed event is invalid";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ if (event.getTenantDomain()== null) {
+ String errorMsg = "Application tenant domain of application removed event is invalid";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ // check if an Application with same name exists in topology
+ String appId = event.getAppId();
+ if (topology.applicationExists(appId)) {
+ log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
+ topology.removeApplication(appId);
+ }
+
+ if (event.getClusterData() != null) {
+ // remove the Clusters from the Topology
+ for (ClusterDataHolder clusterData : event.getClusterData()) {
+ Service service = topology.getService(clusterData.getServiceType());
+ if (service != null) {
+ service.removeCluster(clusterData.getClusterId());
+ if (log.isDebugEnabled()) {
+ log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
+ }
+ } else {
+ log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
+ }
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("ApplicationRemovedMessageProcessor notifying listener ");
+ }
+
+ notifyEventListeners(event);
+ return true;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
index 280de2c..057d013 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
@@ -20,10 +20,17 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationTerminatingEvent;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
+/**
+ * This processor responsible to process the application Inactivation even and update the Topology.
+ */
public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
private static final Log log =
LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
@@ -34,30 +41,64 @@ public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
@Override
public void setNext(MessageProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
-
}
+
@Override
public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
if (ApplicationTerminatingEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
// Parse complete message and build event
- ApplicationTerminatingEvent event =
- (ApplicationTerminatingEvent) Util.jsonToObject(message, ApplicationTerminatingEvent.class);
+ ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util.
+ jsonToObject(message, ApplicationTerminatingEvent.class);
- if (log.isDebugEnabled()) {
- log.debug("Received ApplicationTerminatingEvent in application status topic: " + event.toString());
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
}
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+
} else {
if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
} else {
- throw new RuntimeException(
- String.format("Failed to process group activated message " +
- "using available message processors: [type] %s [body] %s", type, message));
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
}
+ return false;
+ } else {
+ // Apply changes to the topology
+ if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
+ log.error("Invalid State transfer from [ " + application.getStatus() +
+ " ] to [ " + ApplicationStatus.Terminating + " ]");
+ }
+ application.setStatus(ApplicationStatus.Terminating);
+
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
new file mode 100644
index 0000000..7e91ab8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+import java.util.Set;
+
+public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(ApplicationUndeployedMessageProcessor.class);
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+
+ Topology topology = (Topology) object;
+
+ if (ApplicationUndeployedEvent.class.getName().equals(type)) {
+ if (!topology.isInitialized()) {
+ return false;
+ }
+
+ ApplicationUndeployedEvent event = (ApplicationUndeployedEvent)
+ Util.jsonToObject(message, ApplicationUndeployedEvent.class);
+ if (event == null) {
+ log.error("Unable to convert the JSON message to ApplicationUndeployedEvent");
+ return false;
+ }
+
+ // get write lock for the application and relevant Clusters
+ TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId());
+ Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
+ if (clusterDataHolders != null) {
+ for (ClusterDataHolder clusterData : clusterDataHolders) {
+ TopologyUpdater.acquireWriteLockForCluster(clusterData.getServiceType(),
+ clusterData.getClusterId());
+ }
+ }
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ // remove locks
+ if (clusterDataHolders != null) {
+ for (ClusterDataHolder clusterData : clusterDataHolders) {
+ TopologyUpdater.releaseWriteLockForCluster(clusterData.getServiceType(),
+ clusterData.getClusterId());
+ }
+ }
+ TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format
+ ("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) {
+
+ // update the application status to Terminating
+ Application application = topology.getApplication(event.getApplicationId());
+ // check and update application status to 'Terminating'
+ if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
+ log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating);
+ }
+ // for now anyway update the status forcefully
+ application.setStatus(ApplicationStatus.Terminating);
+
+ // update all the Clusters' statuses to 'Terminating'
+ Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+ // update the Cluster statuses to Terminating
+ for (ClusterDataHolder clusterDataHolder : clusterData) {
+ Service service = topology.getService(clusterDataHolder.getServiceType());
+ if (service != null) {
+ Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId());
+ if (aCluster != null) {
+ // validate state transition
+ if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) {
+ log.error("Invalid state transfer from " + aCluster.getStatus() + " to "
+ + ClusterStatus.Terminating);
+ }
+ // for now anyway update the status forcefully
+ aCluster.setStatus(ClusterStatus.Terminating);
+
+ } else {
+ log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() +
+ " in Topology");
+ }
+
+ } else {
+ log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " +
+ " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found");
+ }
+ }
+
+ notifyEventListeners(event);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
new file mode 100644
index 0000000..e75b604
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.applications.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Application Status processor chain is to handle the list processors to parse the application
+ * status.
+ */
+public class ApplicationsMessageProcessorChain extends MessageProcessorChain {
+ private static final Log log = LogFactory.getLog(ApplicationsMessageProcessorChain.class);
+
+ private GroupCreatedProcessor groupCreatedMessageProcessor;
+ private GroupActivatedProcessor groupActivatedMessageProcessor;
+ private GroupInActivateProcessor groupInActivateMessageProcessor;
+ private GroupTerminatedProcessor groupTerminatedProcessor;
+ private GroupTerminatingProcessor groupTerminatingProcessor;
+ private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
+ private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
+ private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
+ private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
+ private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
+ private ApplicationUndeployedMessageProcessor applicationUndeployedMessageProcessor;
+
+ public void initialize() {
+ // Add instance notifier event processors
+
+ groupCreatedMessageProcessor = new GroupCreatedProcessor();
+ add(groupCreatedMessageProcessor);
+
+ groupActivatedMessageProcessor = new GroupActivatedProcessor();
+ add(groupActivatedMessageProcessor);
+
+ groupInActivateMessageProcessor = new GroupInActivateProcessor();
+ add(groupInActivateMessageProcessor);
+
+ groupTerminatedProcessor = new GroupTerminatedProcessor();
+ add(groupTerminatedProcessor);
+
+ groupTerminatingProcessor = new GroupTerminatingProcessor();
+ add(groupTerminatingProcessor);
+
+ applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor();
+ add(applicationActivatedMessageProcessor);
+
+ applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor();
+ add(applicationCreatedMessageProcessor);
+
+ applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor();
+ add(applicationInactivatedMessageProcessor);
+
+ applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor();
+ add(applicationTerminatingMessageProcessor);
+
+ applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor();
+ add(applicationTerminatedMessageProcessor);
+
+ applicationUndeployedMessageProcessor = new ApplicationUndeployedMessageProcessor();
+ add(applicationUndeployedMessageProcessor);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Instance notifier message processor chain initialized");
+ }
+ }
+
+ public void addEventListener(EventListener eventListener) {
+
+ if(eventListener instanceof GroupCreatedEventListener) {
+ groupCreatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof GroupInactivateEventListener) {
+ groupInActivateMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof GroupActivatedEventListener) {
+ groupActivatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof GroupTerminatingEventListener) {
+ groupTerminatingProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof GroupTerminatedEventListener) {
+ groupTerminatedProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof ApplicationCreatedEventListener) {
+ applicationCreatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof ApplicationActivatedEventListener) {
+ applicationActivatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof ApplicationInactivatedEventListener) {
+ applicationInactivatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof ApplicationTerminatingEventListener) {
+ applicationTerminatingMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof ApplicationTerminatedEventListener) {
+ applicationTerminatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof ApplicationUndeployedEventListener) {
+ applicationUndeployedMessageProcessor.addEventListener(eventListener);
+ } else {
+ throw new RuntimeException("Unknown event listener " + eventListener.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
deleted file mode 100644
index 02ddda8..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupActivatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(GroupActivatedMessageProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- if (GroupActivatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- GroupActivatedEvent event =
- (GroupActivatedEvent) Util.jsonToObject(message, GroupActivatedEvent.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Received GroupActivatedEvent: " + event.toString());
- }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
- } else {
- throw new RuntimeException(
- String.format("Failed to process group activated message " +
- "using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
new file mode 100644
index 0000000..845e933
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupActivatedProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (GroupActivatedEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ GroupActivatedEvent event = (GroupActivatedEvent) Util.
+ jsonToObject(message, GroupActivatedEvent.class);
+
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (GroupActivatedEvent event,Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroupRecursively(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ // Apply changes to the topology
+ if (!group.isStateTransitionValid(GroupStatus.Active)) {
+ log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
+ }
+ group.setStatus(GroupStatus.Active);
+
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
deleted file mode 100644
index d04d7f9..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupCreatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(GroupCreatedMessageProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- if (GroupCreatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- GroupCreatedEvent event =
- (GroupCreatedEvent) Util.jsonToObject(message, GroupCreatedEvent.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Received AppStatusGroupCreatedEvent: " + event.toString());
- }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
- } else {
- throw new RuntimeException(
- String.format("Failed to process group created message " +
- "using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
new file mode 100644
index 0000000..47d4457
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupCreatedProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (GroupCreatedEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ GroupCreatedEvent event = (GroupCreatedEvent) Util.
+ jsonToObject(message, GroupCreatedEvent.class);
+
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (GroupCreatedEvent event,Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroupRecursively(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ // Apply changes to the topology
+ if (!group.isStateTransitionValid(GroupStatus.Created)) {
+ log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created + " " +
+ "for Group " + group.getAlias());
+ }
+ group.setStatus(GroupStatus.Created);
+
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
new file mode 100644
index 0000000..063a3de
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupInactivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupInActivateProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (GroupInactivateEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ GroupInactivateEvent event = (GroupInactivateEvent) Util.
+ jsonToObject(message, GroupInactivateEvent.class);
+
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess(GroupInactivateEvent event, Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroupRecursively(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ group.setStatus(GroupStatus.Inactive);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Group updated as in-activated : %s",
+ group.getUniqueIdentifier()));
+ }
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
deleted file mode 100644
index 6cf2587..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.AppStatusGroupInactivateEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupInactivatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(GroupInactivatedMessageProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- if (AppStatusGroupInactivateEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- AppStatusGroupInactivateEvent event =
- (AppStatusGroupInactivateEvent) Util.jsonToObject(message, AppStatusGroupInactivateEvent.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Received GroupInActivateEvent: " + event.toString());
- }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
- } else {
- throw new RuntimeException(
- String.format("Failed to process group in activated message " +
- "using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
deleted file mode 100644
index a917a14..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupTerminatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(GroupTerminatedMessageProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- if (GroupTerminatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- GroupTerminatedEvent event =
- (GroupTerminatedEvent) Util.jsonToObject(message, GroupTerminatedEvent.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Received GroupTerminatingEvent: " + event.toString());
- }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
- } else {
- throw new RuntimeException(
- String.format("Failed to process group in GroupTerminatingEvent message " +
- "using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
new file mode 100644
index 0000000..3de0914
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupTerminatedProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (GroupTerminatedEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ GroupTerminatedEvent event = (GroupTerminatedEvent) Util.
+ jsonToObject(message, GroupTerminatedEvent.class);
+
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (GroupTerminatedEvent event,Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroupRecursively(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ // Apply changes to the topology
+ if (!group.isStateTransitionValid(GroupStatus.Terminated)) {
+ log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated);
+ }
+ group.setStatus(GroupStatus.Terminated);
+
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
deleted file mode 100644
index 63c055d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupTerminatingMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(GroupTerminatingMessageProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- if (GroupTerminatingEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- GroupTerminatingEvent event =
- (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Received GroupTerminatingEvent: " + event.toString());
- }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- return nextProcessor.process(type, message, object);
- } else {
- throw new RuntimeException(
- String.format("Failed to process group in GroupTerminatingEvent message " +
- "using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
new file mode 100644
index 0000000..e124b7b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupTerminatingProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (GroupTerminatingEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ GroupTerminatingEvent event = (GroupTerminatingEvent) Util.
+ jsonToObject(message, GroupTerminatingEvent.class);
+
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (GroupTerminatingEvent event,Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroupRecursively(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ // Apply changes to the topology
+ if (!group.isStateTransitionValid(GroupStatus.Terminating)) {
+ log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
+ }
+ group.setStatus(GroupStatus.Terminating);
+
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java
new file mode 100644
index 0000000..694c3f3
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class ClusterStatusClusterActivatedMessageProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(ClusterStatusClusterActivatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (ClusterStatusClusterActivatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ ClusterStatusClusterActivatedEvent event = (ClusterStatusClusterActivatedEvent) Util.
+ jsonToObject(message, ClusterStatusClusterActivatedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received ClusterActivatedEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
new file mode 100644
index 0000000..9b4780b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class ClusterStatusClusterCreatedMessageProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(ClusterStatusClusterCreatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (ClusterStatusClusterCreatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ ClusterStatusClusterCreatedEvent event = (ClusterStatusClusterCreatedEvent) Util.
+ jsonToObject(message, ClusterStatusClusterCreatedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received AppStatusClusterCreatedEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}