You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/11/03 08:32:31 UTC
[16/50] [abbrv] removing application status and adding applications,
cluster status topic
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java
new file mode 100644
index 0000000..69918e2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterInactivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+/** Handles {@code ClusterStatusClusterInactivateEvent} messages and passes every other message type to the next processor. */
+public class ClusterStatusClusterInactivateMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterInactivateMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+    /** Sets the processor that is asked to handle any message type this processor does not recognise. */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+    /** Returns true after notifying listeners of a matching event, else the next processor's result; throws RuntimeException if there is no next processor. */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterStatusClusterInactivateEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterStatusClusterInactivateEvent event = (ClusterStatusClusterInactivateEvent) Util.
+                    jsonToObject(message, ClusterStatusClusterInactivateEvent.class);
+            // Concatenate the event itself (no toString() call) so a null parse result cannot fail only when debug logging is on
+            if (log.isDebugEnabled()) {
+                log.debug("Received ClusterStatusClusterInactivateEvent: " + event);
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster status message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java
new file mode 100644
index 0000000..64b6c7b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+/** Handles {@code ClusterStatusClusterTerminatedEvent} messages and passes every other message type to the next processor. */
+public class ClusterStatusClusterTerminatedMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterTerminatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+    /** Sets the processor that is asked to handle any message type this processor does not recognise. */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+    /** Returns true after notifying listeners of a matching event, else the next processor's result; throws RuntimeException if there is no next processor. */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterStatusClusterTerminatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterStatusClusterTerminatedEvent event = (ClusterStatusClusterTerminatedEvent) Util.
+                    jsonToObject(message, ClusterStatusClusterTerminatedEvent.class);
+            // Concatenate the event itself (no toString() call) so a null parse result cannot fail only when debug logging is on
+            if (log.isDebugEnabled()) {
+                log.debug("Received ClusterStatusClusterTerminatedEvent: " + event);
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster status message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java
new file mode 100644
index 0000000..c161dd5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+/** Handles {@code ClusterStatusClusterTerminatingEvent} messages and passes every other message type to the next processor. */
+public class ClusterStatusClusterTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterTerminatingMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+    /** Sets the processor that is asked to handle any message type this processor does not recognise. */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+    /** Returns true after notifying listeners of a matching event, else the next processor's result; throws RuntimeException if there is no next processor. */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterStatusClusterTerminatingEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterStatusClusterTerminatingEvent event = (ClusterStatusClusterTerminatingEvent) Util.
+                    jsonToObject(message, ClusterStatusClusterTerminatingEvent.class);
+            // Concatenate the event itself (no toString() call) so a null parse result cannot fail only when debug logging is on
+            if (log.isDebugEnabled()) {
+                log.debug("Received ClusterStatusClusterTerminatingEvent: " + event);
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster status message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
new file mode 100644
index 0000000..29556ec
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.cluster.status.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/** Keeps track of the processors for the cluster status topic, one per cluster status event type. */
+public class ClusterStatusMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(ClusterStatusMessageProcessorChain.class);
+    // Processors held by this chain; each one is created and added in initialize().
+    private ClusterStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor;
+    private ClusterStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
+    private ClusterStatusClusterInactivateMessageProcessor clusterInactivateMessageProcessor;
+    private ClusterStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor;
+    private ClusterStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor;
+    /** Instantiates every cluster status processor and adds it to the chain, then logs completion at debug level. */
+    @Override
+    protected void initialize() {
+        this.clusterCreatedMessageProcessor = new ClusterStatusClusterCreatedMessageProcessor();
+        add(this.clusterCreatedMessageProcessor);
+        this.clusterActivatedMessageProcessor = new ClusterStatusClusterActivatedMessageProcessor();
+        add(this.clusterActivatedMessageProcessor);
+        this.clusterInactivateMessageProcessor = new ClusterStatusClusterInactivateMessageProcessor();
+        add(this.clusterInactivateMessageProcessor);
+        this.clusterTerminatedMessageProcessor = new ClusterStatusClusterTerminatedMessageProcessor();
+        add(this.clusterTerminatedMessageProcessor);
+        this.clusterTerminatingMessageProcessor = new ClusterStatusClusterTerminatingMessageProcessor();
+        add(this.clusterTerminatingMessageProcessor);
+        if (log.isDebugEnabled()) {
+            log.debug("Cluster status message processor chain initialized");
+        }
+    }
+    /** Registers the listener with the first processor whose listener type it matches; throws RuntimeException for any other listener. */
+    @Override
+    public void addEventListener(EventListener eventListener) {
+        if (eventListener instanceof ClusterStatusClusterCreatedEventListener) {
+            this.clusterCreatedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ClusterStatusClusterInactivateEventListener) {
+            this.clusterInactivateMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ClusterStatusClusterActivatedEventListener) {
+            this.clusterActivatedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ClusterStatusClusterTerminatingEventListener) {
+            this.clusterTerminatingMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ClusterStatusClusterTerminatedEventListener) {
+            this.clusterTerminatedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        throw new RuntimeException("Unknown event listener " + eventListener.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
deleted file mode 100644
index 341b402..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application activation even and update the Topology.
- */
-public class ApplicationActivatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(ApplicationActivatedMessageProcessor.class);
-
-
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (ApplicationActivatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
- jsonToObject(message, ApplicationActivatedEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(ApplicationActivatedEvent event, Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- } else {
- // Apply changes to the topology
- if (!application.isStateTransitionValid(ApplicationStatus.Active)) {
- log.error("Invalid State transfer from [ " + application.getStatus() +
- " ] to [ " + ApplicationStatus.Active + " ]");
- }
- application.setStatus(ApplicationStatus.Active);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
deleted file mode 100644
index 079cb90..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Set;
-
-public class ApplicationCreatedMessageProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
-
- Topology topology = (Topology) object;
-
- if (ApplicationCreatedEvent.class.getName().equals(type)) {
- if (!topology.isInitialized()) {
- return false;
- }
-
- ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
- if (event == null) {
- log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
- return false;
- }
-
- TopologyUpdater.acquireWriteLockForApplications();
- // since the Clusters will also get modified, acquire write locks for each Service Type
- Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively();
- if (clusterDataHolders != null) {
- for (ClusterDataHolder clusterData : clusterDataHolders) {
- TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
- }
- }
-
- try {
- return doProcess(event, topology);
-
- } finally {
- if (clusterDataHolders != null) {
- for (ClusterDataHolder clusterData : clusterDataHolders) {
- TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
- }
- }
- TopologyUpdater.releaseWriteLockForApplications();
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
-
- // check if required properties are available
- if (event.getApplication() == null) {
- String errorMsg = "Application object of application created event is invalid";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- if (event.getApplication().getUniqueIdentifier() == null || event.getApplication().getUniqueIdentifier().isEmpty()) {
- String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getUniqueIdentifier() + " ]";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- // check if an Application with same name exists in topology
- if (topology.applicationExists(event.getApplication().getUniqueIdentifier())) {
- log.warn("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] already exists in Topology");
-
- } else {
- // add application and the clusters to Topology
- for(Cluster cluster: event.getClusterList()) {
- topology.getService(cluster.getServiceName()).addCluster(cluster);
- }
- topology.addApplication(event.getApplication());
- }
-
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
deleted file mode 100644
index 8c88324..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(ApplicationInactivatedMessageProcessor.class);
-
-
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (ApplicationInactivatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
- jsonToObject(message, ApplicationInactivatedEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- } else {
- // Apply changes to the topology
- if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) {
- log.error("Invalid State transfer from [ " + application.getStatus() +
- " ] to [ " + ApplicationStatus.Inactive + " ]");
- }
- application.setStatus(ApplicationStatus.Inactive);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
deleted file mode 100644
index 2dd3ea7..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Set;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
-
-
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (ApplicationTerminatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util.
- jsonToObject(message, ApplicationTerminatedEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplications();
- Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
- if (clusterDataHolders != null) {
- for (ClusterDataHolder clusterData : clusterDataHolders) {
- TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
- }
- }
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplications();
- if (clusterDataHolders != null) {
- for (ClusterDataHolder clusterData : clusterDataHolders) {
- TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
- }
- }
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) {
-
- // check if required properties are available
- if (event.getAppId() == null) {
- String errorMsg = "Application Id of application removed event is invalid";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- if (event.getTenantDomain()== null) {
- String errorMsg = "Application tenant domain of application removed event is invalid";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- // check if an Application with same name exists in topology
- String appId = event.getAppId();
- if (topology.applicationExists(appId)) {
- log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
- topology.removeApplication(appId);
- }
-
- if (event.getClusterData() != null) {
- // remove the Clusters from the Topology
- for (ClusterDataHolder clusterData : event.getClusterData()) {
- Service service = topology.getService(clusterData.getServiceType());
- if (service != null) {
- service.removeCluster(clusterData.getClusterId());
- if (log.isDebugEnabled()) {
- log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
- }
- } else {
- log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
- }
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("ApplicationRemovedMessageProcessor notifying listener ");
- }
-
- notifyEventListeners(event);
- return true;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java
deleted file mode 100644
index 032be79..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
-
-
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (ApplicationTerminatingEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util.
- jsonToObject(message, ApplicationTerminatingEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- } else {
- // Apply changes to the topology
- if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
- log.error("Invalid State transfer from [ " + application.getStatus() +
- " ] to [ " + ApplicationStatus.Terminating + " ]");
- }
- application.setStatus(ApplicationStatus.Terminating);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java
deleted file mode 100644
index f368dd1..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Set;
-
-public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(ApplicationUndeployedMessageProcessor.class);
-
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
-
- Topology topology = (Topology) object;
-
- if (ApplicationUndeployedEvent.class.getName().equals(type)) {
- if (!topology.isInitialized()) {
- return false;
- }
-
- ApplicationUndeployedEvent event = (ApplicationUndeployedEvent)
- Util.jsonToObject(message, ApplicationUndeployedEvent.class);
- if (event == null) {
- log.error("Unable to convert the JSON message to ApplicationUndeployedEvent");
- return false;
- }
-
- // get write lock for the application and relevant Clusters
- TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId());
- Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
- if (clusterDataHolders != null) {
- for (ClusterDataHolder clusterData : clusterDataHolders) {
- TopologyUpdater.acquireWriteLockForCluster(clusterData.getServiceType(),
- clusterData.getClusterId());
- }
- }
-
- try {
- return doProcess(event, topology);
-
- } finally {
- // remove locks
- if (clusterDataHolders != null) {
- for (ClusterDataHolder clusterData : clusterDataHolders) {
- TopologyUpdater.releaseWriteLockForCluster(clusterData.getServiceType(),
- clusterData.getClusterId());
- }
- }
- TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format
- ("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) {
-
- // update the application status to Terminating
- Application application = topology.getApplication(event.getApplicationId());
- // check and update application status to 'Terminating'
- if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
- log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating);
- }
- // for now anyway update the status forcefully
- application.setStatus(ApplicationStatus.Terminating);
-
- // update all the Clusters' statuses to 'Terminating'
- Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
- // update the Cluster statuses to Terminating
- for (ClusterDataHolder clusterDataHolder : clusterData) {
- Service service = topology.getService(clusterDataHolder.getServiceType());
- if (service != null) {
- Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId());
- if (aCluster != null) {
- // validate state transition
- if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) {
- log.error("Invalid state transfer from " + aCluster.getStatus() + " to "
- + ClusterStatus.Terminating);
- }
- // for now anyway update the status forcefully
- aCluster.setStatus(ClusterStatus.Terminating);
-
- } else {
- log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() +
- " in Topology");
- }
-
- } else {
- log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " +
- " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found");
- }
- }
-
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
deleted file mode 100644
index 09b9062..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupActivatedProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (GroupActivatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupActivatedEvent event = (GroupActivatedEvent) Util.
- jsonToObject(message, GroupActivatedEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (GroupActivatedEvent event,Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- }
- } else {
- // Apply changes to the topology
- if (!group.isStateTransitionValid(GroupStatus.Active)) {
- log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
- }
- group.setStatus(GroupStatus.Active);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
deleted file mode 100644
index d053f5d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupCreatedProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (GroupCreatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupCreatedEvent event = (GroupCreatedEvent) Util.
- jsonToObject(message, GroupCreatedEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (GroupCreatedEvent event,Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- }
- } else {
- // Apply changes to the topology
- if (!group.isStateTransitionValid(GroupStatus.Created)) {
- log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created + " " +
- "for Group " + group.getAlias());
- }
- group.setStatus(GroupStatus.Created);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
deleted file mode 100644
index add40b4..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupInactivateEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupInActivateProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (GroupInactivateEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupInactivateEvent event = (GroupInactivateEvent) Util.
- jsonToObject(message, GroupInactivateEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(GroupInactivateEvent event, Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- }
- } else {
- group.setStatus(GroupStatus.Inactive);
- if (log.isInfoEnabled()) {
- log.info(String.format("Group updated as in-activated : %s",
- group.getUniqueIdentifier()));
- }
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
deleted file mode 100644
index 767ff71..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupTerminatedProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (GroupTerminatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupTerminatedEvent event = (GroupTerminatedEvent) Util.
- jsonToObject(message, GroupTerminatedEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (GroupTerminatedEvent event,Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- }
- } else {
- // Apply changes to the topology
- if (!group.isStateTransitionValid(GroupStatus.Terminated)) {
- log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated);
- }
- group.setStatus(GroupStatus.Terminated);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
deleted file mode 100644
index 5b15532..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupTerminatingProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (GroupTerminatingEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupTerminatingEvent event = (GroupTerminatingEvent) Util.
- jsonToObject(message, GroupTerminatingEvent.class);
-
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, topology);
-
- } finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess (GroupTerminatingEvent event,Topology topology) {
-
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- }
- } else {
- // Apply changes to the topology
- if (!group.isStateTransitionValid(GroupStatus.Terminating)) {
- log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
- }
- group.setStatus(GroupStatus.Terminating);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index d8d25d9..048e9a3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -22,7 +22,6 @@ package org.apache.stratos.messaging.message.processor.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.listener.applications.ApplicationUndeployedEventListener;
import org.apache.stratos.messaging.listener.topology.*;
import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
@@ -47,17 +46,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor;
private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
- private GroupCreatedProcessor groupCreatedProcessor;
- private GroupActivatedProcessor groupActivatedProcessor;
- private GroupInActivateProcessor groupInActivateProcessor;
- private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
- private ApplicationUndeployedMessageProcessor applicationUndeployedMessageProcessor;
- private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
- private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
- private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
- private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
- private GroupTerminatingProcessor groupTerminatingProcessor;
- private GroupTerminatedProcessor groupTerminatedProcessor;
private ClusterTerminatingProcessor clusterTerminatingProcessor;
private ClusterTerminatedProcessor clusterTerminatedProcessor;
@@ -111,39 +99,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
memberTerminatedMessageProcessor = new MemberTerminatedMessageProcessor();
add(memberTerminatedMessageProcessor);
- groupCreatedProcessor = new GroupCreatedProcessor();
- add(groupCreatedProcessor);
-
- groupActivatedProcessor = new GroupActivatedProcessor();
- add(groupActivatedProcessor);
-
- groupInActivateProcessor = new GroupInActivateProcessor();
- add(groupInActivateProcessor);
-
- groupTerminatingProcessor = new GroupTerminatingProcessor();
- add(groupTerminatingProcessor);
-
- groupTerminatedProcessor = new GroupTerminatedProcessor();
- add(groupTerminatedProcessor);
-
- applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor();
- add(applicationCreatedMessageProcessor);
-
- applicationUndeployedMessageProcessor = new ApplicationUndeployedMessageProcessor();
- add(applicationUndeployedMessageProcessor);
-
- applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor();
- add(applicationActivatedMessageProcessor);
-
- applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor();
- add(applicationInactivatedMessageProcessor);
-
- applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor();
- add(applicationTerminatedMessageProcessor);
-
- applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor();
- add(applicationTerminatingMessageProcessor);
-
if (log.isDebugEnabled()) {
log.debug("Topology message processor chain initialized X1");
}
@@ -182,30 +137,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
serviceRemovedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof MemberMaintenanceListener) {
memberMaintenanceModeProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupActivatedEventListener) {
- groupActivatedProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupCreatedEventListener) {
- groupCreatedProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupInActivateEventListener) {
- groupInActivateProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupTerminatedEventListener){
- groupTerminatedProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupTerminatingEventListener){
- groupTerminatingProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationCreatedEventListener) {
- applicationCreatedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationUndeployedEventListener) {
- applicationUndeployedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationActivatedEventListener) {
- applicationActivatedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationInActivateEventListener){
- applicationInactivatedMessageProcessor.addEventListener(eventListener);
- } else if(eventListener instanceof ApplicationTerminatedEventListener){
- applicationTerminatedMessageProcessor.addEventListener(eventListener);
- } else if(eventListener instanceof ApplicationTerminatingEventListener){
- applicationTerminatingMessageProcessor.addEventListener(eventListener);
- }
- else {
+ } else {
throw new RuntimeException("Unknown event listener");
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java
deleted file mode 100644
index 9eda9e0..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-import org.apache.stratos.messaging.message.processor.application.status.AppStatusMessageProcessorChain;
-import org.apache.stratos.messaging.util.Constants;
-
-import javax.jms.TextMessage;
-
-public class ApplicationStatusEventMessageDelegator implements Runnable {
- private static final Log log = LogFactory.getLog(ApplicationStatusEventMessageDelegator.class);
-
- private ApplicationStatusEventMessageQueue messageQueue;
- private MessageProcessorChain processorChain;
- private boolean terminated;
-
- public ApplicationStatusEventMessageDelegator(ApplicationStatusEventMessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- this.processorChain = new AppStatusMessageProcessorChain();
- }
-
- public void addEventListener(EventListener eventListener) {
- processorChain.addEventListener(eventListener);
- }
-
- @Override
- public void run() {
- try {
- if (log.isInfoEnabled()) {
- log.info("Application status event message delegator started");
- }
-
- while (!terminated) {
- try {
- TextMessage message = messageQueue.take();
-
- // Retrieve the header
- String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
-
- // Retrieve the actual message
- String json = message.getText();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Application status event message received from queue: %s", type));
- }
-
- // Delegate message to message processor chain
- if (log.isDebugEnabled()) {
- log.debug(String.format("Delegating application status event message: %s", type));
- }
- processorChain.process(type, json, null);
- } catch (Exception e) {
- log.error("Failed to retrieve application status event message", e);
- }
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Application status event message delegator failed", e);
- }
- }
- }
-
- /**
- * Terminate topology event message delegator thread.
- */
- public void terminate() {
- terminated = true;
- }
-
-
- private EventMessage jsonToEventMessage(String json) {
-
- EventMessage event = new EventMessage();
- String message;
-
- //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains
- //message
- String[] MessageParts = json.split(":", 3);
-
- String eventType = MessageParts[0].trim();
- eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\""));
- if(log.isDebugEnabled()){
- log.debug(String.format("Extracted [event type] %s", eventType));
- }
-
- event.setEventName(eventType);
- String messageTag = MessageParts[1];
- messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\""));
-
- if("message".equals(messageTag)){
- message = MessageParts[2].trim();
- //Remove trailing bracket twice to get the message
- message = message.substring(0, message.lastIndexOf("}")).trim();
- message = message.substring(0, message.lastIndexOf("}")).trim();
- if(message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1){
- if(log.isDebugEnabled()) {
- log.debug(String.format("[Extracted message] %s ", message));
- }
- event.setMessage(message);
- return event;
- }
- }
- return null;
- }
-
- private class EventMessage {
- private String eventName;
- private String message;
-
- private String getEventName() {
- return eventName;
- }
-
- private void setEventName(String eventName) {
- this.eventName = eventName;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
- }
-}