You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/12/05 14:57:27 UTC

stratos git commit: renaming the class

Repository: stratos
Updated Branches:
  refs/heads/master 24ba6982f -> 3309e9998


renaming the class


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/3309e999
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/3309e999
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/3309e999

Branch: refs/heads/master
Commit: 3309e9998b61fef24278a7a17046399d263f766a
Parents: 24ba698
Author: reka <rt...@gmail.com>
Authored: Fri Dec 5 19:27:17 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Fri Dec 5 19:27:17 2014 +0530

----------------------------------------------------------------------
 .../GroupInstanceTerminatedProcessor.java       | 121 +++++++++++++++++++
 1 file changed, 121 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/3309e999/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatedProcessor.java
new file mode 100644
index 0000000..9fe8919
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInstanceTerminatedProcessor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.event.applications.GroupInstanceTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Applies {@link GroupInstanceTerminatedEvent}s by setting the group instance status to Terminated.
+ */
+public class GroupInstanceTerminatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInstanceTerminatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * @param type    event class name identifying the message
+     * @param message JSON body of the event
+     * @param object  the {@link Applications} model to update
+     * @return true only if the event was applied; for other types, the next processor's result
+     * @throws RuntimeException if the type is not handled here and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+        if (!GroupInstanceTerminatedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            // ask the next processor to take care of the message.
+            return nextProcessor.process(type, message, applications);
+        }
+
+        if (!applications.isInitialized()) {
+            return false;
+        }
+
+        // Parse complete message and build event
+        GroupInstanceTerminatedEvent event = (GroupInstanceTerminatedEvent) Util.
+                jsonToObject(message, GroupInstanceTerminatedEvent.class);
+
+        ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
+        try {
+            return doProcess(event, applications);
+        } finally {
+            ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
+        }
+    }
+
+    /**
+     * Validates the event against the model and applies it. Each failed check returns false
+     * even when WARN logging is off, so listeners are notified only after a real status change.
+     */
+    private boolean doProcess(GroupInstanceTerminatedEvent event, Applications applications) {
+        Application application = applications.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s", event.getAppId()));
+            }
+            return false;
+        }
+
+        Group group = application.getGroupRecursively(event.getGroupId());
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s",
+                        event.getAppId(), event.getGroupId()));
+            }
+            return false;
+        }
+
+        GroupInstance context = group.getInstanceContexts(event.getInstanceId());
+        if (context == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group Instance not exists in Group: [AppId] %s [groupId] %s " +
+                        "[instanceId] %s", event.getAppId(), event.getGroupId(), event.getInstanceId()));
+            }
+            return false; // a null context must never reach the transition check below
+        }
+
+        // Apply changes to the topology
+        GroupStatus status = GroupStatus.Terminated;
+        if (!context.isStateTransitionValid(status)) {
+            log.error("Invalid State Transition from " + context.getStatus() + " to " + status);
+            return false;
+        }
+        context.setStatus(status);
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}