You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/01/03 08:58:15 UTC
git commit: introducing new state to member as readytoshutdown -
STRATOS-330
Updated Branches:
refs/heads/master 5102bd8bf -> 7fab5047c
introducing new state to member as readytoshutdown - STRATOS-330
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7fab5047
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7fab5047
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7fab5047
Branch: refs/heads/master
Commit: 7fab5047c59b390a2ea5828916f87d2a0f6ce1d4
Parents: 5102bd8
Author: rekathiru <rt...@gmail.com>
Authored: Fri Jan 3 13:28:10 2014 +0530
Committer: rekathiru <rt...@gmail.com>
Committed: Fri Jan 3 13:28:10 2014 +0530
----------------------------------------------------------------------
.../topology/MemberReadyToShutdownEvent.java | 61 ++++++++
.../MemberReadyToShutdownEventListener.java | 25 ++++
.../MemberReadyToShutdownMessageProcessor.java | 146 +++++++++++++++++++
.../topology/TopologyMessageProcessorChain.java | 6 +
4 files changed, 238 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java
new file mode 100644
index 0000000..7c93b3e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java
@@ -0,0 +1,61 @@
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Topology event announcing that a member is ready to shut down.
+ * The five identifiers are fixed at construction; {@code status} and {@code properties}
+ * are optional and are not set by the constructor.
+ */
+public class MemberReadyToShutdownEvent extends TopologyEvent implements Serializable {
+    // Explicit version id so the serialized form does not depend on a compiler-derived default.
+    private static final long serialVersionUID = 1L;
+
+    // Identity of the member; fixed at construction.
+    private final String serviceName;
+    private final String clusterId;
+    private final String networkPartitionId;
+    private final String partitionId;
+    private final String memberId;
+    // Optional attributes; not set by the constructor.
+    private MemberStatus status;
+    private Properties properties;
+
+    /**
+     * Creates the event for one member.
+     *
+     * @param serviceName        name of the service the member belongs to
+     * @param clusterId          id of the cluster within that service
+     * @param networkPartitionId id of the member's network partition
+     * @param partitionId        id of the member's partition
+     * @param memberId           id of the member itself
+     */
+    public MemberReadyToShutdownEvent(String serviceName, String clusterId,
+                                      String networkPartitionId, String partitionId, String memberId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.networkPartitionId = networkPartitionId;
+        this.partitionId = partitionId;
+        this.memberId = memberId;
+    }
+
+    // Identifier accessors: plain reads of the constructor arguments.
+    public String getServiceName() { return serviceName; }
+    public String getClusterId() { return clusterId; }
+    public String getMemberId() { return memberId; }
+    public String getPartitionId() { return partitionId; }
+    public String getNetworkPartitionId() { return networkPartitionId; }
+
+    /** Returns the member status attached to this event, or {@code null} if none was set. */
+    public MemberStatus getStatus() { return status; }
+    /** Attaches a member status to this event. */
+    public void setStatus(MemberStatus status) { this.status = status; }
+
+    /** Returns the properties attached to this event, or {@code null} if none were set. */
+    public Properties getProperties() { return properties; }
+    /** Attaches a set of properties to this event. */
+    public void setProperties(Properties properties) { this.properties = properties; }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java
new file mode 100644
index 0000000..6eeea37
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class MemberReadyToShutdownEventListener extends EventListener {
+    // No members of its own: TopologyMessageProcessorChain checks for this type to route a listener to MemberReadyToShutdownMessageProcessor.
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
new file mode 100644
index 0000000..b2e43b8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Topology message processor for {@link MemberReadyToShutdownEvent} messages: sets the matching
+ * member's status to ReadyToShutDown and notifies listeners; other types go to the next processor.
+ */
+public class MemberReadyToShutdownMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(MemberReadyToShutdownMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    /** Stores the processor that receives messages this one does not handle. */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles one topology message.
+     * @param type    class name of the event carried by the message
+     * @param message serialized event, parsed with {@code Util.jsonToObject}
+     * @param object  the {@link Topology} to validate against and update
+     * @return {@code true} when the member status was changed and listeners were notified;
+     *         {@code false} when the event was dropped (topology not initialized, filtered out,
+     *         unknown service/cluster/member, or status already set); else the next processor's result
+     * @throws RuntimeException if the type is not handled here and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+        // Foreign message type: delegate down the chain, or fail if the chain ends here.
+        if (!MemberReadyToShutdownEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            return nextProcessor.process(type, message, topology);
+        }
+
+        // The event cannot be validated before the topology is initialized.
+        if (!topology.isInitialized()) {
+            return false;
+        }
+
+        MemberReadyToShutdownEvent shutdownEvent = (MemberReadyToShutdownEvent) Util.
+                jsonToObject(message, MemberReadyToShutdownEvent.class);
+        String serviceName = shutdownEvent.getServiceName();
+        String clusterId = shutdownEvent.getClusterId();
+        String memberId = shutdownEvent.getMemberId();
+
+        // Service filter: an excluded service neither updates the topology nor fires the event.
+        if (TopologyServiceFilter.getInstance().isActive()
+                && TopologyServiceFilter.getInstance().serviceNameExcluded(serviceName)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Service is excluded: [service] %s", serviceName));
+            }
+            return false;
+        }
+
+        // Cluster filter: same rule for an excluded cluster.
+        if (TopologyClusterFilter.getInstance().isActive()
+                && TopologyClusterFilter.getInstance().clusterIdExcluded(clusterId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Cluster is excluded: [cluster] %s", clusterId));
+            }
+            return false;
+        }
+
+        // Walk service -> cluster -> member; a missing link is logged and the event dropped.
+        Service targetService = topology.getService(serviceName);
+        if (targetService == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", serviceName));
+            }
+            return false;
+        }
+        Cluster targetCluster = targetService.getCluster(clusterId);
+        if (targetCluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", serviceName, clusterId));
+            }
+            return false;
+        }
+        Member targetMember = targetCluster.getMember(memberId);
+        if (targetMember == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        serviceName, clusterId, memberId));
+            }
+            return false;
+        }
+
+        // Member filter, keyed on the member's LB cluster id.
+        if (TopologyMemberFilter.getInstance().isActive()
+                && TopologyMemberFilter.getInstance().lbClusterIdExcluded(targetMember.getLbClusterId())) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is excluded: [lb-cluster-id] %s", targetMember.getLbClusterId()));
+            }
+            return false;
+        }
+
+        // A repeated event for a member already in this state is not re-announced.
+        if (targetMember.getStatus() == MemberStatus.ReadyToShutDown) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already updated as Ready to Shutdown: " +
+                        "[service] %s [cluster] %s [member] %s", serviceName, clusterId, memberId));
+            }
+            return false;
+        }
+
+        // Apply the change to the topology, then tell the listeners.
+        targetMember.setStatus(MemberStatus.ReadyToShutDown);
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
+                    serviceName, clusterId, memberId));
+        }
+        notifyEventListeners(shutdownEvent);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index 1281761..83dde30 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -39,6 +39,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
private InstanceSpawnedMessageProcessor instanceSpawnedMessageProcessor;
private MemberStartedMessageProcessor memberStartedMessageProcessor;
private MemberActivatedMessageProcessor memberActivatedMessageProcessor;
+ private MemberReadyToShutdownMessageProcessor memberReadyToShutdownProcessor;
private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
@@ -68,6 +69,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
memberActivatedMessageProcessor = new MemberActivatedMessageProcessor();
add(memberActivatedMessageProcessor);
+ memberReadyToShutdownProcessor = new MemberReadyToShutdownMessageProcessor();
+ add(memberReadyToShutdownProcessor);
+
memberSuspendedMessageProcessor = new MemberSuspendedMessageProcessor();
add(memberSuspendedMessageProcessor);
@@ -92,6 +96,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
memberActivatedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof MemberStartedEventListener) {
memberStartedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof MemberReadyToShutdownEventListener) {
+ memberReadyToShutdownProcessor.addEventListener(eventListener);
} else if (eventListener instanceof MemberSuspendedEventListener) {
memberSuspendedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof MemberTerminatedEventListener) {