You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/10/07 15:40:32 UTC
[3/4] initial changes for hierarchical topology locking
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index c007343..94b9650 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ClusterCreatedMessageProcessor extends MessageProcessor {
@@ -41,83 +42,97 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Topology topology = (Topology) object;
if (ClusterCreatedEvent.class.getName().equals(type)) {
// Return if topology has not been initialized
- if (!topology.isInitialized())
+ if (!topology.isInitialized()) {
return false;
+ }
// Parse complete message and build event
ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
- }
+ TopologyManager.acquireReadLockForServices();
+ TopologyManager.acquireWriteLockForService(event.getServiceName());
+ try {
+ return doProcess(event, topology);
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } finally {
+ TopologyManager.releaseWriteLockForService(event.getServiceName());
+ TopologyManager.releaseReadLockForServices();
}
- // Validate event properties
- Cluster cluster = event.getCluster();
- if(cluster == null) {
- String msg = "Cluster object of cluster created event is null.";
- log.error(msg);
- throw new RuntimeException(msg);
- }
- if (cluster.getHostNames().isEmpty()) {
- throw new RuntimeException("Host name/s not found in cluster created event");
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ }
+ }
+
+ private boolean doProcess (ClusterCreatedEvent event,Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- if (service.clusterExists(event.getClusterId())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
- event.getClusterId()));
- }
- } else {
-
- // Apply changes to the topology
- service.addCluster(cluster);
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster created: %s",
- cluster.toString()));
- }
- }
+ }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+ // Validate event properties
+ Cluster cluster = event.getCluster();
+ if(cluster == null) {
+ String msg = "Cluster object of cluster created event is null.";
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+ if (cluster.getHostNames().isEmpty()) {
+ throw new RuntimeException("Host name/s not found in cluster created event");
+ }
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ if (service.clusterExists(event.getClusterId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+ event.getClusterId()));
+ }
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+ // Apply changes to the topology
+ service.addCluster(cluster);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster created: %s",
+ cluster.toString()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
index f125c54..8629363 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
@@ -49,64 +50,77 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util.
jsonToObject(message, ClusterMaintenanceModeEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireReadLockForServices();
+ TopologyManager.acquireWriteLockForService(event.getServiceName());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForService(event.getServiceName());
+ TopologyManager.releaseReadLockForServices();
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (ClusterMaintenanceModeEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
+ }
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
- event.getClusterId()));
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
- } else {
- // Apply changes to the topology
- cluster.setStatus(Status.In_Maintenance);
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster updated as maintenance mode: %s",
- cluster.toString()));
- }
- }
+ return false;
+ }
+ }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
+ event.getClusterId()));
+ }
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ // Apply changes to the topology
+ cluster.setStatus(Status.In_Maintenance);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster updated as maintenance mode: %s",
+ cluster.toString()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 69ef5b0..1dfb929 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ClusterRemovedMessageProcessor extends MessageProcessor {
@@ -50,65 +51,79 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireReadLockForServices();
+ TopologyManager.acquireWriteLockForService(event.getServiceName());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForService(event.getServiceName());
+ TopologyManager.releaseReadLockForServices();
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (ClusterRemovedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
+ }
- // Notify event listeners before removing the cluster object
- notifyEventListeners(event);
-
- if (!service.clusterExists(event.getClusterId())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(),
- event.getClusterId()));
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
- } else {
-
- // Apply changes to the topology
- service.removeCluster(event.getClusterId());
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
+ return false;
+ }
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
}
+ return false;
+ }
+
+ // Notify event listeners before removing the cluster object
+ notifyEventListeners(event);
- return true;
+ if (!service.clusterExists(event.getClusterId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(),
+ event.getClusterId()));
+ }
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+ // Apply changes to the topology
+ service.removeCluster(event.getClusterId());
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
}
}
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 135bdae..6d5cb8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
import java.util.ArrayList;
@@ -49,102 +50,20 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
if (CompleteTopologyEvent.class.getName().equals(type)) {
// Parse complete message and build event
CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
-
- // if topology has not already initialized
- if (!topology.isInitialized()) {
-
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- // Add services included in service filter
- for (Service service : event.getTopology().getServices()) {
- if (TopologyServiceFilter.getInstance()
- .serviceNameIncluded(service.getServiceName())) {
- topology.addService(service);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Service is excluded: [service] %s",
- service.getServiceName()));
- }
- }
- }
- } else {
- // Add all services
- topology.addServices(event.getTopology().getServices());
- }
-
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- for (Service service : topology.getServices()) {
- List<Cluster> clustersToRemove = new ArrayList<Cluster>();
- for (Cluster cluster : service.getClusters()) {
- if (TopologyClusterFilter.getInstance()
- .clusterIdExcluded(cluster.getClusterId())) {
- clustersToRemove.add(cluster);
- }
- }
- for (Cluster cluster : clustersToRemove) {
- service.removeCluster(cluster);
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Cluster is excluded: [cluster] %s",
- cluster.getClusterId()));
- }
- }
- }
- }
-
- // Apply member filter
- if (TopologyMemberFilter.getInstance().isActive()) {
- for (Service service : topology.getServices()) {
- for (Cluster cluster : service.getClusters()) {
- List<Member> membersToRemove = new ArrayList<Member>();
- for (Member member : cluster.getMembers()) {
- if (TopologyMemberFilter.getInstance()
- .lbClusterIdExcluded(
- member.getLbClusterId())) {
- membersToRemove.add(member);
- }
- }
- for (Member member : membersToRemove) {
- cluster.removeMember(member);
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Member is excluded: [member] %s [lb-cluster-id] %s",
- member.getMemberId(),
- member.getLbClusterId()));
- }
- }
- }
- }
- }
-
- // add existing Applications to Topology
- Collection<Application> applications = event.getTopology().getApplications();
- if (applications != null && !applications.isEmpty()) {
- for (Application application : applications) {
- topology.addApplication(application);
- if (log.isDebugEnabled()) {
- log.debug("Application with id [ " + application.getId() + " ] added to Topology");
- }
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("No Application information found in Complete Topology event");
- }
- }
- if (log.isInfoEnabled()) {
- log.info("Topology initialized");
- }
+ if (!topology.isInitialized()) {
+ TopologyManager.acquireWriteLock();
+
+ try {
+ return doProcess(event, topology);
- // Set topology initialized
- topology.setInitialized(true);
- }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ } else {
+ return true;
+ }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
@@ -153,4 +72,99 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
return false;
}
}
+
+ private boolean doProcess (CompleteTopologyEvent event, Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ // Add services included in service filter
+ for (Service service : event.getTopology().getServices()) {
+ if (TopologyServiceFilter.getInstance()
+ .serviceNameIncluded(service.getServiceName())) {
+ topology.addService(service);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Service is excluded: [service] %s",
+ service.getServiceName()));
+ }
+ }
+ }
+ } else {
+ // Add all services
+ topology.addServices(event.getTopology().getServices());
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ for (Service service : topology.getServices()) {
+ List<Cluster> clustersToRemove = new ArrayList<Cluster>();
+ for (Cluster cluster : service.getClusters()) {
+ if (TopologyClusterFilter.getInstance()
+ .clusterIdExcluded(cluster.getClusterId())) {
+ clustersToRemove.add(cluster);
+ }
+ }
+ for (Cluster cluster : clustersToRemove) {
+ service.removeCluster(cluster);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Cluster is excluded: [cluster] %s",
+ cluster.getClusterId()));
+ }
+ }
+ }
+ }
+
+ // Apply member filter
+ if (TopologyMemberFilter.getInstance().isActive()) {
+ for (Service service : topology.getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ List<Member> membersToRemove = new ArrayList<Member>();
+ for (Member member : cluster.getMembers()) {
+ if (TopologyMemberFilter.getInstance()
+ .lbClusterIdExcluded(
+ member.getLbClusterId())) {
+ membersToRemove.add(member);
+ }
+ }
+ for (Member member : membersToRemove) {
+ cluster.removeMember(member);
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Member is excluded: [member] %s [lb-cluster-id] %s",
+ member.getMemberId(),
+ member.getLbClusterId()));
+ }
+ }
+ }
+ }
+ }
+
+ // add existing Applications to Topology
+ Collection<Application> applications = event.getTopology().getApplications();
+ if (applications != null && !applications.isEmpty()) {
+ for (Application application : applications) {
+ topology.addApplication(application);
+ if (log.isDebugEnabled()) {
+ log.debug("Application with id [ " + application.getId() + " ] added to Topology");
+ }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("No Application information found in Complete Topology event");
+ }
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Topology initialized");
+ }
+
+ // Set topology initialized
+ topology.setInitialized(true);
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
index 627d9a9..7200431 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
@@ -21,11 +21,9 @@ package org.apache.stratos.messaging.message.processor.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
/**
@@ -53,34 +51,16 @@ public class GroupActivatedProcessor extends MessageProcessor {
GroupActivatedEvent event = (GroupActivatedEvent) Util.
jsonToObject(message, GroupActivatedEvent.class);
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroup(event.getGroupId());
+ TopologyManager.acquireReadLockForApplications();
+ TopologyManager.acquireWriteLockForApplication(event.getAppId());
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- }
- } else {
- // Apply changes to the topology
- group.setStatus(Status.Activated);
- if (log.isInfoEnabled()) {
- log.info(String.format("Group updated as activated : %s",
- group.toString()));
- }
- }
+ try {
+ return doProcess(event, topology);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ } finally {
+ TopologyManager.releaseWriteLockForApplication(event.getAppId());
+ TopologyManager.releaseReadLockForApplications();
+ }
} else {
if (nextProcessor != null) {
@@ -91,4 +71,36 @@ public class GroupActivatedProcessor extends MessageProcessor {
}
}
}
+
+ private boolean doProcess (GroupActivatedEvent event,Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroup(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ // Apply changes to the topology
+ group.setStatus(Status.Activated);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Group updated as activated : %s",
+ group.toString()));
+ }
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
index 8e4e1b1..2d3f8b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class InstanceSpawnedMessageProcessor extends MessageProcessor {
@@ -50,92 +51,103 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (InstanceSpawnedEvent event,Topology topology){
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
- }
- return false;
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
+ return false;
}
+ }
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
}
return false;
}
- if (cluster.memberExists(event.getMemberId())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
- member.setStatus(MemberStatus.Created);
- member.setMemberPublicIp(event.getMemberPublicIp());
- member.setMemberIp(event.getMemberIp());
- member.setLbClusterId(event.getLbClusterId());
- member.setProperties(event.getProperties());
- cluster.addMember(member);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ if (cluster.memberExists(event.getMemberId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
+ member.setStatus(MemberStatus.Created);
+ member.setMemberPublicIp(event.getMemberPublicIp());
+ member.setMemberIp(event.getMemberIp());
+ member.setLbClusterId(event.getLbClusterId());
+ member.setProperties(event.getProperties());
+ cluster.addMember(member);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index a5d701d..ec1b5ec 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberActivatedMessageProcessor extends MessageProcessor {
@@ -54,111 +55,123 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (MemberActivatedEvent event,Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
+ return false;
}
+ }
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
+ return false;
}
+ }
- // Validate event properties
- if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
- throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
+ // Validate event properties
+ if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+ throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+ throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
}
- if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
- throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
event.getServiceName(),
event.getClusterId(),
event.getMemberId()));
}
+ return false;
+ }
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
- }
- return false;
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- return false;
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
return false;
}
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
- }
+ if (member.getStatus() == MemberStatus.Activated) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
- if (member.getStatus() == MemberStatus.Activated) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.addPorts(event.getPorts());
- member.setMemberIp(event.getMemberIp());
- member.setStatus(MemberStatus.Activated);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- }
+ // Apply changes to the topology
+ member.addPorts(event.getPorts());
+ member.setMemberIp(event.getMemberIp());
+ member.setStatus(MemberStatus.Activated);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
index b6dc489..b252a61 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberMaintenanceModeProcessor extends MessageProcessor {
@@ -51,98 +52,110 @@ public class MemberMaintenanceModeProcessor extends MessageProcessor {
MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) Util.
jsonToObject(message, MemberMaintenanceModeEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberMaintenanceModeEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.In_Maintenance) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already updated as In_Maintenance: " +
- "[service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.In_Maintenance);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.In_Maintenance) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already updated as In_Maintenance: " +
+ "[service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.In_Maintenance);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
index 92115aa..f0c3580 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
@@ -50,98 +51,111 @@ public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) Util.
jsonToObject(message, MemberReadyToShutdownEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberReadyToShutdownEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.ReadyToShutDown) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already updated as Ready to Shutdown: " +
- "[service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.ReadyToShutDown);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.ReadyToShutDown) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already updated as Ready to Shutdown: " +
+ "[service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.ReadyToShutDown);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index 4d93957..508ec39 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberStartedMessageProcessor extends MessageProcessor {
@@ -54,97 +55,109 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberStartedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.Starting) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.Starting);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.Starting) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.Starting);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}