You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/31 08:51:35 UTC
[02/10] renaming application.status topic to applications
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java
new file mode 100644
index 0000000..9520f4e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.AppClusterTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class AppClusterTerminatingMessageProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(AppClusterTerminatingMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (AppClusterTerminatingEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ AppClusterTerminatingEvent event = (AppClusterTerminatingEvent) Util.
+ jsonToObject(message, AppClusterTerminatingEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received AppStatusClusterTerminatingEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java
new file mode 100644
index 0000000..1524b00
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.application.status.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Application Status processor chain is to handle the list processors to parse the application
+ * status.
+ */
+public class AppStatusMessageProcessorChain extends MessageProcessorChain {
+ private static final Log log = LogFactory.getLog(AppStatusMessageProcessorChain.class);
+
+ private AppStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
+ private AppStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor;
+ private AppStatusClusterInactivateMessageProcessor clusterInActivateMessageProcessor;
+ private AppStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor;
+ private AppStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor;
+ private AppStatusGroupCreatedMessageProcessor groupCreatedMessageProcessor;
+ private AppStatusGroupActivatedMessageProcessor groupActivatedMessageProcessor;
+ private AppStatusGroupInactivatedMessageProcessor groupInActivateMessageProcessor;
+ private AppStatusApplicationActivatedMessageProcessor appActivatedMessageProcessor;
+ private AppStatusApplicationCreatedMessageProcessor applicationStatusAppCreatedMessageProcessor;
+ private AppStatusApplicationInactivatedMessageProcessor applicationStatusAppInActivatedMessageProcessor;
+ private AppStatusApplicationTerminatedMessageProcessor applicationStatusAppTerminatedMessageProcessor;
+ private AppStatusApplicationTerminatingMessageProcessor applicationStatusAppTerminatingMessageProcessor;
+
+ private AppStatusGroupTerminatedMessageProcessor groupTerminatedMessageProcessor;
+ private AppStatusGroupTerminatingMessageProcessor groupTerminatingMessageProcessor;
+
+ public void initialize() {
+ // Add instance notifier event processors
+ clusterCreatedMessageProcessor= new AppStatusClusterCreatedMessageProcessor();
+ add(clusterCreatedMessageProcessor);
+
+ clusterActivatedMessageProcessor = new AppStatusClusterActivatedMessageProcessor();
+ add(clusterActivatedMessageProcessor);
+
+ clusterInActivateMessageProcessor = new AppStatusClusterInactivateMessageProcessor();
+ add(clusterInActivateMessageProcessor);
+
+ clusterTerminatingMessageProcessor = new AppStatusClusterTerminatingMessageProcessor();
+ add(clusterTerminatingMessageProcessor);
+
+ clusterTerminatedMessageProcessor = new AppStatusClusterTerminatedMessageProcessor();
+ add(clusterTerminatedMessageProcessor);
+
+ groupCreatedMessageProcessor = new AppStatusGroupCreatedMessageProcessor();
+ add(groupCreatedMessageProcessor);
+
+ groupActivatedMessageProcessor = new AppStatusGroupActivatedMessageProcessor();
+ add(groupActivatedMessageProcessor);
+
+ groupInActivateMessageProcessor = new AppStatusGroupInactivatedMessageProcessor();
+ add(groupInActivateMessageProcessor);
+
+ appActivatedMessageProcessor = new AppStatusApplicationActivatedMessageProcessor();
+ add(appActivatedMessageProcessor);
+
+ applicationStatusAppCreatedMessageProcessor = new AppStatusApplicationCreatedMessageProcessor();
+ this.add(applicationStatusAppCreatedMessageProcessor);
+
+ applicationStatusAppInActivatedMessageProcessor = new AppStatusApplicationInactivatedMessageProcessor();
+ this.add(applicationStatusAppInActivatedMessageProcessor);
+
+ applicationStatusAppTerminatedMessageProcessor = new AppStatusApplicationTerminatedMessageProcessor();
+ this.add(applicationStatusAppTerminatedMessageProcessor);
+
+ applicationStatusAppTerminatingMessageProcessor = new AppStatusApplicationTerminatingMessageProcessor();
+ this.add(applicationStatusAppTerminatingMessageProcessor);
+
+ groupTerminatedMessageProcessor = new AppStatusGroupTerminatedMessageProcessor();
+ this.add(groupTerminatedMessageProcessor);
+
+ groupTerminatingMessageProcessor = new AppStatusGroupTerminatingMessageProcessor();
+ this.add(groupTerminatingMessageProcessor);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Instance notifier message processor chain initialized");
+ }
+ }
+
+ public void addEventListener(EventListener eventListener) {
+ if(eventListener instanceof AppStatusClusterCreatedEventListener) {
+ clusterCreatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof AppStatusClusterActivatedEventListener) {
+ clusterActivatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof AppStatusClusterInactivateEventListener) {
+ clusterInActivateMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusGroupCreatedEventListener) {
+ groupCreatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof AppStatusGroupActivatedEventListener) {
+ groupActivatedMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusClusterTerminatedEventListener){
+ clusterTerminatedMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusClusterTerminatingEventListener){
+ clusterTerminatingMessageProcessor.addEventListener(eventListener);
+ }else if (eventListener instanceof AppStatusGroupInactivateEventListener) {
+ groupInActivateMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof AppStatusApplicationActivatedEventListener) {
+ appActivatedMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusApplicationInactivatedEventListener){
+ applicationStatusAppInActivatedMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusApplicationCreatedEventListener){
+ applicationStatusAppCreatedMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusApplicationTerminatingEventListener){
+ applicationStatusAppTerminatingMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusApplicationTerminatedEventListener){
+ applicationStatusAppTerminatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof AppStatusGroupTerminatingEventListener){
+ groupTerminatingMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof AppStatusGroupTerminatedEventListener){
+ groupTerminatedMessageProcessor.addEventListener(eventListener);
+ } else
+ {
+ throw new RuntimeException("Unknown event listener " + eventListener.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
new file mode 100644
index 0000000..71bd50e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationActivatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(ApplicationActivatedMessageProcessor.class);
+
+
+ private MessageProcessor nextProcessor;
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (ApplicationActivatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ ApplicationActivatedEvent event =
+ (ApplicationActivatedEvent) Util.jsonToObject(message, ApplicationActivatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received ApplicationActivatedEvent in application status topic: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group activated message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
new file mode 100644
index 0000000..db5d777
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationCreatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
+
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (ApplicationCreatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ ApplicationCreatedEvent event =
+ (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received ApplicationCreated Event in application status topic: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group activated message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
new file mode 100644
index 0000000..d8f2aac
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(ApplicationInactivatedMessageProcessor.class);
+
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (ApplicationInactivatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ ApplicationInactivatedEvent event =
+ (ApplicationInactivatedEvent) Util.jsonToObject(message, ApplicationInactivatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received ApplicationInActivatedEvent in application status topic: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group activated message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
new file mode 100644
index 0000000..a121ffb
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
+
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (ApplicationTerminatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ ApplicationTerminatedEvent event =
+ (ApplicationTerminatedEvent) Util.jsonToObject(message, ApplicationTerminatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received ApplicationTerminatedEvent in application status topic: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group activated message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
new file mode 100644
index 0000000..280de2c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.ApplicationTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
+
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (ApplicationTerminatingEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ ApplicationTerminatingEvent event =
+ (ApplicationTerminatingEvent) Util.jsonToObject(message, ApplicationTerminatingEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received ApplicationTerminatingEvent in application status topic: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group activated message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
new file mode 100644
index 0000000..02ddda8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupActivatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(GroupActivatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (GroupActivatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ GroupActivatedEvent event =
+ (GroupActivatedEvent) Util.jsonToObject(message, GroupActivatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received GroupActivatedEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group activated message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
new file mode 100644
index 0000000..d04d7f9
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupCreatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(GroupCreatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (GroupCreatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ GroupCreatedEvent event =
+ (GroupCreatedEvent) Util.jsonToObject(message, GroupCreatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received AppStatusGroupCreatedEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group created message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
new file mode 100644
index 0000000..6cf2587
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.AppStatusGroupInactivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupInactivatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(GroupInactivatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (AppStatusGroupInactivateEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ AppStatusGroupInactivateEvent event =
+ (AppStatusGroupInactivateEvent) Util.jsonToObject(message, AppStatusGroupInactivateEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received GroupInActivateEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group in activated message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
new file mode 100644
index 0000000..a917a14
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupTerminatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(GroupTerminatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (GroupTerminatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ GroupTerminatedEvent event =
+ (GroupTerminatedEvent) Util.jsonToObject(message, GroupTerminatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received GroupTerminatingEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group in GroupTerminatingEvent message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
new file mode 100644
index 0000000..63c055d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class GroupTerminatingMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(GroupTerminatingMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (GroupTerminatingEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ GroupTerminatingEvent event =
+ (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received GroupTerminatingEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group in GroupTerminatingEvent message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}