You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/05 11:19:29 UTC
[1/2] Moved complete topology message processor to topology message
processor chain and renamed event processors to message processors
Updated Branches:
refs/heads/master 04add8436 -> f45707a96
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
new file mode 100644
index 0000000..eeae3d3
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberStartedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(MemberStartedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (MemberStartedEvent.class.getName().equals(type)) {
+ // Return false (message not applied) if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Convert the message body into a MemberStartedEvent via Util.jsonToObject
+ MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
+
+ // Apply service filter (only consulted when the filter is active)
+ if (ServiceFilter.getInstance().isActive()) {
+ if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Apply cluster filter (only consulted when the filter is active)
+ if (ClusterFilter.getInstance().isActive()) {
+ if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event against the existing topology; each failed check logs a warning and returns false
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+ if (member.getStatus() == MemberStatus.Starting) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+
+ // Apply changes to the topology: mark the member as Starting
+ member.setStatus(MemberStatus.Starting);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ // Not this processor's message type: hand it to the next processor in the chain.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
deleted file mode 100644
index 7f4428d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberSuspendedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(MemberSuspendedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (MemberSuspendedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Apply cluster filter
- if(ClusterFilter.getInstance().isActive()) {
- if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return true;
- }
- }
-
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if (member.getStatus() == MemberStatus.Suspended) {
- throw new RuntimeException(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.Suspended);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
new file mode 100644
index 0000000..e72d62c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberSuspendedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(MemberSuspendedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (MemberSuspendedEvent.class.getName().equals(type)) {
+ // Return false (message not applied) if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Convert the message body into a MemberSuspendedEvent via Util.jsonToObject
+ MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
+
+ // Apply service filter (only consulted when the filter is active)
+ if (ServiceFilter.getInstance().isActive()) {
+ if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Apply cluster filter (only consulted when the filter is active)
+ if (ClusterFilter.getInstance().isActive()) {
+ if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event against the existing topology; each failed check logs a warning and returns false
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+ if (member.getStatus() == MemberStatus.Suspended) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+
+ // Apply changes to the topology: mark the member as Suspended
+ member.setStatus(MemberStatus.Suspended);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ // Not this processor's message type: hand it to the next processor in the chain.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
deleted file mode 100644
index b5aac6d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberTerminatedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(MemberTerminatedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (MemberTerminatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Apply cluster filter
- if(ClusterFilter.getInstance().isActive()) {
- if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return true;
- }
- }
-
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if (member.getStatus() == MemberStatus.Terminated) {
- throw new RuntimeException(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.Terminated);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- notifyEventListeners(event);
- return true;
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
new file mode 100644
index 0000000..f8d52d6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberTerminatedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(MemberTerminatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (MemberTerminatedEvent.class.getName().equals(type)) {
+ // Return false (message not applied) if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Convert the message body into a MemberTerminatedEvent via Util.jsonToObject
+ MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
+
+ // Apply service filter (only consulted when the filter is active)
+ if (ServiceFilter.getInstance().isActive()) {
+ if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Apply cluster filter (only consulted when the filter is active)
+ if (ClusterFilter.getInstance().isActive()) {
+ if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event against the existing topology; each failed check logs a warning and returns false
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+ if (member.getStatus() == MemberStatus.Terminated) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+
+ // Apply changes to the topology: mark the member as Terminated
+ member.setStatus(MemberStatus.Terminated);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ notifyEventListeners(event);
+ return true;
+
+ } else {
+ if (nextProcessor != null) {
+ // Not this processor's message type: hand it to the next processor in the chain.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
deleted file mode 100644
index f2c8059..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ServiceCreatedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(ServiceCreatedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology)object;
-
- if (ServiceCreatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Validate event against the existing topology
- if (topology.serviceExists(event.getServiceName())) {
- throw new RuntimeException(String.format("Service already created: [service] %s", event.getServiceName()));
- }
-
- // Apply changes to the topology
- Service service = new Service(event.getServiceName());
- service.addPorts(event.getPorts());
- topology.addService(service);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Service created: [service] %s", event.getServiceName()));
- }
-
- notifyEventListeners(event);
- return true;
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
new file mode 100644
index 0000000..ff768dc
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Message processor for {@link ServiceCreatedEvent} messages in the topology
+ * message processor chain.
+ * <p>
+ * When the message type matches, the event is parsed from JSON, checked against
+ * the {@link ServiceFilter}, applied to the given {@link Topology} and passed to
+ * the registered event listeners. Messages of any other type are handed to the
+ * next processor in the chain.
+ */
+public class ServiceCreatedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ServiceCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    /**
+     * Sets the processor that receives messages this processor does not handle.
+     *
+     * @param nextProcessor next processor in the chain, or {@code null} if this is the last one
+     */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Processes a service created message, or delegates it down the chain.
+     *
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to update; it is cast without a type check
+     * @return {@code true} if the service was added and listeners were notified;
+     *         {@code false} if the topology is not initialized, the service is
+     *         excluded by the filter or the service already exists; for other
+     *         message types, whatever the next processor returns
+     * @throws RuntimeException if the type is not handled here and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ServiceCreatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
+
+            // Apply service filter
+            if (ServiceFilter.getInstance().isActive()) {
+                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology
+            if (topology.serviceExists(event.getServiceName())) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
+                }
+                // Stop here: falling through would call addService() again for a
+                // name that is already present and fire a second created event.
+                return false;
+            }
+
+            // Apply changes to the topology
+            Service service = new Service(event.getServiceName());
+            service.addPorts(event.getPorts());
+            topology.addService(service);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Service created: [service] %s", event.getServiceName()));
+            }
+
+            notifyEventListeners(event);
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
deleted file mode 100644
index 3859f60..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ServiceRemovedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(ServiceRemovedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology)object;
-
- if (ServiceRemovedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
-
- // Apply changes to the topology
- topology.removeService(service);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Service removed: [service] %s", event.getServiceName()));
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
new file mode 100644
index 0000000..5f31ec2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Chain link that handles {@link ServiceRemovedEvent} messages: it removes the
+ * named service from the {@link Topology} and notifies the registered event
+ * listeners. Any other message type is forwarded to the next processor.
+ */
+public class ServiceRemovedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ServiceRemovedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    /**
+     * Stores the processor to which unhandled message types are forwarded.
+     *
+     * @param nextProcessor successor in the chain; may be {@code null} for the last link
+     */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles a service removed message or forwards the message down the chain.
+     *
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to update; it is cast without a type check
+     * @return {@code true} when the service was removed and listeners were notified;
+     *         {@code false} when the topology is not initialized, the service is
+     *         excluded by the filter or the service is unknown; for other message
+     *         types, the result of the next processor
+     * @throws RuntimeException if the type is not handled here and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        // Foreign message type: forward it, or fail at the end of the chain.
+        if (!ServiceRemovedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            return nextProcessor.process(type, message, topology);
+        }
+
+        // Nothing can be removed from a topology that has not been initialized.
+        if (!topology.isInitialized()) {
+            return false;
+        }
+
+        // Rebuild the event from its JSON form.
+        ServiceRemovedEvent removedEvent =
+                (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
+
+        // Excluded services neither change the topology nor fire an event.
+        if (ServiceFilter.getInstance().isActive()
+                && ServiceFilter.getInstance().excluded(removedEvent.getServiceName())) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Service is excluded: [service] %s", removedEvent.getServiceName()));
+            }
+            return false;
+        }
+
+        // The service must be present before it can be removed.
+        Service existingService = topology.getService(removedEvent.getServiceName());
+        if (existingService == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        removedEvent.getServiceName()));
+            }
+            return false;
+        }
+
+        topology.removeService(existingService);
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Service removed: [service] %s", removedEvent.getServiceName()));
+        }
+
+        notifyEventListeners(removedEvent);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
deleted file mode 100644
index 75e395d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyEventProcessorChain.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-
-/**
- * Defines default topology message processor chain.
- */
-public class TopologyEventProcessorChain extends MessageProcessorChain {
- private static final Log log = LogFactory.getLog(TopologyEventProcessorChain.class);
-
- private ServiceCreatedEventProcessor serviceCreatedEventProcessor;
- private ServiceRemovedEventProcessor serviceRemovedEventProcessor;
- private ClusterCreatedEventProcessor clusterCreatedEventProcessor;
- private ClusterRemovedEventProcessor clusterRemovedEventProcessor;
- private InstanceSpawnedEventProcessor instanceSpawnedEventProcessor;
- private MemberStartedEventProcessor memberStartedEventProcessor;
- private MemberActivatedEventProcessor memberActivatedEventProcessor;
- private MemberSuspendedEventProcessor memberSuspendedEventProcessor;
- private MemberTerminatedEventProcessor memberTerminatedEventProcessor;
-
- public void initialize() {
- // Add topology event processors
- serviceCreatedEventProcessor = new ServiceCreatedEventProcessor();
- add(serviceCreatedEventProcessor);
-
- serviceRemovedEventProcessor = new ServiceRemovedEventProcessor();
- add(serviceRemovedEventProcessor);
-
- clusterCreatedEventProcessor = new ClusterCreatedEventProcessor();
- add(clusterCreatedEventProcessor);
-
- clusterRemovedEventProcessor = new ClusterRemovedEventProcessor();
- add(clusterRemovedEventProcessor);
-
- instanceSpawnedEventProcessor = new InstanceSpawnedEventProcessor();
- add(instanceSpawnedEventProcessor);
-
- memberStartedEventProcessor = new MemberStartedEventProcessor();
- add(memberStartedEventProcessor);
-
- memberActivatedEventProcessor = new MemberActivatedEventProcessor();
- add(memberActivatedEventProcessor);
-
- memberSuspendedEventProcessor = new MemberSuspendedEventProcessor();
- add(memberSuspendedEventProcessor);
-
- memberTerminatedEventProcessor = new MemberTerminatedEventProcessor();
- add(memberTerminatedEventProcessor);
-
- add(new CompleteTopologyEventIgnoreProcessor());
-
- if(log.isDebugEnabled()) {
- log.debug("Topology message processor chain initialized");
- }
- }
-
- public void addEventListener(EventListener eventListener) {
- if(eventListener instanceof ClusterCreatedEventListener) {
- clusterCreatedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof ClusterRemovedEventListener) {
- clusterRemovedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof InstanceSpawnedEventListener) {
- instanceSpawnedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof MemberActivatedEventListener) {
- memberActivatedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof MemberStartedEventListener) {
- memberStartedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof MemberSuspendedEventListener) {
- memberSuspendedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof MemberTerminatedEventListener) {
- memberTerminatedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof ServiceCreatedEventListener) {
- serviceCreatedEventProcessor.addEventListener(eventListener);
- }
- else if(eventListener instanceof ServiceRemovedEventListener) {
- serviceRemovedEventProcessor.addEventListener(eventListener);
- }
- else {
- throw new RuntimeException("Unknown event listener");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
new file mode 100644
index 0000000..1281761
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Default message processor chain for topology messages.
+ * <p>
+ * Holds one processor per topology event type and routes each event listener
+ * to the processor that matches the listener's type.
+ */
+public class TopologyMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(TopologyMessageProcessorChain.class);
+
+    private CompleteTopologyMessageProcessor completeTopologyMessageProcessor;
+    private ServiceCreatedMessageProcessor serviceCreatedMessageProcessor;
+    private ServiceRemovedMessageProcessor serviceRemovedMessageProcessor;
+    private ClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
+    private ClusterRemovedMessageProcessor clusterRemovedMessageProcessor;
+    private InstanceSpawnedMessageProcessor instanceSpawnedMessageProcessor;
+    private MemberStartedMessageProcessor memberStartedMessageProcessor;
+    private MemberActivatedMessageProcessor memberActivatedMessageProcessor;
+    private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
+    private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
+
+    /**
+     * Creates the topology message processors and adds them to this chain,
+     * starting with the complete topology processor.
+     */
+    public void initialize() {
+        // Topology message processors, added in this exact order.
+        completeTopologyMessageProcessor = new CompleteTopologyMessageProcessor();
+        add(completeTopologyMessageProcessor);
+
+        // Service level
+        serviceCreatedMessageProcessor = new ServiceCreatedMessageProcessor();
+        add(serviceCreatedMessageProcessor);
+        serviceRemovedMessageProcessor = new ServiceRemovedMessageProcessor();
+        add(serviceRemovedMessageProcessor);
+
+        // Cluster level
+        clusterCreatedMessageProcessor = new ClusterCreatedMessageProcessor();
+        add(clusterCreatedMessageProcessor);
+        clusterRemovedMessageProcessor = new ClusterRemovedMessageProcessor();
+        add(clusterRemovedMessageProcessor);
+
+        // Instance and member level
+        instanceSpawnedMessageProcessor = new InstanceSpawnedMessageProcessor();
+        add(instanceSpawnedMessageProcessor);
+        memberStartedMessageProcessor = new MemberStartedMessageProcessor();
+        add(memberStartedMessageProcessor);
+        memberActivatedMessageProcessor = new MemberActivatedMessageProcessor();
+        add(memberActivatedMessageProcessor);
+        memberSuspendedMessageProcessor = new MemberSuspendedMessageProcessor();
+        add(memberSuspendedMessageProcessor);
+        memberTerminatedMessageProcessor = new MemberTerminatedMessageProcessor();
+        add(memberTerminatedMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Topology message processor chain initialized");
+        }
+    }
+
+    /**
+     * Registers the listener with the processor that matches the listener's type.
+     * <p>
+     * NOTE(review): the processor fields are only assigned in {@link #initialize()};
+     * presumably the base class calls it before listeners are added - confirm.
+     *
+     * @param eventListener listener to register
+     * @throws RuntimeException if the listener is not one of the known topology listener types
+     */
+    public void addEventListener(EventListener eventListener) {
+        // The checks run in the same order as before; the first match wins.
+        if (eventListener instanceof CompleteTopologyEventListener) {
+            completeTopologyMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ClusterCreatedEventListener) {
+            clusterCreatedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ClusterRemovedEventListener) {
+            clusterRemovedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof InstanceSpawnedEventListener) {
+            instanceSpawnedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof MemberActivatedEventListener) {
+            memberActivatedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof MemberStartedEventListener) {
+            memberStartedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof MemberSuspendedEventListener) {
+            memberSuspendedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof MemberTerminatedEventListener) {
+            memberTerminatedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ServiceCreatedEventListener) {
+            serviceCreatedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof ServiceRemovedEventListener) {
+            serviceRemovedMessageProcessor.addEventListener(eventListener);
+            return;
+        }
+        throw new RuntimeException("Unknown event listener");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 5afb54b..6f672c4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -22,7 +22,6 @@ import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
import org.apache.stratos.messaging.message.processor.topology.*;
import org.apache.stratos.messaging.util.Constants;
@@ -31,59 +30,26 @@ import org.apache.stratos.messaging.util.Constants;
/**
* Implements logic for processing topology event messages based on a given
* topology process chain.
- *
- * Functionality:
- * - Wait for the complete topology event.
- * - Process messages using the given message processor chain.
*/
public class TopologyEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
- private CompleteTopologyEventProcessor completeTopEvMsgProcessor;
private MessageProcessorChain processorChain;
private boolean terminated;
public TopologyEventMessageDelegator() {
- this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
- this.processorChain = new TopologyEventProcessorChain();
+ this.processorChain = new TopologyMessageProcessorChain();
}
public TopologyEventMessageDelegator(MessageProcessorChain processorChain) {
- this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
this.processorChain = processorChain;
}
- public void addCompleteTopologyEventListener(CompleteTopologyEventListener eventListener) {
- completeTopEvMsgProcessor.addEventListener(eventListener);
- }
-
- public void removeCompleteTopologyEventListener(CompleteTopologyEventListener eventListener) {
- completeTopEvMsgProcessor.removeEventListener(eventListener);
- }
-
@Override
public void run() {
try {
- if(log.isInfoEnabled()) {
+ if (log.isInfoEnabled()) {
log.info("Topology event message delegator started");
- log.info("Waiting for the complete topology event message...");
- }
- while (!terminated) {
- try {
- // First take the complete topology event
- TextMessage message = TopologyEventQueue.getInstance().take();
- // Retrieve the header
- String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
- // Retrieve the actual message
- String json = message.getText();
-
- if (completeTopEvMsgProcessor.process(type, json, TopologyManager.getTopology())) {
- break;
- }
-
- } catch (Exception e) {
- throw new RuntimeException("Failed to retrieve the complete topology", e);
- }
}
while (!terminated) {
@@ -92,6 +58,7 @@ public class TopologyEventMessageDelegator implements Runnable {
// Retrieve the header
String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
// Retrieve the actual message
String json = message.getText();
@@ -101,20 +68,20 @@ public class TopologyEventMessageDelegator implements Runnable {
try {
TopologyManager.acquireWriteLock();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Delegating : %s", type));
+ }
processorChain.process(type, json, TopologyManager.getTopology());
} finally {
TopologyManager.releaseWriteLock();
}
} catch (Exception e) {
- String error = "Failed to retrieve topology event message";
- log.error(error, e);
- throw new RuntimeException(error, e);
+ log.error("Failed to retrieve topology event message", e);
}
}
- }
- catch (Exception e) {
- if(log.isErrorEnabled()) {
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
log.error("Topology event message delegator failed", e);
}
}
[2/2] git commit: Moved complete topology message processor to
topology message processor chain and renamed event processors to message
processors
Posted by im...@apache.org.
Moved complete topology message processor to topology message processor chain and renamed event processors to message processors
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f45707a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f45707a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f45707a9
Branch: refs/heads/master
Commit: f45707a965b47f77d0c59c6c8b32d3a0c9447d67
Parents: 04add84
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 5 15:49:00 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 5 15:49:00 2013 +0530
----------------------------------------------------------------------
.../processors/AutoscalerTopologyReceiver.java | 17 +--
.../extension/api/LoadBalancerExtension.java | 17 +--
.../balancer/LoadBalancerTopologyReceiver.java | 15 +-
.../messaging/domain/topology/Topology.java | 9 ++
.../messaging/event/EventObservable.java | 10 ++
.../topology/ClusterCreatedEventProcessor.java | 113 --------------
.../ClusterCreatedMessageProcessor.java | 122 +++++++++++++++
.../topology/ClusterRemovedEventProcessor.java | 110 --------------
.../ClusterRemovedMessageProcessor.java | 119 +++++++++++++++
.../CompleteTopologyEventIgnoreProcessor.java | 58 -------
.../CompleteTopologyEventProcessor.java | 97 ------------
.../CompleteTopologyMessageProcessor.java | 104 +++++++++++++
.../topology/InstanceSpawnedEventProcessor.java | 113 --------------
.../InstanceSpawnedMessageProcessor.java | 126 +++++++++++++++
.../topology/MemberActivatedEventProcessor.java | 134 ----------------
.../MemberActivatedMessageProcessor.java | 152 +++++++++++++++++++
.../topology/MemberStartedEventProcessor.java | 121 ---------------
.../topology/MemberStartedMessageProcessor.java | 137 +++++++++++++++++
.../topology/MemberSuspendedEventProcessor.java | 120 ---------------
.../MemberSuspendedMessageProcessor.java | 136 +++++++++++++++++
.../MemberTerminatedEventProcessor.java | 120 ---------------
.../MemberTerminatedMessageProcessor.java | 136 +++++++++++++++++
.../topology/ServiceCreatedEventProcessor.java | 85 -----------
.../ServiceCreatedMessageProcessor.java | 91 +++++++++++
.../topology/ServiceRemovedEventProcessor.java | 86 -----------
.../ServiceRemovedMessageProcessor.java | 93 ++++++++++++
.../topology/TopologyEventProcessorChain.java | 112 --------------
.../topology/TopologyMessageProcessorChain.java | 107 +++++++++++++
.../topology/TopologyEventMessageDelegator.java | 51 ++-----
29 files changed, 1369 insertions(+), 1342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
index 9a9588c..ab77a79 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
@@ -21,7 +21,6 @@ package org.apache.stratos.autoscaler.topology.processors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.ClusterContext;
import org.apache.stratos.autoscaler.ClusterMonitor;
import org.apache.stratos.autoscaler.PartitionContext;
@@ -40,10 +39,9 @@ import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener
import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
import java.util.Collection;
@@ -82,9 +80,8 @@ public class AutoscalerTopologyReceiver implements Runnable {
}
private TopologyEventMessageDelegator createMessageDelegator() {
- TopologyEventProcessorChain processorChain = createEventProcessorChain();
- final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
- messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+ TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+ processorChain.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -99,17 +96,15 @@ public class AutoscalerTopologyReceiver implements Runnable {
finally {
TopologyManager.releaseReadLock();
}
- // Complete topology is only consumed once, remove listener
- messageDelegator.removeCompleteTopologyEventListener(this);
}
});
- return messageDelegator;
+ return new TopologyEventMessageDelegator(processorChain);
}
- private TopologyEventProcessorChain createEventProcessorChain() {
+ private TopologyMessageProcessorChain createEventProcessorChain() {
// Listen to topology events that affect clusters
- TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+ TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
processorChain.addEventListener(new ClusterCreatedEventListener() {
@Override
protected void onEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 61781c9..95dc039 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.common.topology.TopologyReceiver;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -74,9 +74,8 @@ public class LoadBalancerExtension implements Runnable {
}
private TopologyEventMessageDelegator createMessageDelegator() {
- TopologyEventProcessorChain processorChain = createEventProcessorChain();
- final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
- messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+ TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+ processorChain.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
@@ -87,10 +86,6 @@ public class LoadBalancerExtension implements Runnable {
// Start load balancer
loadBalancer.start();
loadBalancerStarted = true;
-
- // Complete topology event is only received once
- // Remove event listener
- messageDelegator.removeCompleteTopologyEventListener(this);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not start load balancer", e);
@@ -99,11 +94,11 @@ public class LoadBalancerExtension implements Runnable {
}
}
});
- return messageDelegator;
+ return new TopologyEventMessageDelegator(processorChain);
}
- private TopologyEventProcessorChain createEventProcessorChain() {
- TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+ private TopologyMessageProcessorChain createEventProcessorChain() {
+ TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
processorChain.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index fba24e7..2ecc5a2 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -31,7 +31,7 @@ import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListe
import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -67,9 +67,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
}
private TopologyEventMessageDelegator createMessageDelegator() {
- TopologyEventProcessorChain processorChain = createEventProcessorChain();
- final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
- messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+ TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+ processorChain.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
try {
@@ -85,8 +84,6 @@ public class LoadBalancerTopologyReceiver implements Runnable {
finally {
TopologyManager.releaseReadLock();
}
- // Complete topology is only consumed once, remove listener
- messageDelegator.removeCompleteTopologyEventListener(this);
}
private boolean hasActiveMembers(Cluster cluster) {
@@ -98,12 +95,12 @@ public class LoadBalancerTopologyReceiver implements Runnable {
return false;
}
});
- return messageDelegator;
+ return new TopologyEventMessageDelegator(processorChain);
}
- private TopologyEventProcessorChain createEventProcessorChain() {
+ private TopologyMessageProcessorChain createEventProcessorChain() {
// Listen to topology events that affect clusters
- TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+ TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
processorChain.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
index 57e99e8..2c503ee 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
@@ -31,6 +31,7 @@ public class Topology implements Serializable {
private static final long serialVersionUID = -2453583548027402122L;
// Key: Service.serviceName
private Map<String, Service> serviceMap;
+ private boolean initialized;
public Topology() {
this.serviceMap = new HashMap<String, Service>();
@@ -69,4 +70,12 @@ public class Topology implements Serializable {
public void clear() {
this.serviceMap.clear();
}
+ /** Stores the initialized flag of this topology; the value is read back through isInitialized(). */
+ public void setInitialized(boolean initialized) {
+ this.initialized = initialized;
+ }
+ /** Returns the initialized flag of this topology as last stored by setInitialized(). */
+ public boolean isInitialized() {
+ return initialized;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
index 528d426..0a5d18f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/EventObservable.java
@@ -19,6 +19,8 @@
package org.apache.stratos.messaging.event;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.listener.EventListener;
import java.util.Observable;
@@ -28,15 +30,23 @@ import java.util.Observable;
*/
public abstract class EventObservable extends Observable {
+ private static final Log log = LogFactory.getLog(EventObservable.class);
+
+ /** Registers the given listener by adding it as an observer of this observable. */
public void addEventListener(EventListener eventListener) {
addObserver(eventListener);
}
public void removeEventListener(EventListener eventListener) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Removing event listeners: [event-listener] %s", eventListener.getClass().getName()));
+ }
deleteObserver(eventListener);
}
public void notifyEventListeners(Event event) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Notifying event listeners: [event] %s", event.getClass().getName()));
+ }
setChanged();
notifyObservers(event);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
deleted file mode 100644
index a4d09ba..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ClusterCreatedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(ClusterCreatedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology)object;
-
- if (ClusterCreatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Apply cluster filter
- if(ClusterFilter.getInstance().isActive()) {
- if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return true;
- }
- }
-
- // Validate event properties
- if(StringUtils.isBlank(event.getHostName())) {
- throw new RuntimeException("Hostname not found in cluster created event");
- }
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- if (service.clusterExists(event.getClusterId())) {
- throw new RuntimeException(String.format("Cluster already exists in service: [service] %s [cluster] %s",
- event.getServiceName(),
- event.getClusterId()));
- }
-
- // Apply changes to the topology
- Cluster cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getAutoscalingPolicyName());
- cluster.addHostName(event.getHostName());
- cluster.setTenantRange(event.getTenantRange());
-
- service.addCluster(cluster);
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster created: [service] %s [cluster] %s [hostname] %s",
- event.getServiceName(), event.getClusterId(), event.getHostName()));
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
new file mode 100644
index 0000000..8a8d394
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class ClusterCreatedMessageProcessor extends MessageProcessor {
+ // Chain link that applies ClusterCreatedEvent messages to a Topology; any other message type is handed to nextProcessor.
+ private static final Log log = LogFactory.getLog(ClusterCreatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+ /** Sets the processor that process() delegates to when the message type is not ClusterCreatedEvent. */
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+ /** Handles ClusterCreatedEvent messages: true once the cluster is added and listeners are notified, false when the event is skipped; other types yield nextProcessor's result. */
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+ // object must be a Topology; the unchecked cast above throws ClassCastException otherwise
+ if (ClusterCreatedEvent.class.getName().equals(type)) {
+ // Skip the event (return false) while the topology's initialized flag is not set
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message (JSON) and build event
+ ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
+
+ // Apply service filter: events of excluded services are skipped with a debug log
+ if (ServiceFilter.getInstance().isActive()) {
+ if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Apply cluster filter: events of excluded clusters are skipped with a debug log
+ if (ClusterFilter.getInstance().isActive()) {
+ if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event properties: a blank hostname is the only validation failure here that throws instead of returning false
+ if (StringUtils.isBlank(event.getHostName())) {
+ throw new RuntimeException("Hostname not found in cluster created event");
+ }
+ // Validate event against the existing topology: an unknown service or an already existing cluster is logged as a warning and the event is skipped
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ if (service.clusterExists(event.getClusterId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+ event.getClusterId()));
+ }
+ return false;
+ }
+
+ // Apply changes to the topology: build the cluster from the event and add it to its service
+ Cluster cluster = new Cluster(event.getServiceName(), event.getClusterId(), event.getAutoscalingPolicyName());
+ cluster.addHostName(event.getHostName());
+ cluster.setTenantRange(event.getTenantRange());
+
+ service.addCluster(cluster);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster created: [service] %s [cluster] %s [hostname] %s",
+ event.getServiceName(), event.getClusterId(), event.getHostName()));
+ }
+
+ // Notify event listeners only after the topology has been updated
+ notifyEventListeners(event);
+ return true;
+
+ } else {
+ if (nextProcessor != null) {
+ // Not a ClusterCreatedEvent: ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
deleted file mode 100644
index 295923f..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class ClusterRemovedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(ClusterRemovedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology)object;
-
- if (ClusterRemovedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Apply cluster filter
- if(ClusterFilter.getInstance().isActive()) {
- if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return true;
- }
- }
-
- // Validate event properties
- if(StringUtils.isBlank(event.getHostName())) {
- throw new RuntimeException("Hostname not found in cluster removed event");
- }
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- if (!service.clusterExists(event.getClusterId())) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s [hostname] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getHostName()));
- }
-
- // Apply changes to the topology
- service.removeCluster(event.getClusterId());
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
-
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
new file mode 100644
index 0000000..b93273f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class ClusterRemovedMessageProcessor extends MessageProcessor {
+ // Chain link that applies ClusterRemovedEvent messages to a Topology; any other message type is handed to nextProcessor.
+ private static final Log log = LogFactory.getLog(ClusterRemovedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+ /** Sets the processor that process() delegates to when the message type is not ClusterRemovedEvent. */
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+ /** Handles ClusterRemovedEvent messages: true once the cluster is removed and listeners are notified, false when the event is skipped; other types yield nextProcessor's result. */
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+ // object must be a Topology; the unchecked cast above throws ClassCastException otherwise
+ if (ClusterRemovedEvent.class.getName().equals(type)) {
+ // Skip the event (return false) while the topology's initialized flag is not set
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message (JSON) and build event
+ ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
+
+ // Apply service filter: events of excluded services are skipped with a debug log
+ if (ServiceFilter.getInstance().isActive()) {
+ if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Apply cluster filter: events of excluded clusters are skipped with a debug log
+ if (ClusterFilter.getInstance().isActive()) {
+ if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event properties: a blank hostname is the only validation failure here that throws instead of returning false
+ if (StringUtils.isBlank(event.getHostName())) {
+ throw new RuntimeException("Hostname not found in cluster removed event");
+ }
+ // Validate event against the existing topology: an unknown service or cluster is logged as a warning and the event is skipped
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ if (!service.clusterExists(event.getClusterId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s [hostname] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getHostName()));
+ }
+ return false;
+ }
+
+ // Apply changes to the topology: drop the cluster from its service
+ service.removeCluster(event.getClusterId());
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+
+ // Notify event listeners only after the topology has been updated
+ notifyEventListeners(event);
+ return true;
+
+ } else {
+ if (nextProcessor != null) {
+ // Not a ClusterRemovedEvent: ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
deleted file mode 100644
index 5e6d47c..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventIgnoreProcessor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * An event processor to ignore complete topology event in a given message processor chain.
- */
-public class CompleteTopologyEventIgnoreProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(CompleteTopologyEventIgnoreProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology)object;
-
- if (CompleteTopologyEvent.class.getName().equals(type)) {
- if(log.isDebugEnabled()) {
- log.debug("Complete topology event ignored");
- }
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
deleted file mode 100644
index c61a2a5..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class CompleteTopologyEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (CompleteTopologyEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
-
- // Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- // Add services included in service filter
- for (Service service : event.getTopology().getServices()) {
- if (ServiceFilter.getInstance().included(service.getServiceName())) {
- topology.addService(service);
- }
- else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
- }
- }
- }
- } else {
- // Add all services
- topology.addServices(event.getTopology().getServices());
- }
-
- // Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- for (Service service : topology.getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
- service.removeCluster(cluster.getClusterId());
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
- }
- }
- }
- }
- }
-
- if (log.isInfoEnabled()) {
- log.info("Topology initialized");
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
new file mode 100644
index 0000000..5cea64e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class CompleteTopologyMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(CompleteTopologyMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (CompleteTopologyEvent.class.getName().equals(type)) {
+ // Return if topology has already initialized
+ if (topology.isInitialized()) {
+ return false;
+ }
+
+ // Parse complete message and build event
+ CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
+
+ // Apply service filter
+ if (ServiceFilter.getInstance().isActive()) {
+ // Add services included in service filter
+ for (Service service : event.getTopology().getServices()) {
+ if (ServiceFilter.getInstance().included(service.getServiceName())) {
+ topology.addService(service);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
+ }
+ }
+ }
+ } else {
+ // Add all services
+ topology.addServices(event.getTopology().getServices());
+ }
+
+ // Apply cluster filter
+ if (ClusterFilter.getInstance().isActive()) {
+ for (Service service : topology.getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
+ service.removeCluster(cluster.getClusterId());
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
+ }
+ }
+ }
+ }
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Topology initialized");
+ }
+
+ // Set topology initialized
+ topology.setInitialized(true);
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ }
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
deleted file mode 100644
index 0d2f0e2..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class InstanceSpawnedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(InstanceSpawnedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology)object;
-
- if (InstanceSpawnedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Apply cluster filter
- if(ClusterFilter.getInstance().isActive()) {
- if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return true;
- }
- }
-
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- if (cluster.memberExists(event.getMemberId())) {
- throw new RuntimeException(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Apply changes to the topology
- Member member = new Member(event.getServiceName(), event.getClusterId(), event.getMemberId());
- member.setStatus(MemberStatus.Created);
- member.setPartitionId(event.getPartitionId());
- cluster.addMember(member);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
new file mode 100644
index 0000000..f614782
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Message processor that adds a newly spawned member to the topology when an
+ * instance spawned event is received.
+ */
+public class InstanceSpawnedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(InstanceSpawnedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    /**
+     * Sets the processor that receives messages whose type this processor does not handle.
+     *
+     * @param nextProcessor the next processor in the chain
+     */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles an instance spawned message, or hands any other message type to the next processor.
+     *
+     * @param type    the message type, compared against the class name of {@link InstanceSpawnedEvent}
+     * @param message the JSON body of the message
+     * @param object  the topology to update; must be a {@link Topology}
+     * @return true if a member was added and listeners were notified; false if the topology is
+     *         not initialized, the event is filtered out, or validation against the topology fails
+     * @throws RuntimeException if the type is not handled here and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        // Not our message type: pass it down the chain, or fail if the chain ends here.
+        if (!InstanceSpawnedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            return nextProcessor.process(type, message, topology);
+        }
+
+        // Nothing can be applied before the topology has been initialized.
+        if (!topology.isInitialized()) {
+            return false;
+        }
+
+        InstanceSpawnedEvent spawnedEvent = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
+
+        // Excluded services/clusters neither update the topology nor fire an event.
+        if (isExcluded(spawnedEvent)) {
+            return false;
+        }
+
+        // The service and cluster must already exist, and the member must not.
+        Service targetService = topology.getService(spawnedEvent.getServiceName());
+        if (targetService == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        spawnedEvent.getServiceName()));
+            }
+            return false;
+        }
+
+        Cluster targetCluster = targetService.getCluster(spawnedEvent.getClusterId());
+        if (targetCluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        spawnedEvent.getServiceName(), spawnedEvent.getClusterId()));
+            }
+            return false;
+        }
+
+        if (targetCluster.memberExists(spawnedEvent.getMemberId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
+                        spawnedEvent.getServiceName(),
+                        spawnedEvent.getClusterId(),
+                        spawnedEvent.getMemberId()));
+            }
+            return false;
+        }
+
+        // Register the new member in the Created state.
+        Member spawnedMember = new Member(spawnedEvent.getServiceName(), spawnedEvent.getClusterId(), spawnedEvent.getMemberId());
+        spawnedMember.setStatus(MemberStatus.Created);
+        spawnedMember.setPartitionId(spawnedEvent.getPartitionId());
+        targetCluster.addMember(spawnedMember);
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
+                    spawnedEvent.getServiceName(),
+                    spawnedEvent.getClusterId(),
+                    spawnedEvent.getMemberId()));
+        }
+
+        notifyEventListeners(spawnedEvent);
+        return true;
+    }
+
+    /**
+     * Checks the event against the service filter and then the cluster filter, logging at
+     * debug level when one of them excludes it.
+     *
+     * @param event the parsed instance spawned event
+     * @return true if an active filter excludes the event's service or cluster
+     */
+    private boolean isExcluded(InstanceSpawnedEvent event) {
+        if (ServiceFilter.getInstance().isActive()
+                && ServiceFilter.getInstance().excluded(event.getServiceName())) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+            }
+            return true;
+        }
+        if (ClusterFilter.getInstance().isActive()
+                && ClusterFilter.getInstance().excluded(event.getClusterId())) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+            }
+            return true;
+        }
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
deleted file mode 100644
index def9545..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberActivatedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(MemberActivatedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology)object;
-
- if (MemberActivatedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Apply cluster filter
- if(ClusterFilter.getInstance().isActive()) {
- if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return true;
- }
- }
-
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s", event.getServiceName()));
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if (member.getStatus() == MemberStatus.Activated) {
- throw new RuntimeException(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
- throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
- throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Apply changes to the topology
- member.addPorts(event.getPorts());
- member.setMemberIp(event.getMemberIp());
- member.setStatus(MemberStatus.Activated);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
new file mode 100644
index 0000000..e299b7d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberActivatedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(MemberActivatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (MemberActivatedEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
+
+ // Apply service filter
+ if (ServiceFilter.getInstance().isActive()) {
+ if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Apply cluster filter
+ if (ClusterFilter.getInstance().isActive()) {
+ if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event properties
+ if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+ throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+ throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+ if (member.getStatus() == MemberStatus.Activated) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
+
+ // Apply changes to the topology
+ member.addPorts(event.getPorts());
+ member.setMemberIp(event.getMemberIp());
+ member.setStatus(MemberStatus.Activated);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f45707a9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
deleted file mode 100644
index 849efcc..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
-import org.apache.stratos.messaging.util.Util;
-
-public class MemberStartedEventProcessor extends MessageProcessor {
-
- private static final Log log = LogFactory.getLog(MemberStartedEventProcessor.class);
- private MessageProcessor nextProcessor;
-
- @Override
- public void setNext(MessageProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
-
- if (MemberStartedEvent.class.getName().equals(type)) {
- // Parse complete message and build event
- MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
-
- // Apply service filter
- if(ServiceFilter.getInstance().isActive()) {
- if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return true;
- }
- }
-
- // Apply cluster filter
- if(ClusterFilter.getInstance().isActive()) {
- if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return true;
- }
- }
-
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- throw new RuntimeException(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- throw new RuntimeException(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- throw new RuntimeException(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- if (member.getStatus() == MemberStatus.Starting) {
- throw new RuntimeException(String.format("Member already started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.Starting);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
-
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
- }
- }
- }
-}