You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/10/07 15:40:31 UTC
[2/4] initial changes for hierarchical topology locking
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
index 67c9b67..4473add 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberSuspendedMessageProcessor extends MessageProcessor {
@@ -54,96 +55,108 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberSuspendedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.Suspended) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.Suspended);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.Suspended) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.Suspended);
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
index 5b5cbc9..3619b53 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -29,6 +29,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberTerminatedMessageProcessor extends MessageProcessor {
@@ -53,87 +54,99 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberTerminatedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if(member != null) {
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if(member != null) {
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
}
+ }
- // Notify event listeners before removing member object
- notifyEventListeners(event);
+ // Notify event listeners before removing member object
+ notifyEventListeners(event);
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
- // Remove member from the cluster
- cluster.removeMember(member);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
-
- return true;
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ // Remove member from the cluster
+ cluster.removeMember(member);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
index 2c216f0..1c4be8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ServiceCreatedMessageProcessor extends MessageProcessor {
@@ -43,44 +44,21 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
if (ServiceCreatedEvent.class.getName().equals(type)) {
// Return if topology has not been initialized
- if (!topology.isInitialized())
+ if (!topology.isInitialized()) {
return false;
+ }
// Parse complete message and build event
ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
- }
+ TopologyManager.acquireWriteLockForServices();
+ try {
+ return doProcess(event, topology);
- // Validate event against the existing topology
- if (topology.serviceExists(event.getServiceName())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
- }
- } else {
-
- // Apply changes to the topology
- Service service = new Service(event.getServiceName(), event.getServiceType());
- service.addPorts(event.getPorts());
- topology.addService(service);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Service created: [service] %s", event.getServiceName()));
- }
+ } finally {
+ TopologyManager.releaseWriteLockForServices();
}
-
- notifyEventListeners(event);
- return true;
-
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
@@ -90,4 +68,40 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
}
}
}
+
+ private boolean doProcess (ServiceCreatedEvent event, Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event against the existing topology
+ if (topology.serviceExists(event.getServiceName())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
+ }
+ } else {
+
+ // Apply changes to the topology
+ Service service = new Service(event.getServiceName(), event.getServiceType());
+ service.addPorts(event.getPorts());
+ topology.addService(service);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Service created: [service] %s", event.getServiceName()));
+ }
+ }
+
+
+ notifyEventListeners(event);
+ return true;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
index 2c0bc70..a38cbdc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ServiceRemovedMessageProcessor extends MessageProcessor {
@@ -49,38 +50,14 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
- }
-
- // Notify event listeners before removing service object
- notifyEventListeners(event);
+ TopologyManager.acquireWriteLockForServices();
+ try {
+ return doProcess(event, topology);
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- } else {
-
- // Apply changes to the topology
- topology.removeService(service);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Service removed: [service] %s", event.getServiceName()));
- }
+ } finally {
+ TopologyManager.releaseWriteLockForServices();
}
- return true;
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
@@ -90,4 +67,40 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
}
}
}
+
+ private boolean doProcess (ServiceRemovedEvent event, Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Notify event listeners before removing service object
+ notifyEventListeners(event);
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ } else {
+
+ // Apply changes to the topology
+ topology.removeService(service);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Service removed: [service] %s", event.getServiceName()));
+ }
+ }
+
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index e3ddfa3..db9e8b1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -47,7 +47,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
private GroupActivatedProcessor groupActivatedProcessor;
- private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
+ //private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor;
private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
@@ -109,11 +109,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
add(applicationActivatedMessageProcessor);
if (log.isDebugEnabled()) {
- log.debug("Grouping: added applicationCreatedMessageProcessor, applicationRemovedMessageProcessor: " +
- applicationCreatedMessageProcessor + " / " + applicationRemovedMessageProcessor);
- }
-
- if (log.isDebugEnabled()) {
log.debug("Topology message processor chain initialized X1");
}
}
@@ -153,9 +148,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
applicationCreatedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof ApplicationRemovedEventListener) {
applicationRemovedMessageProcessor.addEventListener(eventListener);
- if (log.isDebugEnabled()) {
- log.debug("Grouping: added eventlistener to applicationCreatedMessageProcessor: " + eventListener);
- }
} else if (eventListener instanceof ApplicationActivatedEventListener) {
applicationActivatedMessageProcessor.addEventListener(eventListener);
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 9cc8f78..218c441 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -69,15 +69,15 @@ class TopologyEventMessageDelegator implements Runnable {
log.debug(String.format("Topology event message [%s] received from queue: %s", type, messageQueue.getClass()));
}
- try {
- TopologyManager.acquireWriteLock();
+// try {
+// TopologyManager.acquireWriteLock();
if (log.isDebugEnabled()) {
log.debug(String.format("Delegating topology event message: %s", type));
}
processorChain.process(type, json, TopologyManager.getTopology());
- } finally {
- TopologyManager.releaseWriteLock();
- }
+// } finally {
+// TopologyManager.releaseWriteLock();
+// }
} catch (Exception e) {
log.error("Failed to retrieve topology event message", e);
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
index 5df66bd..2ffd7f6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
@@ -21,7 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -30,43 +36,454 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* Usage:
* Acquire a relevant lock and invoke the getTopology() method inside a try block.
* Once processing is done release the lock using a finally block.
+ *
+ * Acquiring Locks:
+ *
+ * Stratos supports hierarchical locking. By convention, the hierarchy must be
+ * locked from the root level down to the relevant sub tree.
+ *
+ * Acquire a write lock:
+ *
+ * Starting from the root level, acquire read locks on each parent level, and
+ * acquire a write lock only for the relevant sub tree.
+ *
+ * Acquire a read lock:
+ *
+ * Starting from the root level, acquire read locks down to the relevant sub tree.
+ *
+ * Examples -
+ *
+ * Example 1: Acquiring write lock for a Cluster to modify the Cluster object -
+ * acquiring:
+ * 1. acquire read lock for all Services
+ * 2. acquire read lock for the particular Service to which the cluster belongs
+ * 3. acquire write lock for the Cluster
+ *
+ * releasing:
+ * 1. release write lock for the Cluster
+ * 2. release read lock for the particular Service
+ * 3. release read lock for all Services
+ *
+ * Example 2: Acquiring write lock to add a new Cluster object -
+ * acquiring:
+ * 1. acquire read lock for all Services
+ * 2. acquire write lock for the particular Service to which the cluster belongs
+ *
+ * releasing:
+ * 1. release write lock for the particular Service
+ * 2. release read lock for all Services
+ *
+ * Example 3: Acquiring read lock to read Cluster information
+ * acquiring:
+ * 1. acquire read lock for all Services
+ * 2. acquire read lock for the particular Service to which the cluster belongs
+ * 3. acquire read lock for the relevant Cluster
+ *
+ * releasing:
+ * 1. release read lock for the relevant Cluster
+ * 2. release read lock for the particular Service
+ * 3. release read lock for all Services
+ *
+ * Example 4: Acquiring the write lock to deploy a Cartridge (add a new Service)
+ * acquire:
+ * 1. acquire write lock for all Services
+ *
+ * release:
+ * 1. release write lock for all Services
*/
public class TopologyManager {
private static final Log log = LogFactory.getLog(TopologyManager.class);
private static volatile Topology topology;
+ private static final TopologyLockHierarchy topologyLockHierarchy =
+ TopologyLockHierarchy.getInstance();
private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+ // Top level locks - should be used to lock the entire Topology
+
public static void acquireReadLock() {
if(log.isDebugEnabled()) {
- log.debug("Read lock acquired");
+ log.debug("Read lock acquired for Topology");
}
readLock.lock();
}
public static void releaseReadLock() {
if(log.isDebugEnabled()) {
- log.debug("Read lock released");
+ log.debug("Read lock released for Topology");
}
readLock.unlock();
}
+ /**
+ * Acquires the write half of the top level lock, i.e. the lock meant to guard the entire Topology.
+ * The underlying lock is a fair ReentrantReadWriteLock.
+ */
public static void acquireWriteLock() {
if(log.isDebugEnabled()) {
- log.debug("Write lock acquired");
+ log.debug("Write lock acquired for Topology");
}
+ // NOTE(review): the debug message above is written before lock() is called, so it can
+ // show up in the log while this thread is still waiting for the lock
writeLock.lock();
}
+ /**
+ * Releases the write half of the top level lock, i.e. the lock meant to guard the entire Topology.
+ */
public static void releaseWriteLock() {
if(log.isDebugEnabled()) {
- log.debug("Write lock released");
+ log.debug("Write lock released for Topology");
}
+ // NOTE(review): the debug message above is written before unlock() is called
writeLock.unlock();
}
+ // Application, Service and Cluster read locks
+
+    /**
+     * Acquires the read lock that covers all Applications.
+     * <p>
+     * The debug message is written only after the acquire call has returned, so the log
+     * never reports the lock as held while the calling thread may still be waiting for it.
+     */
+    public static void acquireReadLockForApplications() {
+        topologyLockHierarchy.getApplicatioLock().acquireReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Applications");
+        }
+    }
+
+    /**
+     * Releases the read lock that covers all Applications.
+     * <p>
+     * The debug message is written only after the release call has returned, so a failed
+     * release is never logged as successful.
+     */
+    public static void releaseReadLockForApplications() {
+        topologyLockHierarchy.getApplicatioLock().releaseReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock released for Applications");
+        }
+    }
+
+    /**
+     * Acquires the read lock that covers all Services.
+     * <p>
+     * The debug message is written only after the acquire call has returned, so the log
+     * never reports the lock as held while the calling thread may still be waiting for it.
+     */
+    public static void acquireReadLockForServices() {
+        topologyLockHierarchy.getServiceLock().acquireReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Services");
+        }
+    }
+
+    /**
+     * Releases the read lock that covers all Services.
+     * <p>
+     * The debug message is written only after the release call has returned, so a failed
+     * release is never logged as successful.
+     */
+    public static void releaseReadLockForServices() {
+        topologyLockHierarchy.getServiceLock().releaseReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock released for Services");
+        }
+    }
+
+ // Application, Service and Cluster write locks
+
+    /**
+     * Acquires the write lock that covers all Applications.
+     * <p>
+     * The debug message is written only after the acquire call has returned, so the log
+     * never reports the lock as held while the calling thread may still be waiting for it.
+     */
+    public static void acquireWriteLockForApplications() {
+        topologyLockHierarchy.getApplicatioLock().acquireWriteLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Applications");
+        }
+    }
+
+    /**
+     * Releases the write lock that covers all Applications.
+     * <p>
+     * The debug message is written only after the release call has returned, so a failed
+     * release is never logged as successful.
+     */
+    public static void releaseWriteLockForApplications() {
+        topologyLockHierarchy.getApplicatioLock().releaseWritelock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock released for Applications");
+        }
+    }
+
+    /**
+     * Acquires the write lock that covers all Services.
+     * <p>
+     * The debug message is written only after the acquire call has returned, so the log
+     * never reports the lock as held while the calling thread may still be waiting for it.
+     */
+    public static void acquireWriteLockForServices() {
+        topologyLockHierarchy.getServiceLock().acquireWriteLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Services");
+        }
+    }
+
+    /**
+     * Releases the write lock that covers all Services.
+     * <p>
+     * The debug message is written only after the release call has returned, so a failed
+     * release is never logged as successful.
+     */
+    public static void releaseWriteLockForServices() {
+        topologyLockHierarchy.getServiceLock().releaseWritelock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock released for Services");
+        }
+    }
+
+    /**
+     * Acquires a read lock on a single Service.
+     * <p>
+     * The read lock for all Services is taken first, then the lock registered for the given
+     * Service name is looked up and read-locked. When no lock is registered for that name a
+     * warning is logged and only the all-Services read lock is held on return.
+     *
+     * @param serviceName name of the Service whose lock is to be read-locked
+     */
+    public static void acquireReadLockForService (String serviceName) {
+
+        // parent level first: read lock for all Services
+        acquireReadLockForServices();
+
+        final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (serviceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+            return;
+        }
+
+        serviceLock.acquireReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Service " + serviceName);
+        }
+    }
+
+    /**
+     * Releases the read lock on a single Service and then the read lock for all Services.
+     * <p>
+     * When no lock is registered for the given Service name a warning is logged; the
+     * all-Services read lock is released in either case.
+     *
+     * @param serviceName name of the Service whose read lock is to be released
+     */
+    public static void releaseReadLockForService (String serviceName) {
+
+        final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (serviceLock != null) {
+            serviceLock.releaseReadLock();
+            if (log.isDebugEnabled()) {
+                log.debug("Read lock released for Service " + serviceName);
+            }
+        } else {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+        }
+
+        // parent level last: read lock for all Services
+        releaseReadLockForServices();
+    }
+
+    /**
+     * Acquires a write lock on a single Service.
+     * <p>
+     * The read lock for all Services is taken first, then the lock registered for the given
+     * Service name is looked up and write-locked. When no lock is registered for that name a
+     * warning is logged and only the all-Services read lock is held on return.
+     *
+     * @param serviceName name of the Service whose lock is to be write-locked
+     */
+    public static void acquireWriteLockForService (String serviceName) {
+
+        // parent level first: read lock for all Services
+        acquireReadLockForServices();
+
+        final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (serviceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+            return;
+        }
+
+        serviceLock.acquireWriteLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Service " + serviceName);
+        }
+    }
+
+    /**
+     * Releases the write lock on a single Service and then the read lock for all Services.
+     * <p>
+     * When no lock is registered for the given Service name a warning is logged; the
+     * all-Services read lock is released in either case.
+     *
+     * @param serviceName name of the Service whose write lock is to be released
+     */
+    public static void releaseWriteLockForService (String serviceName) {
+
+        final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (serviceLock != null) {
+            serviceLock.releaseWritelock();
+            if (log.isDebugEnabled()) {
+                log.debug("Write lock released for Service " + serviceName);
+            }
+        } else {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+        }
+
+        // parent level last: read lock for all Services
+        releaseReadLockForServices();
+    }
+
+    /**
+     * Acquires a read lock on a single Cluster.
+     * <p>
+     * The read lock of the owning Service is taken first (which in turn takes the read lock
+     * for all Services), then the lock registered for the Cluster id is looked up and
+     * read-locked. When no lock is registered for that id a warning is logged and only the
+     * Service level locks are held on return.
+     *
+     * @param serviceName name of the Service the Cluster belongs to
+     * @param clusterId   id of the Cluster whose lock is to be read-locked
+     */
+    public static void acquireReadLockForCluster (String serviceName, String clusterId) {
+
+        // parent levels first: all Services, then the owning Service
+        acquireReadLockForService(serviceName);
+
+        final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (clusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+            return;
+        }
+
+        clusterLock.acquireReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Cluster " + clusterId);
+        }
+    }
+
+    /**
+     * Releases the read lock on a single Cluster and then the read lock of its Service.
+     * <p>
+     * When no lock is registered for the Cluster id a warning is logged; the Service level
+     * read locks are released in either case.
+     *
+     * @param serviceName name of the Service the Cluster belongs to
+     * @param clusterId   id of the Cluster whose read lock is to be released
+     */
+    public static void releaseReadLockForCluster (String serviceName, String clusterId) {
+
+        final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (clusterLock != null) {
+            clusterLock.releaseReadLock();
+            if (log.isDebugEnabled()) {
+                log.debug("Read lock released for Cluster " + clusterId);
+            }
+        } else {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+        }
+
+        // parent levels last: the owning Service, then all Services
+        releaseReadLockForService(serviceName);
+    }
+
+    /**
+     * Acquires a write lock on a single Cluster.
+     * <p>
+     * The read lock of the owning Service is taken first (which in turn takes the read lock
+     * for all Services), then the lock registered for the Cluster id is looked up and
+     * write-locked. When no lock is registered for that id a warning is logged and only the
+     * Service level locks are held on return.
+     *
+     * @param serviceName name of the Service the Cluster belongs to
+     * @param clusterId   id of the Cluster whose lock is to be write-locked
+     */
+    public static void acquireWriteLockForCluster (String serviceName, String clusterId) {
+
+        // parent levels first: all Services, then the owning Service (both as read locks)
+        acquireReadLockForService(serviceName);
+
+        final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (clusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+            return;
+        }
+
+        clusterLock.acquireWriteLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Cluster " + clusterId);
+        }
+    }
+
+    /**
+     * Releases the write lock on a single Cluster and then the read lock of its Service.
+     * <p>
+     * When no lock is registered for the Cluster id a warning is logged; the Service level
+     * read locks are released in either case.
+     *
+     * @param serviceName name of the Service the Cluster belongs to
+     * @param clusterId   id of the Cluster whose write lock is to be released
+     */
+    public static void releaseWriteLockForCluster (String serviceName, String clusterId) {
+
+        final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (clusterLock != null) {
+            clusterLock.releaseWritelock();
+            if (log.isDebugEnabled()) {
+                log.debug("Write lock released for Cluster " + clusterId);
+            }
+        } else {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+        }
+
+        // parent levels last: the owning Service, then all Services
+        releaseReadLockForService(serviceName);
+    }
+
+    /**
+     * Acquires a read lock on a single Application.
+     * <p>
+     * Locks are taken in this order: the read lock for all Applications, a read lock for
+     * every Cluster returned by the Application's recursive cluster data (each of which also
+     * read-locks its Service), and finally the Application's own lock. A missing Application
+     * or a missing Application lock is only logged as a warning.
+     * <p>
+     * The static topology reference is read null-safely, because it can still be unset when
+     * this method is called; an unset topology is treated like an unknown Application.
+     *
+     * @param appId id of the Application to read-lock
+     */
+    public static void acquireReadLockForApplication (String appId) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForApplications();
+
+        // get the Application's clusters and acquire read locks
+        final Topology currentTopology = topology;
+        final Application application =
+                currentTopology == null ? null : currentTopology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // acquire read locks for services and clusters
+                    acquireReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null) {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // now, lock Application
+            topologyAppLock.acquireReadLock();
+            if (log.isDebugEnabled()) {
+                log.debug("Read lock acquired for Application " + appId);
+            }
+        }
+    }
+
+    /**
+     * Releases the read locks taken for a single Application.
+     * <p>
+     * Locks are released in the reverse of the acquisition order: the Application's own lock,
+     * then the read lock of every Cluster in the Application's recursive cluster data (each
+     * of which also releases its Service read lock), and the read lock for all Applications
+     * last. Holding the all-Applications read lock until the end keeps the Application
+     * lookup below covered by that lock, so the cluster locks are not skipped because the
+     * Application vanished after the parent lock was given up.
+     * <p>
+     * NOTE(review): the cluster set is recomputed here rather than remembered from the
+     * acquire call; if it differs between the two calls the released locks will not match
+     * the acquired ones.
+     *
+     * @param appId id of the Application whose read locks are to be released
+     */
+    public static void releaseReadLockForApplication (String appId) {
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null) {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // release App lock
+            topologyAppLock.releaseReadLock();
+            if (log.isDebugEnabled()) {
+                log.debug("Read lock released for Application " + appId);
+            }
+        }
+
+        // get the Application's cluster information; the topology reference may still be unset
+        final Topology currentTopology = topology;
+        final Application application =
+                currentTopology == null ? null : currentTopology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // release read locks for clusters and services
+                    releaseReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+
+        // release read lock for all Applications last, mirroring the acquisition order
+        releaseReadLockForApplications();
+    }
+
+    /**
+     * Acquires a write lock on a single Application.
+     * <p>
+     * Locks are taken in this order: the read lock for all Applications, a write lock for
+     * every Cluster returned by the Application's recursive cluster data (each of which also
+     * read-locks its Service), and finally the Application's own write lock. A missing
+     * Application or a missing Application lock is only logged as a warning.
+     * <p>
+     * The static topology reference is read null-safely, because it can still be unset when
+     * this method is called; an unset topology is treated like an unknown Application.
+     * <p>
+     * NOTE(review): the method is synchronized on the class, so while it waits for any of the
+     * locks above it keeps the class monitor and every other synchronized static method of
+     * this class stalls until it returns.
+     *
+     * @param appId id of the Application to write-lock
+     */
+    public static synchronized void acquireWriteLockForApplication (String appId) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForApplications();
+
+        // get the Application's clusters and acquire write locks
+        final Topology currentTopology = topology;
+        final Application application =
+                currentTopology == null ? null : currentTopology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    acquireWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null) {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // now, lock Application
+            topologyAppLock.acquireWriteLock();
+            if (log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Application " + appId);
+            }
+        }
+    }
+
+    /**
+     * Releases the write locks taken for a single Application.
+     * <p>
+     * Locks are released in the reverse of the acquisition order: the Application's own write
+     * lock, then the write lock of every Cluster in the Application's recursive cluster data
+     * (each of which also releases its Service read lock), and the read lock for all
+     * Applications last. Holding the all-Applications read lock until the end keeps the
+     * Application lookup below covered by that lock.
+     * <p>
+     * This method is intentionally not synchronized. The matching acquire method is
+     * synchronized on the class and may wait for a lock while holding the class monitor; if
+     * releasing also needed that monitor, the thread that owns the awaited lock could never
+     * release it and both threads would wait forever. Releasing does not wait for any lock,
+     * so it needs no such serialization.
+     * <p>
+     * NOTE(review): the cluster set is recomputed here rather than remembered from the
+     * acquire call; if it differs between the two calls the released locks will not match
+     * the acquired ones.
+     *
+     * @param appId id of the Application whose write locks are to be released
+     */
+    public static void releaseWriteLockForApplication (String appId) {
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null) {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // release App lock
+            topologyAppLock.releaseWritelock();
+            if (log.isDebugEnabled()) {
+                log.debug("Write lock released for Application " + appId);
+            }
+        }
+
+        // get the Application's cluster information; the topology reference may still be unset
+        final Topology currentTopology = topology;
+        final Application application =
+                currentTopology == null ? null : currentTopology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // release write locks for clusters and read locks for services
+                    releaseWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+
+        // release read lock for all Applications last, mirroring the acquisition order
+        releaseReadLockForApplications();
+    }
+
+    /**
+     * Reports that no lock is registered for a requested Application, Service or Cluster.
+     * <p>
+     * The message is logged at warn level and nothing is thrown, so the caller carries on
+     * without the missing lock.
+     *
+     * @param errorMsg description of the lock that could not be found
+     */
+    private static void handleLockNotFound (String errorMsg) {
+        // the stricter variant that raises an exception is kept disabled below
+        //throw new RuntimeException(errorMsg);
+        log.warn(errorMsg);
+    }
+
public static Topology getTopology() {
if (topology == null) {
synchronized (TopologyManager.class){