You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/12/05 14:40:28 UTC
[2/5] stratos git commit: adding instance prefix to events, ,
processor and listener
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java
new file mode 100644
index 0000000..c7a39b5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatedMessageProcessor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor is responsible for processing the application instance terminated event and updating the Applications.
+ */
+public class ApplicationInstanceTerminatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(ApplicationInstanceTerminatedMessageProcessor.class);
+
+ // Next processor in the chain; process() hands over to it when the event type is not handled here.
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ /** Handles ApplicationInstanceTerminatedEvent messages; other types go to the next processor (RuntimeException if there is none). */
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Applications applications = (Applications) object;
+
+ if (ApplicationInstanceTerminatedEvent.class.getName().equals(type)) {
+ // Return false (event not processed) if applications has not been initialized
+ if (!applications.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ ApplicationInstanceTerminatedEvent event = (ApplicationInstanceTerminatedEvent) Util.
+ jsonToObject(message, ApplicationInstanceTerminatedEvent.class);
+
+ ApplicationsUpdater.acquireWriteLockForApplications();
+
+ try {
+ return doProcess(event, applications);
+
+ } finally {
+ ApplicationsUpdater.releaseWriteLockForApplications();
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, applications);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+ /** Removes the application named by the event if it is still present, then notifies listeners; throws RuntimeException when the event has no application id. */
+ private boolean doProcess(ApplicationInstanceTerminatedEvent event, Applications applications) {
+
+ // The application id is required; log and fail when it is missing
+ if (event.getAppId() == null) {
+ String errorMsg = "Application Id of application removed event is invalid";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ // Remove the application if it is still present in applications
+ String appId = event.getAppId();
+ if (applications.applicationExists(appId)) {
+ log.warn("Application with id [ " + appId + " ] still exists in Applications, removing it");
+ applications.removeApplication(appId);
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java
new file mode 100644
index 0000000..7679091
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInstanceTerminatingMessageProcessor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.instance.ApplicationInstance;
+import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor is responsible for processing the application instance terminating event and updating the Applications.
+ */
+public class ApplicationInstanceTerminatingMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(ApplicationInstanceTerminatingMessageProcessor.class);
+
+ // Next processor in the chain; process() hands over to it when the event type is not handled here.
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ /** Handles ApplicationInstanceTerminatingEvent messages; other types go to the next processor (RuntimeException if there is none). */
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Applications applications = (Applications) object;
+
+ if (ApplicationInstanceTerminatingEvent.class.getName().equals(type)) {
+ // Return false (event not processed) if applications has not been initialized
+ if (!applications.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ ApplicationInstanceTerminatingEvent event = (ApplicationInstanceTerminatingEvent) Util.
+ jsonToObject(message, ApplicationInstanceTerminatingEvent.class);
+
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, applications);
+
+ } finally {
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, applications);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+ /** Marks the event's application instance as Terminating and notifies listeners; returns false when the application or the instance is unknown. */
+ private boolean doProcess(ApplicationInstanceTerminatingEvent event, Applications applications) {
+
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [AppId] %s",
+ event.getAppId()));
+ }
+ return false;
+ } else {
+ // Look up the instance the event refers to before applying the change
+ ApplicationInstance context = application.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application Instance not exists in Group: [AppId] %s " +
+ "[instanceId] %s", event.getAppId(), event.getInstanceId()));
+ }
+ return false; // must not depend on the log level: context is dereferenced below
+ }
+ ApplicationStatus status = ApplicationStatus.Terminating;
+ if (!context.isStateTransitionValid(status)) {
+ log.error("Invalid State transfer from [ " + context.getStatus() +
+ " ] to [ " + status + " ]");
+ }
+ context.setStatus(status); // NOTE(review): applied even when the transition was logged as invalid - confirm this is intended
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
deleted file mode 100644
index a6d2cd5..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
-
-
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
-
- @Override
- public boolean process(String type, String message, Object object) {
- Applications applications = (Applications) object;
-
- if (ApplicationInstanceTerminatedEvent.class.getName().equals(type)) {
- // Return if applications has not been initialized
- if (!applications.isInitialized())
- return false;
-
- // Parse complete message and build event
- ApplicationInstanceTerminatedEvent event = (ApplicationInstanceTerminatedEvent) Util.
- jsonToObject(message, ApplicationInstanceTerminatedEvent.class);
-
- ApplicationsUpdater.acquireWriteLockForApplications();
-
- try {
- return doProcess(event, applications);
-
- } finally {
- ApplicationsUpdater.releaseWriteLockForApplications();
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, applications);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(ApplicationInstanceTerminatedEvent event, Applications applications) {
-
- // check if required properties are available
- if (event.getAppId() == null) {
- String errorMsg = "Application Id of application removed event is invalid";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- // check if an Application with same name exists in applications
- String appId = event.getAppId();
- if (applications.applicationExists(appId)) {
- log.warn("Application with id [ " + appId + " ] still exists in Applications, removing it");
- applications.removeApplication(appId);
- }
-
- notifyEventListeners(event);
- return true;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
deleted file mode 100644
index 33ad474..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.instance.ApplicationInstance;
-import org.apache.stratos.messaging.event.applications.ApplicationInstanceTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
- private static final Log log =
- LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
-
-
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
-
- @Override
- public boolean process(String type, String message, Object object) {
- Applications applications = (Applications) object;
-
- if (ApplicationInstanceTerminatingEvent.class.getName().equals(type)) {
- // Return if applications has not been initialized
- if (!applications.isInitialized())
- return false;
-
- // Parse complete message and build event
- ApplicationInstanceTerminatingEvent event = (ApplicationInstanceTerminatingEvent) Util.
- jsonToObject(message, ApplicationInstanceTerminatingEvent.class);
-
- ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, applications);
-
- } finally {
- ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, applications);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(ApplicationInstanceTerminatingEvent event, Applications applications) {
-
- // Validate event against the existing applications
- Application application = applications.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- } else {
- // Apply changes to the applications
- ApplicationInstance context = application.getInstanceContexts(event.getInstanceId());
- if(context == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application Instance not exists in Group: [AppId] %s" +
- "[instanceId] %s", event.getAppId(), event.getInstanceId()));
- return false;
- }
- }
- ApplicationStatus status = ApplicationStatus.Terminating;
- if (!context.isStateTransitionValid(status)) {
- log.error("Invalid State transfer from [ " + context.getStatus() +
- " ] to [ " + status + " ]");
- }
- context.setStatus(status);
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
index a420eea..b88a3ec 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
@@ -31,52 +31,52 @@ import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
public class ApplicationsMessageProcessorChain extends MessageProcessorChain {
private static final Log log = LogFactory.getLog(ApplicationsMessageProcessorChain.class);
- private GroupResetProcessor groupCreatedMessageProcessor;
- private GroupActivatedProcessor groupActivatedMessageProcessor;
- private GroupInActivateProcessor groupInActivateMessageProcessor;
- private GroupTerminatedProcessor groupTerminatedProcessor;
- private GroupTerminatingProcessor groupTerminatingProcessor;
- private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
+ private GroupInstanceCreatedProcessor groupCreatedMessageProcessor;
+ private GroupInstanceActivatedProcessor groupActivatedMessageProcessor;
+ private GroupInstanceInActivateProcessor groupInActivateMessageProcessor;
+ private GroupInstanceTerminatedProcessor groupTerminatedProcessor;
+ private GroupInstanceTerminatingProcessor groupTerminatingProcessor;
+ private ApplicationInstanceActivatedMessageProcessor applicationActivatedMessageProcessor;
private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
- private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
- private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
- private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
+ private ApplicationInstanceInactivatedMessageProcessor applicationInactivatedMessageProcessor;
+ private ApplicationInstanceTerminatedMessageProcessor applicationTerminatedMessageProcessor;
+ private ApplicationInstanceTerminatingMessageProcessor applicationTerminatingMessageProcessor;
private CompleteApplicationsMessageProcessor completeApplicationsMessageProcessor;
public void initialize() {
// Add instance notifier event processors
- groupCreatedMessageProcessor = new GroupResetProcessor();
+ groupCreatedMessageProcessor = new GroupInstanceCreatedProcessor();
add(groupCreatedMessageProcessor);
- groupActivatedMessageProcessor = new GroupActivatedProcessor();
+ groupActivatedMessageProcessor = new GroupInstanceActivatedProcessor();
add(groupActivatedMessageProcessor);
- groupInActivateMessageProcessor = new GroupInActivateProcessor();
+ groupInActivateMessageProcessor = new GroupInstanceInActivateProcessor();
add(groupInActivateMessageProcessor);
- groupTerminatedProcessor = new GroupTerminatedProcessor();
+ groupTerminatedProcessor = new GroupInstanceTerminatedProcessor();
add(groupTerminatedProcessor);
- groupTerminatingProcessor = new GroupTerminatingProcessor();
+ groupTerminatingProcessor = new GroupInstanceTerminatingProcessor();
add(groupTerminatingProcessor);
- applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor();
+ applicationActivatedMessageProcessor = new ApplicationInstanceActivatedMessageProcessor();
add(applicationActivatedMessageProcessor);
applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor();
add(applicationCreatedMessageProcessor);
- applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor();
+ applicationInactivatedMessageProcessor = new ApplicationInstanceInactivatedMessageProcessor();
add(applicationInactivatedMessageProcessor);
- applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor();
+ applicationTerminatingMessageProcessor = new ApplicationInstanceTerminatingMessageProcessor();
add(applicationTerminatingMessageProcessor);
completeApplicationsMessageProcessor = new CompleteApplicationsMessageProcessor();
add(completeApplicationsMessageProcessor);
- applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor();
+ applicationTerminatedMessageProcessor = new ApplicationInstanceTerminatedMessageProcessor();
add(applicationTerminatedMessageProcessor);
if (log.isDebugEnabled()) {
@@ -86,25 +86,25 @@ public class ApplicationsMessageProcessorChain extends MessageProcessorChain {
public void addEventListener(EventListener eventListener) {
- if (eventListener instanceof GroupResetEventListener) {
+ if (eventListener instanceof GroupInstanceCreatedEventListener) {
groupCreatedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupInactivateEventListener) {
+ } else if (eventListener instanceof GroupInstanceInactivateEventListener) {
groupInActivateMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupActivatedEventListener) {
+ } else if (eventListener instanceof GroupInstanceActivatedEventListener) {
groupActivatedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupTerminatingEventListener) {
+ } else if (eventListener instanceof GroupInstanceTerminatingEventListener) {
groupTerminatingProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof GroupTerminatedEventListener) {
+ } else if (eventListener instanceof GroupInstanceTerminatedEventListener) {
groupTerminatedProcessor.addEventListener(eventListener);
} else if (eventListener instanceof ApplicationCreatedEventListener) {
applicationCreatedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationActivatedEventListener) {
+ } else if (eventListener instanceof ApplicationInstanceActivatedEventListener) {
applicationActivatedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationInactivatedEventListener) {
+ } else if (eventListener instanceof ApplicationInstanceInactivatedEventListener) {
applicationInactivatedMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationTerminatingEventListener) {
+ } else if (eventListener instanceof ApplicationInstanceTerminatingEventListener) {
applicationTerminatingMessageProcessor.addEventListener(eventListener);
- } else if (eventListener instanceof ApplicationTerminatedEventListener) {
+ } else if (eventListener instanceof ApplicationInstanceTerminatedEventListener) {
applicationTerminatedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof CompleteApplicationsEventListener) {
completeApplicationsMessageProcessor.addEventListener(eventListener);
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
deleted file mode 100644
index f37a7f4..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupActivatedProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Applications applications = (Applications) object;
-
- if (GroupActivatedEvent.class.getName().equals(type)) {
- // Return if applications has not been initialized
- if (!applications.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupActivatedEvent event = (GroupActivatedEvent) Util.
- jsonToObject(message, GroupActivatedEvent.class);
-
- ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, applications);
-
- } finally {
- ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, applications);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(GroupActivatedEvent event, Applications applications) {
-
- // Validate event against the existing topology
- Application application = applications.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- return false;
- }
- } else {
- GroupInstance context = group.getInstanceContexts(event.getInstanceId());
- if(context == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
- "[instanceId] %s", event.getAppId(), event.getGroupId(),
- event.getInstanceId()));
- return false;
- }
- }
- // Apply changes to the topology
- GroupStatus status = GroupStatus.Active;
- if (!context.isStateTransitionValid(status)) {
- log.error("Invalid State Transition from " + context.getStatus() + " to " +
- status);
- return false;
- }
- context.setStatus(status);
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
deleted file mode 100644
index 41a2881..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupInactivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupInActivateProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Applications applications = (Applications) object;
-
- if (GroupInactivatedEvent.class.getName().equals(type)) {
- // Return if applications has not been initialized
- if (!applications.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupInactivatedEvent event = (GroupInactivatedEvent) Util.
- jsonToObject(message, GroupInactivatedEvent.class);
-
- ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, applications);
-
- } finally {
- ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, applications);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(GroupInactivatedEvent event, Applications applications) {
-
- // Validate event against the existing applications
- Application application = applications.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- return false;
- }
- } else {
- GroupInstance context = group.getInstanceContexts(event.getInstanceId());
- if(context == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
- "[instanceId] %s", event.getAppId(), event.getGroupId(),
- event.getInstanceId()));
- return false;
- }
- }
- // Apply changes to the topology
- GroupStatus status = GroupStatus.Inactive;
- if (!context.isStateTransitionValid(status)) {
- log.error("Invalid State Transition from " + context.getStatus() + " to " +
- status);
- return false;
- }
- context.setStatus(status);
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java
new file mode 100644
index 0000000..c74d176
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceActivatedProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Processes GroupInstanceActivatedEvent messages by marking the matching group instance Active.
+ */
+public class GroupInstanceActivatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceActivatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+        // Handle only GroupInstanceActivatedEvent; every other type is passed down the chain
+        if (GroupInstanceActivatedEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceActivatedEvent event = (GroupInstanceActivatedEvent) Util.
+                    jsonToObject(message, GroupInstanceActivatedEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+            // The finally block releases the write lock even if doProcess throws
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceActivatedEvent event, Applications applications) {
+        // Returns false, without changing any status, when the event fails validation
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+        // An unknown group or instance aborts processing whether or not WARN logging is enabled
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+            return false;
+        } else {
+            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
+            if (context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            event.getInstanceId()));
+                }
+                return false;
+            }
+            // Apply the status change only if the instance allows the transition
+            GroupStatus status = GroupStatus.Active;
+            if (!context.isStateTransitionValid(status)) {
+                log.error("Invalid State Transition from " + context.getStatus() + " to " +
+                        status);
+                return false;
+            }
+            context.setStatus(status);
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java
new file mode 100644
index 0000000..aa17fbf
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceCreatedProcessor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Processes GroupInstanceCreatedEvent messages by adding the new group instance to its group.
+ */
+public class GroupInstanceCreatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceCreatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+        // Handle only GroupInstanceCreatedEvent; every other type is passed down the chain
+        if (GroupInstanceCreatedEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceCreatedEvent event = (GroupInstanceCreatedEvent) Util.
+                    jsonToObject(message, GroupInstanceCreatedEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+            // The finally block releases the write lock even if doProcess throws
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceCreatedEvent event, Applications applications) {
+        // Returns false, without adding anything, when the event fails validation
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+        // An unknown group aborts processing whether or not WARN logging is enabled
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+            return false;
+        } else {
+            // A created event introduces a new instance, so it must not be registered in the group yet
+            String instanceId = event.getGroupInstance().getInstanceId();
+            GroupInstance context = group.getInstanceContexts(instanceId);
+            if (context != null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance already exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            instanceId));
+                }
+                return false;
+            }
+            // Apply changes to the applications
+            group.addInstance(instanceId, event.getGroupInstance());
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java
new file mode 100644
index 0000000..c1e00bf
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceInActivateProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceInactivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Processes GroupInstanceInactivatedEvent messages by marking the matching group instance Inactive.
+ */
+public class GroupInstanceInActivateProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceInActivateProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+        // Handle only GroupInstanceInactivatedEvent; every other type is passed down the chain
+        if (GroupInstanceInactivatedEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceInactivatedEvent event = (GroupInstanceInactivatedEvent) Util.
+                    jsonToObject(message, GroupInstanceInactivatedEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+            // The finally block releases the write lock even if doProcess throws
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceInactivatedEvent event, Applications applications) {
+        // Returns false, without changing any status, when the event fails validation
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+        // An unknown group or instance aborts processing whether or not WARN logging is enabled
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+            return false;
+        } else {
+            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
+            if (context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            event.getInstanceId()));
+                }
+                return false;
+            }
+            // Apply the status change only if the instance allows the transition
+            GroupStatus status = GroupStatus.Inactive;
+            if (!context.isStateTransitionValid(status)) {
+                log.error("Invalid State Transition from " + context.getStatus() + " to " +
+                        status);
+                return false;
+            }
+            context.setStatus(status);
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java
new file mode 100644
index 0000000..e3a8708
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatingProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Processes GroupInstanceTerminatingEvent messages by marking the matching group instance Terminating.
+ */
+public class GroupInstanceTerminatingProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceTerminatingProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+        // Handle only GroupInstanceTerminatingEvent; every other type is passed down the chain
+        if (GroupInstanceTerminatingEvent.class.getName().equals(type)) {
+            // Return if applications has not been initialized
+            if (!applications.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInstanceTerminatingEvent event = (GroupInstanceTerminatingEvent) Util.
+                    jsonToObject(message, GroupInstanceTerminatingEvent.class);
+
+            ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+            // The finally block releases the write lock even if doProcess throws
+            try {
+                return doProcess(event, applications);
+
+            } finally {
+                ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInstanceTerminatingEvent event, Applications applications) {
+        // Returns false, without changing any status, when the event fails validation
+        // Validate event against the existing applications
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+        // An unknown group or instance aborts processing whether or not WARN logging is enabled
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+            return false;
+        } else {
+            // Look up the instance this event refers to
+            GroupInstance context = group.getInstanceContexts(event.getInstanceId());
+            if (context == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                                    "[instanceId] %s", event.getAppId(), event.getGroupId(),
+                            event.getInstanceId()));
+                }
+                return false;
+            }
+            // Apply the status change only if the instance allows the transition
+            GroupStatus status = GroupStatus.Terminating;
+            if (!context.isStateTransitionValid(status)) {
+                log.error("Invalid State Transition from " + context.getStatus() + " to " +
+                        status);
+                return false;
+            }
+            context.setStatus(status);
+
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java
deleted file mode 100644
index f999ff3..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupResetProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupResetEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupResetProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupResetProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Applications applications = (Applications) object;
-
- if (GroupResetEvent.class.getName().equals(type)) {
- // Return if applications has not been initialized
- if (!applications.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupResetEvent event = (GroupResetEvent) Util.
- jsonToObject(message, GroupResetEvent.class);
-
- ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, applications);
-
- } finally {
- ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, applications);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(GroupResetEvent event, Applications applications) {
-
- // Validate event against the existing applications
- Application application = applications.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- return false;
- }
- } else {
- // Apply changes to the applications
- GroupInstance context = group.getInstanceContexts(event.getInstanceId());
- if(context == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
- "[instanceId] %s", event.getAppId(), event.getGroupId(),
- event.getInstanceId()));
- return false;
- }
- }
- // Apply changes to the topology
- GroupStatus status = GroupStatus.Inactive;
- if (!context.isStateTransitionValid(status)) {
- log.error("Invalid State Transition from " + context.getStatus() + " to " +
- status);
- return false;
- }
- context.setStatus(status);
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
deleted file mode 100644
index d263174..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * Handles {@link GroupTerminatedEvent} messages by moving the referenced group
- * instance to {@link GroupStatus#Terminated} and notifying event listeners.
- */
- public class GroupTerminatedProcessor extends MessageProcessor {
-     private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class);
-     private MessageProcessor nextProcessor;
-
-     @Override
-     public void setNext(MessageProcessor nextProcessor) {
-         this.nextProcessor = nextProcessor;
-     }
-
-     /**
-      * Handles the message if its type is {@link GroupTerminatedEvent}; otherwise
-      * delegates to the next processor in the chain.
-      *
-      * @param type    class name of the event carried by the message
-      * @param message JSON body of the event
-      * @param object  the {@link Applications} model to update
-      * @return true if the event was applied and listeners were notified
-      * @throws RuntimeException if the type is not handled here and no next processor is set
-      */
-     @Override
-     public boolean process(String type, String message, Object object) {
-         Applications applications = (Applications) object;
-
-         if (!GroupTerminatedEvent.class.getName().equals(type)) {
-             if (nextProcessor == null) {
-                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-             }
-             // Ask the next processor to take care of the message.
-             return nextProcessor.process(type, message, applications);
-         }
-
-         // Return if applications has not been initialized
-         if (!applications.isInitialized()) {
-             return false;
-         }
-
-         // Parse complete message and build event
-         GroupTerminatedEvent event = (GroupTerminatedEvent) Util.
-                 jsonToObject(message, GroupTerminatedEvent.class);
-
-         ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-         try {
-             return doProcess(event, applications);
-         } finally {
-             ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
-         }
-     }
-
-     /**
-      * Validates the event against the applications model and applies the status change.
-      * Invoked by {@link #process} while the application's write lock is held.
-      *
-      * @return false, without notifying listeners, if the application, group or group
-      *         instance is unknown or the state transition is invalid; true otherwise
-      */
-     private boolean doProcess(GroupTerminatedEvent event, Applications applications) {
-         // Validate event against the existing applications
-         Application application = applications.getApplication(event.getAppId());
-         if (application == null) {
-             if (log.isWarnEnabled()) {
-                 log.warn(String.format("Application does not exist: [AppId] %s",
-                         event.getAppId()));
-             }
-             return false;
-         }
-
-         // The rejections below must not depend on the configured log level, so each
-         // return sits outside its isWarnEnabled() guard.
-         Group group = application.getGroupRecursively(event.getGroupId());
-         if (group == null) {
-             if (log.isWarnEnabled()) {
-                 log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                         event.getGroupId()));
-             }
-             return false;
-         }
-
-         GroupInstance context = group.getInstanceContexts(event.getInstanceId());
-         if (context == null) {
-             if (log.isWarnEnabled()) {
-                 log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
-                         "[instanceId] %s", event.getAppId(), event.getGroupId(),
-                         event.getInstanceId()));
-             }
-             return false;
-         }
-
-         // Apply changes to the group instance
-         GroupStatus status = GroupStatus.Terminated;
-         if (!context.isStateTransitionValid(status)) {
-             log.error("Invalid State Transition from " + context.getStatus() + " to " +
-                     status);
-             return false;
-         }
-         context.setStatus(status);
-
-         // Notify event listeners
-         notifyEventListeners(event);
-         return true;
-     }
- }
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
deleted file mode 100644
index 69df56b..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupTerminatingProcessor extends MessageProcessor {
- private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Applications applications = (Applications) object;
-
- if (GroupTerminatingEvent.class.getName().equals(type)) {
- // Return if applications has not been initialized
- if (!applications.isInitialized())
- return false;
-
- // Parse complete message and build event
- GroupTerminatingEvent event = (GroupTerminatingEvent) Util.
- jsonToObject(message, GroupTerminatingEvent.class);
-
- ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
-
- try {
- return doProcess(event, applications);
-
- } finally {
- ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
- }
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, applications);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-
- private boolean doProcess(GroupTerminatingEvent event, Applications applications) {
-
- // Validate event against the existing applications
- Application application = applications.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroupRecursively(event.getGroupId());
-
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- return false;
- }
- } else {
- // Apply changes to the applications
- GroupInstance context = group.getInstanceContexts(event.getInstanceId());
- if(context == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
- "[instanceId] %s", event.getAppId(), event.getGroupId(),
- event.getInstanceId()));
- return false;
- }
- }
- // Apply changes to the topology
- GroupStatus status = GroupStatus.Terminating;
- if (!context.isStateTransitionValid(status)) {
- log.error("Invalid State Transition from " + context.getStatus() + " to " +
- status);
- return false;
- }
- context.setStatus(status);
-
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/cf1b3727/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
deleted file mode 100644
index 56f98cc..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.instance.ClusterInstance;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * Handles {@link ClusterActivatedEvent} messages by marking the referenced cluster
- * instance as {@link ClusterStatus#Active} and notifying event listeners.
- */
- public class ClusterActivatedProcessor extends MessageProcessor {
-     private static final Log log = LogFactory.getLog(ClusterActivatedProcessor.class);
-     private MessageProcessor nextProcessor;
-
-     @Override
-     public void setNext(MessageProcessor nextProcessor) {
-         this.nextProcessor = nextProcessor;
-     }
-
-     /**
-      * Handles the message if its type is {@link ClusterActivatedEvent}; otherwise
-      * delegates to the next processor in the chain.
-      *
-      * @param type    class name of the event carried by the message
-      * @param message JSON body of the event
-      * @param object  the {@link Topology} model to update
-      * @return true if the event was applied and listeners were notified
-      * @throws RuntimeException if the type is not handled here and no next processor is set
-      */
-     @Override
-     public boolean process(String type, String message, Object object) {
-         Topology topology = (Topology) object;
-
-         if (!ClusterActivatedEvent.class.getName().equals(type)) {
-             if (nextProcessor == null) {
-                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-             }
-             // Ask the next processor to take care of the message.
-             return nextProcessor.process(type, message, topology);
-         }
-
-         // Return if topology has not been initialized
-         if (!topology.isInitialized()) {
-             return false;
-         }
-
-         // Parse complete message and build event
-         ClusterActivatedEvent event = (ClusterActivatedEvent) Util.
-                 jsonToObject(message, ClusterActivatedEvent.class);
-
-         TopologyUpdater.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
-         try {
-             return doProcess(event, topology);
-         } finally {
-             TopologyUpdater.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
-         }
-     }
-
-     /**
-      * Applies the service and cluster filters, validates the event against the
-      * topology and sets the cluster instance status. Invoked by {@link #process}
-      * while the cluster's write lock is held.
-      *
-      * @return false, without notifying listeners, if the event is filtered out or the
-      *         service, cluster or cluster instance is unknown; true otherwise
-      */
-     private boolean doProcess(ClusterActivatedEvent event, Topology topology) {
-         // Apply service filter
-         if (TopologyServiceFilter.getInstance().isActive()) {
-             if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                 // Service is excluded, do not update topology or fire event
-                 if (log.isDebugEnabled()) {
-                     log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                 }
-                 return false;
-             }
-         }
-
-         // Apply cluster filter
-         if (TopologyClusterFilter.getInstance().isActive()) {
-             if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                 // Cluster is excluded, do not update topology or fire event
-                 if (log.isDebugEnabled()) {
-                     log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                 }
-                 return false;
-             }
-         }
-
-         // Validate event against the existing topology
-         Service service = topology.getService(event.getServiceName());
-         if (service == null) {
-             if (log.isWarnEnabled()) {
-                 log.warn(String.format("Service does not exist: [service] %s",
-                         event.getServiceName()));
-             }
-             return false;
-         }
-
-         Cluster cluster = service.getCluster(event.getClusterId());
-         if (cluster == null) {
-             if (log.isDebugEnabled()) {
-                 log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                         event.getClusterId()));
-             }
-             // Reject regardless of the log level: an unknown cluster must never
-             // reach the listeners as a successful activation.
-             return false;
-         }
-
-         // Apply changes to the topology
-         ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
-         if (context == null) {
-             log.warn("Cluster Instance Context is not found for [cluster] " +
-                     event.getClusterId() + " [instance-id] " +
-                     event.getInstanceId());
-             return false;
-         }
-
-         ClusterStatus status = ClusterStatus.Active;
-         if (!context.isStateTransitionValid(status)) {
-             // NOTE(review): an invalid transition is only logged here and the status is
-             // still applied below; confirm whether this leniency is intended.
-             log.error("Invalid State Transition from " + context.getStatus() + " to " + status);
-         }
-         context.setStatus(status);
-
-         // Notify event listeners
-         notifyEventListeners(event);
-         return true;
-     }
-
- }