You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/01/03 08:58:15 UTC

git commit: introducing new state to member as readytoshutdown - STRATOS-330

Updated Branches:
  refs/heads/master 5102bd8bf -> 7fab5047c


introducing new state to member as readytoshutdown -  STRATOS-330


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7fab5047
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7fab5047
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7fab5047

Branch: refs/heads/master
Commit: 7fab5047c59b390a2ea5828916f87d2a0f6ce1d4
Parents: 5102bd8
Author: rekathiru <rt...@gmail.com>
Authored: Fri Jan 3 13:28:10 2014 +0530
Committer: rekathiru <rt...@gmail.com>
Committed: Fri Jan 3 13:28:10 2014 +0530

----------------------------------------------------------------------
 .../topology/MemberReadyToShutdownEvent.java    |  61 ++++++++
 .../MemberReadyToShutdownEventListener.java     |  25 ++++
 .../MemberReadyToShutdownMessageProcessor.java  | 146 +++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java |   6 +
 4 files changed, 238 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java
new file mode 100644
index 0000000..7c93b3e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java
@@ -0,0 +1,61 @@
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+public class MemberReadyToShutdownEvent extends TopologyEvent implements Serializable { // Topology event that identifies one member by service, cluster, network partition, partition and member id, plus an optional status and properties.
+    private final String serviceName; // identifiers below are final: assigned once in the constructor, no setters
+    private final String clusterId;
+    private final String networkPartitionId;
+    private final String partitionId;
+    private final String memberId;
+    private MemberStatus status; // not assigned by the constructor; stays null until setStatus() is called
+    private Properties properties; // not assigned by the constructor; stays null until setProperties() is called
+
+    public MemberReadyToShutdownEvent(String serviceName, String clusterId, // Stores the five identifiers exactly as given; no null or format validation is performed here.
+                                      String networkPartitionId, String partitionId, String memberId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.networkPartitionId = networkPartitionId;
+        this.partitionId = partitionId;
+        this.memberId = memberId;
+    }
+
+     public String getServiceName() { // returns the service name passed to the constructor
+        return serviceName;
+    }
+
+    public String getClusterId() { // returns the cluster id passed to the constructor
+        return clusterId;
+    }
+
+    public String getMemberId() { // returns the member id passed to the constructor
+        return memberId;
+    }
+
+    public MemberStatus getStatus() { // returns the last value given to setStatus(), or null if it was never called
+        return status;
+    }
+
+    public void setStatus(MemberStatus status) { // replaces the stored status; the argument is not validated
+        this.status = status;
+    }
+
+    public Properties getProperties() { // returns the same Properties instance that was set (no defensive copy), or null if never set
+        return properties;
+    }
+
+    public void setProperties(Properties properties) { // stores the reference as-is (no defensive copy)
+        this.properties = properties;
+    }
+
+	public String getPartitionId() { // returns the partition id passed to the constructor
+		return partitionId;
+	}
+
+    public String getNetworkPartitionId() { // returns the network partition id passed to the constructor
+        return networkPartitionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java
new file mode 100644
index 0000000..6eeea37
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class MemberReadyToShutdownEventListener extends EventListener { // Listener type for member ready-to-shutdown events; declares no members of its own.
+ // TopologyMessageProcessorChain checks "instanceof MemberReadyToShutdownEventListener" to register a listener with the ready-to-shutdown processor.
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
new file mode 100644
index 0000000..b2e43b8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{ // Chain link for MemberReadyToShutdownEvent messages: sets the member's status to ReadyToShutDown in the topology and notifies listeners.
+    private static final Log log = LogFactory.getLog(MemberReadyToShutdownMessageProcessor.class);
+    private MessageProcessor nextProcessor; // receives every message whose type this processor does not handle; null when nothing follows
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) { // Sets the processor that process() delegates to for other message types.
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) { // Returns true only when the member status was changed and listeners were notified; returns false when the event is ignored; for other types returns the next processor's result or throws RuntimeException if there is none.
+        Topology topology = (Topology) object; // the cast runs before the type check, so a non-Topology object fails with ClassCastException for every message type
+
+        if (MemberReadyToShutdownEvent.class.getName().equals(type)) { // type is matched against the event's fully qualified class name
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) Util.
+                                            jsonToObject(message, MemberReadyToShutdownEvent.class); // NOTE(review): the result is dereferenced below without a null check - confirm jsonToObject cannot return null
+
+            // Apply service filter
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology: service, then cluster, then member must all exist
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply member filter (it reads the LB cluster id from the topology's Member, so it runs after the member lookup)
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            if (member.getStatus() == MemberStatus.ReadyToShutDown) { // member is already in the target state: warn and return without touching the topology or notifying listeners
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already updated as Ready to Shutdown: " +
+                            "[service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.ReadyToShutDown);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners (notifyEventListeners is not defined in this class; presumably inherited from MessageProcessor)
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); // end of chain reached and no processor recognised the type
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index 1281761..83dde30 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -39,6 +39,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
     private InstanceSpawnedMessageProcessor instanceSpawnedMessageProcessor;
     private MemberStartedMessageProcessor memberStartedMessageProcessor;
     private MemberActivatedMessageProcessor memberActivatedMessageProcessor;
+    private MemberReadyToShutdownMessageProcessor memberReadyToShutdownProcessor;
     private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
     private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
 
@@ -68,6 +69,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
         memberActivatedMessageProcessor = new MemberActivatedMessageProcessor();
         add(memberActivatedMessageProcessor);
 
+        memberReadyToShutdownProcessor = new MemberReadyToShutdownMessageProcessor();
+        add(memberReadyToShutdownProcessor);
+
         memberSuspendedMessageProcessor = new MemberSuspendedMessageProcessor();
         add(memberSuspendedMessageProcessor);
 
@@ -92,6 +96,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
             memberActivatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof MemberStartedEventListener) {
             memberStartedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof MemberReadyToShutdownEventListener) {
+            memberReadyToShutdownProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof MemberSuspendedEventListener) {
             memberSuspendedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof MemberTerminatedEventListener) {