You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/10/07 15:40:30 UTC
[1/4] git commit: hierarchical locking contd.
Repository: stratos
Updated Branches:
refs/heads/4.0.0-grouping 2536b30b3 -> 622ce9fb6
hierarchical locking contd.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/622ce9fb
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/622ce9fb
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/622ce9fb
Branch: refs/heads/4.0.0-grouping
Commit: 622ce9fb6aa9b6ab4613626b32c805ff74d868cd
Parents: 4ace39c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Tue Oct 7 19:08:25 2014 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Tue Oct 7 19:08:54 2014 +0530
----------------------------------------------------------------------
.../impl/CloudControllerServiceImpl.java | 6 -
.../controller/topology/TopologyBuilder.java | 7 +-
.../topology/TopologyEventPublisher.java | 6 +-
.../event/topology/ApplicationRemovedEvent.java | 15 +-
.../ApplicationActivatedMessageProcessor.java | 2 -
.../ApplicationCreatedMessageProcessor.java | 9 +
.../ApplicationRemovedMessageProcessor.java | 23 ++
.../topology/ClusterActivatedProcessor.java | 2 -
.../ClusterCreatedMessageProcessor.java | 2 -
.../ClusterMaintenanceModeMessageProcessor.java | 2 -
.../ClusterRemovedMessageProcessor.java | 2 -
.../topology/GroupActivatedProcessor.java | 2 -
.../receiver/topology/TopologyManager.java | 80 -------
.../topology/locking/TopologyLockingTest.java | 209 +++++++++++++++++++
14 files changed, 263 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
index a6f74df..df2c217 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
@@ -35,7 +35,6 @@ import org.apache.stratos.cloud.controller.interfaces.Iaas;
import org.apache.stratos.cloud.controller.persist.Deserializer;
import org.apache.stratos.cloud.controller.pojo.*;
import org.apache.stratos.cloud.controller.pojo.application.ApplicationContext;
-import org.apache.stratos.cloud.controller.pojo.payload.MetaDataHolder;
import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisher;
import org.apache.stratos.cloud.controller.registry.RegistryManager;
import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
@@ -49,11 +48,6 @@ import org.apache.stratos.messaging.domain.topology.ConfigCompositeApplication;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.util.Constants;
-import org.apache.stratos.metadata.client.DefaultMetaDataServiceClient;
-import org.apache.stratos.metadata.client.MetaDataServiceClient;
-import org.apache.stratos.metadata.client.config.MetaDataClientConfig;
-import org.apache.stratos.metadata.client.exception.MetaDataServiceClientExeption;
-import org.apache.stratos.metadata.client.exception.RestClientException;
import org.jclouds.compute.ComputeService;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadataBuilder;
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 30b1b00..4aa7023 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -664,12 +664,13 @@ public class TopologyBuilder {
if (!topology.applicationExists(applicationId)) {
log.warn("Application with id [ " + applicationId + " ] doesn't exist in Topology");
- TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, tenantId, tenantDomain);
+ //TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, tenantId, tenantDomain);
} else {
Application application = topology.getApplication(applicationId);
+ Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
// remove clusters
- for (ClusterDataHolder clusterDataHolder : application.getClusterDataRecursively()) {
+ for (ClusterDataHolder clusterDataHolder : clusterData) {
Service service = topology.getService(clusterDataHolder.getServiceType());
if (service != null) {
// remove Cluster
@@ -696,7 +697,7 @@ public class TopologyBuilder {
log.info("Removed application [ " + applicationId + " ] from Topology");
- TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, tenantId, tenantDomain);
+ TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, clusterData, tenantId, tenantDomain);
}
} finally {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index 53efeaa..a6d8350 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -39,6 +39,7 @@ import org.apache.stratos.messaging.util.Constants;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
/**
* this is to send the relevant events from cloud controller to topology topic
@@ -99,13 +100,14 @@ public class TopologyEventPublisher {
publishEvent(new ApplicationCreatedEvent(application, clusters));
}
- public static void sendApplicationRemovedEvent(String applicationId, int tenantId, String tenantDomain) {
+ public static void sendApplicationRemovedEvent(String applicationId, Set<ClusterDataHolder> clusterData,
+ int tenantId, String tenantDomain) {
if(log.isInfoEnabled() || log.isDebugEnabled()) {
log.info("Publishing Application removed event: " + applicationId + " tenantId: " + tenantId);
}
- publishEvent(new ApplicationRemovedEvent(applicationId, tenantId, tenantDomain));
+ publishEvent(new ApplicationRemovedEvent(applicationId, clusterData, tenantId, tenantDomain));
}
public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
index 1e51321..8b0f2cb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
@@ -19,6 +19,11 @@
package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
+
+import java.util.Set;
+
public class ApplicationRemovedEvent extends TopologyEvent {
/**
@@ -26,11 +31,15 @@ public class ApplicationRemovedEvent extends TopologyEvent {
*/
private static final long serialVersionUID = -5499420725533165623L;
private String applicationId;
+ private Set<ClusterDataHolder> clusterData;
+
private String tenantDomain;
private int tenantId;
- public ApplicationRemovedEvent (String applicationId, int tenantId, String tenantDomain) {
+ public ApplicationRemovedEvent (String applicationId, Set<ClusterDataHolder> clusterData,
+ int tenantId, String tenantDomain) {
this.applicationId = applicationId;
+ this.clusterData = clusterData;
this.tenantId = tenantId;
this.tenantDomain = tenantDomain;
}
@@ -46,4 +55,8 @@ public class ApplicationRemovedEvent extends TopologyEvent {
public String getTenantDomain() {
return tenantDomain;
}
+
+ public Set<ClusterDataHolder> getClusterData() {
+ return clusterData;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
index 803a871..4211830 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
@@ -57,7 +57,6 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
jsonToObject(message, ApplicationActivatedEvent.class);
- TopologyManager.acquireReadLockForApplications();
TopologyManager.acquireWriteLockForApplication(event.getAppId());
try {
@@ -65,7 +64,6 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
} finally {
TopologyManager.releaseWriteLockForApplication(event.getAppId());
- TopologyManager.releaseReadLockForApplications();
}
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
index 4368bd7..c47867e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
@@ -22,12 +22,15 @@ package org.apache.stratos.messaging.message.processor.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
+import java.util.Set;
+
public class ApplicationCreatedMessageProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
@@ -55,11 +58,17 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
}
TopologyManager.acquireWriteLockForApplications();
+ for (ClusterDataHolder clusterData : event.getApplication().getClusterDataRecursively()) {
+ TopologyManager.acquireWriteLockForService(clusterData.getServiceType());
+ }
try {
return doProcess(event, topology);
} finally {
+ for (ClusterDataHolder clusterData : event.getApplication().getClusterDataRecursively()) {
+ TopologyManager.releaseWriteLockForService(clusterData.getServiceType());
+ }
TopologyManager.releaseWriteLockForApplications();
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
index ed6c2d4..629f21b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
@@ -21,6 +21,8 @@ package org.apache.stratos.messaging.message.processor.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationRemovedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -62,12 +64,18 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
}
TopologyManager.acquireWriteLockForApplications();
+ for (ClusterDataHolder clusterData : event.getClusterData()) {
+ TopologyManager.acquireWriteLockForService(clusterData.getServiceType());
+ }
try {
return doProcess(event, topology);
} finally {
TopologyManager.releaseWriteLockForApplications();
+ for (ClusterDataHolder clusterData : event.getClusterData()) {
+ TopologyManager.releaseWriteLockForService(clusterData.getServiceType());
+ }
}
} else {
@@ -102,6 +110,21 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
topology.removeApplication(appId);
}
+ if (event.getClusterData() != null) {
+ // remove the Clusters from the Topology
+ for (ClusterDataHolder clusterData : event.getClusterData()) {
+ Service service = topology.getService(clusterData.getServiceType());
+ if (service != null) {
+ service.removeCluster(clusterData.getClusterId());
+ if (log.isDebugEnabled()) {
+ log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
+ }
+ } else {
+ log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
+ }
+ }
+ }
+
if (log.isDebugEnabled()) {
log.debug("ApplicationRemovedMessageProcessor notifying listener ");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
index 78f772b..601cfb2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
@@ -58,14 +58,12 @@ public class ClusterActivatedProcessor extends MessageProcessor {
ClusterActivatedEvent event = (ClusterActivatedEvent) Util.
jsonToObject(message, ClusterActivatedEvent.class);
- TopologyManager.acquireReadLockForServices();
TopologyManager.acquireWriteLockForService(event.getServiceName());
try {
return doProcess(event, topology);
} finally {
TopologyManager.releaseWriteLockForService(event.getServiceName());
- TopologyManager.releaseReadLockForServices();
}
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index 94b9650..6eddfd3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -53,14 +53,12 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
- TopologyManager.acquireReadLockForServices();
TopologyManager.acquireWriteLockForService(event.getServiceName());
try {
return doProcess(event, topology);
} finally {
TopologyManager.releaseWriteLockForService(event.getServiceName());
- TopologyManager.releaseReadLockForServices();
}
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
index 8629363..0b10504 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
@@ -50,14 +50,12 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util.
jsonToObject(message, ClusterMaintenanceModeEvent.class);
- TopologyManager.acquireReadLockForServices();
TopologyManager.acquireWriteLockForService(event.getServiceName());
try {
return doProcess(event, topology);
} finally {
TopologyManager.releaseWriteLockForService(event.getServiceName());
- TopologyManager.releaseReadLockForServices();
}
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 1dfb929..fb45cd3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -51,14 +51,12 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
- TopologyManager.acquireReadLockForServices();
TopologyManager.acquireWriteLockForService(event.getServiceName());
try {
return doProcess(event, topology);
} finally {
TopologyManager.releaseWriteLockForService(event.getServiceName());
- TopologyManager.releaseReadLockForServices();
}
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
index 7200431..3bf5fad 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
@@ -51,7 +51,6 @@ public class GroupActivatedProcessor extends MessageProcessor {
GroupActivatedEvent event = (GroupActivatedEvent) Util.
jsonToObject(message, GroupActivatedEvent.class);
- TopologyManager.acquireReadLockForApplications();
TopologyManager.acquireWriteLockForApplication(event.getAppId());
try {
@@ -59,7 +58,6 @@ public class GroupActivatedProcessor extends MessageProcessor {
} finally {
TopologyManager.releaseWriteLockForApplication(event.getAppId());
- TopologyManager.releaseReadLockForApplications();
}
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
index 2ffd7f6..ed3b16a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
@@ -21,13 +21,10 @@ package org.apache.stratos.messaging.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Application;
-import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
-import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -335,27 +332,6 @@ public class TopologyManager {
// acquire read lock for all Applications
acquireReadLockForApplications();
- // get the Application's cluster's and acquire read locks
- Application application = topology.getApplication(appId);
- if (application == null) {
- log.warn("Application " + appId + " is not found in the Topology");
-
- } else {
-
- Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
- if (clusterData != null && !clusterData.isEmpty()) {
- for (ClusterDataHolder clusterDataHolder : clusterData) {
- // acquire read locks for services and clusters
- acquireReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
- }
-
- } else {
- if (log.isDebugEnabled()) {
- log.debug("No Cluster Data found in Application " + appId);
- }
- }
- }
-
TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
if (topologyAppLock == null) {
handleLockNotFound("Topology lock not found for Application " + appId);
@@ -385,25 +361,6 @@ public class TopologyManager {
// release read lock for all Applications
releaseReadLockForApplications();
-
- // get the Application's cluster information
- Application application = topology.getApplication(appId);
- if (application == null) {
- log.warn("Application " + appId + " is not found in the Topology");
-
- } else {
- Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
- if (clusterData != null && !clusterData.isEmpty()) {
- for (ClusterDataHolder clusterDataHolder : clusterData) {
- // release read locks for clusters and services
- releaseReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("No Cluster Data found in Application " + appId);
- }
- }
- }
}
public static synchronized void acquireWriteLockForApplication (String appId) {
@@ -411,24 +368,6 @@ public class TopologyManager {
// acquire read lock for all Applications
acquireReadLockForApplications();
- // get the Application's cluster's and acquire read locks
- Application application = topology.getApplication(appId);
- if (application == null) {
- log.warn("Application " + appId + " is not found in the Topology");
-
- } else {
- Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
- if (clusterData != null && !clusterData.isEmpty()) {
- for (ClusterDataHolder clusterDataHolder : clusterData) {
- acquireWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("No Cluster Data found in Application " + appId);
- }
- }
- }
-
TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
if (topologyAppLock == null) {
handleLockNotFound("Topology lock not found for Application " + appId);
@@ -458,25 +397,6 @@ public class TopologyManager {
// release read lock for all Applications
releaseReadLockForApplications();
-
- // get the Application's cluster's and acquire read
- Application application = topology.getApplication(appId);
- if (application == null) {
- log.warn("Application " + appId + " is not found in the Topology");
-
- } else {
- Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
- if (clusterData != null && !clusterData.isEmpty()) {
- for (ClusterDataHolder clusterDataHolder : clusterData) {
- // release read locks for clusters and services
- releaseWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("No Cluster Data found in Application " + appId);
- }
- }
- }
}
private static void handleLockNotFound (String errorMsg) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java
new file mode 100644
index 0000000..5d33904
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.topology.locking;
+
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.junit.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+//@RunWith(ConcurrentRunner.class)
+//@Concurrent(threads = 12)
+public class TopologyLockingTest {
+
+ private static Topology topology;
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ System.out.println("Setting up TopologyLockingTest");
+ topology = TopologyManager.getTopology();
+
+ //add Services
+ topology.addService(new Service("service1", ServiceType.SingleTenant));
+ topology.addService(new Service("service2", ServiceType.SingleTenant));
+ topology.addService(new Service("service3", ServiceType.SingleTenant));
+ topology.addService(new Service("service4", ServiceType.SingleTenant));
+
+ // add Clusters
+ topology.getService("service1").addCluster(new Cluster("service1", "service1.cluster1.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+ topology.getService("service1").addCluster(new Cluster("service1", "service1.cluster2.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+ topology.getService("service2").addCluster(new Cluster("service2", "service2.cluster1.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+ topology.getService("service2").addCluster(new Cluster("service2", "service2.cluster2.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+ topology.getService("service3").addCluster(new Cluster("service3", "service3.cluster1.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+ topology.getService("service3").addCluster(new Cluster("service3", "service3.cluster2.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+ topology.getService("service4").addCluster(new Cluster("service4", "service4.cluster1.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+ topology.getService("service4").addCluster(new Cluster("service4", "service4.cluster2.domain",
+ "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+ // Create Application 1
+ Application app1 = new Application("app1");
+ Map<String, ClusterDataHolder> clusterDataMap1 = new HashMap<String, ClusterDataHolder>();
+ clusterDataMap1.put("alias1", new ClusterDataHolder("service1", "service1.cluster1.domain"));
+ clusterDataMap1.put("alias2", new ClusterDataHolder("service1", "service1.cluster2.domain"));
+ clusterDataMap1.put("alias3", new ClusterDataHolder("service2", "service2.cluster1.domain"));
+ clusterDataMap1.put("alias4", new ClusterDataHolder("service2", "service2.cluster2.domain"));
+
+ // add cluster data to Application
+ app1.setClusterData(clusterDataMap1);
+
+ // add Application to Topology
+ topology.addApplication(app1);
+
+ // Create Application 2
+ Application app2 = new Application("app2");
+ Map<String, ClusterDataHolder> clusterDataMap2 = new HashMap<String, ClusterDataHolder>();
+ clusterDataMap2.put("alias5", new ClusterDataHolder("service3", "service3.cluster1.domain"));
+ clusterDataMap2.put("alias6", new ClusterDataHolder("service3", "service3.cluster2.domain"));
+ clusterDataMap2.put("alias7", new ClusterDataHolder("service4", "service4.cluster1.domain"));
+ clusterDataMap2.put("alias8", new ClusterDataHolder("service4", "service4.cluster2.domain"));
+
+ // add cluster data to Application
+ app2.setClusterData(clusterDataMap2);
+
+ // add Application to Topology
+ topology.addApplication(app2);
+ }
+
+ @Test
+ public void testAqcuireAndReleaseReadLocksForServices1To2 () {
+
+ TopologyManager.acquireReadLockForService("service1");
+ TopologyManager.acquireReadLockForService("service2");
+
+ TopologyManager.releaseReadLockForService("service1");
+ TopologyManager.releaseReadLockForService("service2");
+ }
+
+ @Test
+ public void testAqcuireAndReleaseReadLocksForServices3To4 () {
+
+ TopologyManager.acquireReadLockForService("service3");
+ TopologyManager.acquireReadLockForService("service4");
+
+ TopologyManager.releaseReadLockForService("service3");
+ TopologyManager.releaseReadLockForService("service4");
+ }
+
+ @Test
+ public void testAqcuireAndReleaseWriteLocksForServices1To2 () {
+
+ TopologyManager.acquireWriteLockForService("service1");
+ TopologyManager.acquireWriteLockForService("service2");
+
+ TopologyManager.releaseWriteLockForService("service1");
+ TopologyManager.releaseWriteLockForService("service2");
+ }
+
+ @Test
+ public void testAqcuireAndReleaseWriteLocksForServices3To4 () {
+
+ TopologyManager.acquireWriteLockForService("service3");
+ TopologyManager.acquireWriteLockForService("service4");
+
+ TopologyManager.releaseWriteLockForService("service3");
+ TopologyManager.releaseWriteLockForService("service4");
+ }
+
+ @Test
+ public void testAqcuireAndReleaseReadLocksForClustersOfService1 () {
+
+ TopologyManager.acquireReadLockForCluster("service1", "service1.cluster1.domain");
+ TopologyManager.acquireReadLockForCluster("service1", "service1.cluster2.domain");
+
+ TopologyManager.releaseReadLockForCluster("service1", "service1.cluster1.domain");
+ TopologyManager.releaseReadLockForCluster("service1", "service1.cluster2.domain");
+ }
+
+ @Test
+ public void testAqcuireAndReleaseReadLocksForClustersOfService2 () {
+
+ TopologyManager.acquireReadLockForCluster("service2", "service2.cluster1.domain");
+ TopologyManager.acquireReadLockForCluster("service2", "service2.cluster2.domain");
+
+ TopologyManager.releaseReadLockForCluster("service2", "service2.cluster1.domain");
+ TopologyManager.releaseReadLockForCluster("service2", "service2.cluster2.domain");
+ }
+
+ @Test
+ public void testAqcuireAndReleaseWriteLocksForClustersOfService1 () {
+ // Acquire the write lock of each cluster of service1.
+ TopologyManager.acquireWriteLockForCluster("service1", "service1.cluster1.domain");
+ TopologyManager.acquireWriteLockForCluster("service1", "service1.cluster2.domain");
+ // Release both write locks again so that none is left held once the test returns.
+ TopologyManager.releaseWriteLockForCluster("service1", "service1.cluster1.domain");
+ TopologyManager.releaseWriteLockForCluster("service1", "service1.cluster2.domain");
+ }
+
+ @Test
+ public void testAqcuireAndReleaseWriteLocksForClustersOfService2 () {
+ // Acquire the write lock of each cluster of service2.
+ TopologyManager.acquireWriteLockForCluster("service2", "service2.cluster1.domain");
+ TopologyManager.acquireWriteLockForCluster("service2", "service2.cluster2.domain");
+ // Release both write locks again so that none is left held once the test returns.
+ TopologyManager.releaseWriteLockForCluster("service2", "service2.cluster1.domain");
+ TopologyManager.releaseWriteLockForCluster("service2", "service2.cluster2.domain");
+ }
+
+ @Test
+ public void testAcquireAndReleaseReadLockForApp1 () {
+
+ TopologyManager.acquireReadLockForApplication("app1");
+ TopologyManager.releaseReadLockForApplication("app1");
+ }
+
+ @Test
+ public void testAcquireAndReleaseWriteLockForApp1 () {
+
+ TopologyManager.acquireWriteLockForApplication("app1");
+ TopologyManager.releaseWriteLockForApplication("app1");
+ }
+
+ @Test
+ public void testAcquireAndReleaseReadLockForApp2 () {
+
+ TopologyManager.acquireReadLockForApplication("app2");
+ TopologyManager.releaseReadLockForApplication("app2");
+ }
+
+ @Test
+ public void testAcquireAndReleaseWriteLockForApp2 () {
+
+ TopologyManager.acquireWriteLockForApplication("app2");
+ TopologyManager.releaseWriteLockForApplication("app2");
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ System.out.println("Cleaning up TopologyLockingTest");
+ topology = null;
+ }
+}
[2/4] initial changes for hierarchical topology locking
Posted by is...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
index 67c9b67..4473add 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberSuspendedMessageProcessor extends MessageProcessor {
@@ -54,96 +55,108 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberSuspendedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.Suspended) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.Suspended);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.Suspended) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.Suspended);
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
index 5b5cbc9..3619b53 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -29,6 +29,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberTerminatedMessageProcessor extends MessageProcessor {
@@ -53,87 +54,99 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberTerminatedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if(member != null) {
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if(member != null) {
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
}
+ }
- // Notify event listeners before removing member object
- notifyEventListeners(event);
+ // Notify event listeners before removing member object
+ notifyEventListeners(event);
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
- // Remove member from the cluster
- cluster.removeMember(member);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
-
- return true;
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ // Remove member from the cluster
+ cluster.removeMember(member);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
index 2c216f0..1c4be8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ServiceCreatedMessageProcessor extends MessageProcessor {
@@ -43,44 +44,21 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
if (ServiceCreatedEvent.class.getName().equals(type)) {
// Return if topology has not been initialized
- if (!topology.isInitialized())
+ if (!topology.isInitialized()) {
return false;
+ }
// Parse complete message and build event
ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
- }
+ TopologyManager.acquireWriteLockForServices();
+ try {
+ return doProcess(event, topology);
- // Validate event against the existing topology
- if (topology.serviceExists(event.getServiceName())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
- }
- } else {
-
- // Apply changes to the topology
- Service service = new Service(event.getServiceName(), event.getServiceType());
- service.addPorts(event.getPorts());
- topology.addService(service);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Service created: [service] %s", event.getServiceName()));
- }
+ } finally {
+ TopologyManager.releaseWriteLockForServices();
}
-
- notifyEventListeners(event);
- return true;
-
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
@@ -90,4 +68,40 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
}
}
}
+
+ private boolean doProcess (ServiceCreatedEvent event, Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Validate event against the existing topology
+ if (topology.serviceExists(event.getServiceName())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
+ }
+ } else {
+
+ // Apply changes to the topology
+ Service service = new Service(event.getServiceName(), event.getServiceType());
+ service.addPorts(event.getPorts());
+ topology.addService(service);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Service created: [service] %s", event.getServiceName()));
+ }
+ }
+
+
+ notifyEventListeners(event);
+ return true;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
index 2c0bc70..a38cbdc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ServiceRemovedMessageProcessor extends MessageProcessor {
@@ -49,38 +50,14 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
- }
-
- // Notify event listeners before removing service object
- notifyEventListeners(event);
+ TopologyManager.acquireWriteLockForServices();
+ try {
+ return doProcess(event, topology);
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
- }
- } else {
-
- // Apply changes to the topology
- topology.removeService(service);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Service removed: [service] %s", event.getServiceName()));
- }
+ } finally {
+ TopologyManager.releaseWriteLockForServices();
}
- return true;
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
@@ -90,4 +67,40 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
}
}
}
+
+ private boolean doProcess (ServiceRemovedEvent event, Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+ }
+ return false;
+ }
+ }
+
+ // Notify event listeners before removing service object
+ notifyEventListeners(event);
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ } else {
+
+ // Apply changes to the topology
+ topology.removeService(service);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Service removed: [service] %s", event.getServiceName()));
+ }
+ }
+
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index e3ddfa3..db9e8b1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -47,7 +47,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
private GroupActivatedProcessor groupActivatedProcessor;
- private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
+ //private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor;
private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
@@ -109,11 +109,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
add(applicationActivatedMessageProcessor);
if (log.isDebugEnabled()) {
- log.debug("Grouping: added applicationCreatedMessageProcessor, applicationRemovedMessageProcessor: " +
- applicationCreatedMessageProcessor + " / " + applicationRemovedMessageProcessor);
- }
-
- if (log.isDebugEnabled()) {
log.debug("Topology message processor chain initialized X1");
}
}
@@ -153,9 +148,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
applicationCreatedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof ApplicationRemovedEventListener) {
applicationRemovedMessageProcessor.addEventListener(eventListener);
- if (log.isDebugEnabled()) {
- log.debug("Grouping: added eventlistener to applicationCreatedMessageProcessor: " + eventListener);
- }
} else if (eventListener instanceof ApplicationActivatedEventListener) {
applicationActivatedMessageProcessor.addEventListener(eventListener);
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 9cc8f78..218c441 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -69,15 +69,15 @@ class TopologyEventMessageDelegator implements Runnable {
log.debug(String.format("Topology event message [%s] received from queue: %s", type, messageQueue.getClass()));
}
- try {
- TopologyManager.acquireWriteLock();
+// try {
+// TopologyManager.acquireWriteLock();
if (log.isDebugEnabled()) {
log.debug(String.format("Delegating topology event message: %s", type));
}
processorChain.process(type, json, TopologyManager.getTopology());
- } finally {
- TopologyManager.releaseWriteLock();
- }
+// } finally {
+// TopologyManager.releaseWriteLock();
+// }
} catch (Exception e) {
log.error("Failed to retrieve topology event message", e);
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
index 5df66bd..2ffd7f6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
@@ -21,7 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -30,43 +36,454 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* Usage:
* Acquire a relevant lock and invoke the getTopology() method inside a try block.
* Once processing is done release the lock using a finally block.
+ *
+ * Acquiring Locks:
+ *
+ * Stratos supports hierarchical locking. As per the practice, we need to lock the
+ * hierarchy from root level till the relevant sub tree.
+ *
+ * Acquire a write lock:
+ *
+ * From root level, acquire read lock, and acquire a write lock only for the
+ * relevant sub tree.
+ *
+ * Acquire a read lock:
+ *
+ * From root level, acquire read locks till the relevant sub tree
+ *
+ * Examples -
+ *
+ * Example 1: Acquiring write lock for a Cluster to modify the Cluster object -
+ * acquiring:
+ * 1. acquire read lock for all Services
+ * 2. acquire read lock for the particular Service, to which the cluster belongs to
+ * 3. acquire write lock for the Cluster
+ *
+ * releasing:
+ * 1. release write lock for the Cluster
+ * 2. release read lock for the particular Service
+ * 3. release read lock for all Services
+ *
+ * Example 2: Acquiring write lock to add a new Cluster object -
+ * acquiring:
+ * 1. acquire read lock for all Services
+ * 2. acquire write lock for the particular Service, to which the cluster belongs to
+ *
+ * releasing:
+ * 1. release write lock for the particular Service
+ * 2. release read lock for all Services
+ *
+ * Example 3: Acquiring read lock to read Cluster information
+ * acquiring:
+ * 1. acquire read lock for all Services
+ * 2. acquire read lock for the particular Service, to which the cluster belongs to
+ * 3. acquire read lock for the relevant Cluster
+ *
+ * releasing:
+ * 1. release read lock for the relevant Cluster
+ * 2. release read lock for the particular Service
+ * 3. release read lock for all Services
+ *
+ * Example 4: Acquiring the write lock to deploy a Cartridge (add a new Service)
+ * acquire:
+ * 1. acquire write lock for all Services
+ *
+ * release:
+ * 1. release write lock for all Services
*/
public class TopologyManager {
private static final Log log = LogFactory.getLog(TopologyManager.class);
private static volatile Topology topology;
+ private static final TopologyLockHierarchy topologyLockHierarchy =
+ TopologyLockHierarchy.getInstance();
private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+ // Top level locks - should be used to lock the entire Topology
+
public static void acquireReadLock() {
if(log.isDebugEnabled()) {
- log.debug("Read lock acquired");
+ log.debug("Read lock acquired for Topology");
}
readLock.lock();
}
public static void releaseReadLock() {
if(log.isDebugEnabled()) {
- log.debug("Read lock released");
+ log.debug("Read lock released for Topology");
}
readLock.unlock();
}
public static void acquireWriteLock() {
if(log.isDebugEnabled()) {
- log.debug("Write lock acquired");
+ log.debug("Write lock acquired for Topology");
}
writeLock.lock();
}
public static void releaseWriteLock() {
if(log.isDebugEnabled()) {
- log.debug("Write lock released");
+ log.debug("Write lock released for Topology");
}
writeLock.unlock();
}
+ // Application, Service and Cluster read locks
+
+ /**
+  * Acquires the read lock that covers the Applications level of the topology
+  * lock hierarchy.
+  */
+ public static void acquireReadLockForApplications() {
+     topologyLockHierarchy.getApplicatioLock().acquireReadLock();
+     // Log only after the call has returned, so the message never claims an
+     // acquisition that has not completed yet.
+     if (log.isDebugEnabled()) {
+         log.debug("Read lock acquired for Applications");
+     }
+ }
+
+ /**
+  * Releases the read lock that covers the Applications level of the topology
+  * lock hierarchy.
+  */
+ public static void releaseReadLockForApplications() {
+     topologyLockHierarchy.getApplicatioLock().releaseReadLock();
+     // Log only after the release call has returned, so a failing release is
+     // never reported as successful.
+     if (log.isDebugEnabled()) {
+         log.debug("Read lock released for Applications");
+     }
+ }
+
+ /**
+  * Acquires the read lock that covers the Services level of the topology
+  * lock hierarchy.
+  */
+ public static void acquireReadLockForServices() {
+     topologyLockHierarchy.getServiceLock().acquireReadLock();
+     // Log only after the call has returned, so the message never claims an
+     // acquisition that has not completed yet.
+     if (log.isDebugEnabled()) {
+         log.debug("Read lock acquired for Services");
+     }
+ }
+
+ /**
+  * Releases the read lock that covers the Services level of the topology
+  * lock hierarchy.
+  */
+ public static void releaseReadLockForServices() {
+     topologyLockHierarchy.getServiceLock().releaseReadLock();
+     // Log only after the release call has returned, so a failing release is
+     // never reported as successful.
+     if (log.isDebugEnabled()) {
+         log.debug("Read lock released for Services");
+     }
+ }
+
+ // Application, Service and Cluster write locks
+
+ /**
+  * Acquires the write lock that covers the Applications level of the topology
+  * lock hierarchy.
+  */
+ public static void acquireWriteLockForApplications() {
+     topologyLockHierarchy.getApplicatioLock().acquireWriteLock();
+     // Log only after the call has returned, so the message never claims an
+     // acquisition that has not completed yet.
+     if (log.isDebugEnabled()) {
+         log.debug("Write lock acquired for Applications");
+     }
+ }
+
+ /**
+  * Releases the write lock that covers the Applications level of the topology
+  * lock hierarchy.
+  */
+ public static void releaseWriteLockForApplications() {
+     topologyLockHierarchy.getApplicatioLock().releaseWritelock();
+     // Log only after the release call has returned, so a failing release is
+     // never reported as successful.
+     if (log.isDebugEnabled()) {
+         log.debug("Write lock released for Applications");
+     }
+ }
+
+ /**
+  * Acquires the write lock that covers the Services level of the topology
+  * lock hierarchy.
+  */
+ public static void acquireWriteLockForServices() {
+     topologyLockHierarchy.getServiceLock().acquireWriteLock();
+     // Log only after the call has returned, so the message never claims an
+     // acquisition that has not completed yet.
+     if (log.isDebugEnabled()) {
+         log.debug("Write lock acquired for Services");
+     }
+ }
+
+ /**
+  * Releases the write lock that covers the Services level of the topology
+  * lock hierarchy.
+  */
+ public static void releaseWriteLockForServices() {
+     topologyLockHierarchy.getServiceLock().releaseWritelock();
+     // Log only after the release call has returned, so a failing release is
+     // never reported as successful.
+     if (log.isDebugEnabled()) {
+         log.debug("Write lock released for Services");
+     }
+ }
+
+ /**
+  * Read-locks a single Service: takes the read lock covering all Services first,
+  * then the read lock registered for {@code serviceName}.
+  * <p>
+  * If no lock is registered for the Service, a warning is logged and only the
+  * Services-level read lock is held on return.
+  *
+  * @param serviceName name of the Service to read-lock
+  */
+ public static void acquireReadLockForService (String serviceName) {
+
+     // parent level first: read lock for all Services
+     acquireReadLockForServices();
+
+     final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+     if (serviceLock == null) {
+         handleLockNotFound("Topology lock not found for Service " + serviceName);
+         return;
+     }
+
+     serviceLock.acquireReadLock();
+     if (log.isDebugEnabled()) {
+         log.debug("Read lock acquired for Service " + serviceName);
+     }
+ }
+
+ /**
+  * Undoes {@code acquireReadLockForService}: releases the read lock registered
+  * for {@code serviceName} (if one exists), then the read lock covering all
+  * Services.
+  *
+  * @param serviceName name of the Service to unlock
+  */
+ public static void releaseReadLockForService (String serviceName) {
+
+     final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+     if (serviceLock != null) {
+         serviceLock.releaseReadLock();
+         if (log.isDebugEnabled()) {
+             log.debug("Read lock released for Service " + serviceName);
+         }
+     } else {
+         handleLockNotFound("Topology lock not found for Service " + serviceName);
+     }
+
+     // the Services-level read lock is released in either case
+     releaseReadLockForServices();
+ }
+
+ /**
+  * Write-locks a single Service: takes the read lock covering all Services first,
+  * then the write lock registered for {@code serviceName}.
+  * <p>
+  * If no lock is registered for the Service, a warning is logged and only the
+  * Services-level read lock is held on return.
+  *
+  * @param serviceName name of the Service to write-lock
+  */
+ public static void acquireWriteLockForService (String serviceName) {
+
+     // parent level first: read lock for all Services
+     acquireReadLockForServices();
+
+     final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+     if (serviceLock == null) {
+         handleLockNotFound("Topology lock not found for Service " + serviceName);
+         return;
+     }
+
+     serviceLock.acquireWriteLock();
+     if (log.isDebugEnabled()) {
+         log.debug("Write lock acquired for Service " + serviceName);
+     }
+ }
+
+ /**
+  * Undoes {@code acquireWriteLockForService}: releases the write lock registered
+  * for {@code serviceName} (if one exists), then the read lock covering all
+  * Services.
+  *
+  * @param serviceName name of the Service to unlock
+  */
+ public static void releaseWriteLockForService (String serviceName) {
+
+     final TopologyLock serviceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+     if (serviceLock != null) {
+         serviceLock.releaseWritelock();
+         if (log.isDebugEnabled()) {
+             log.debug("Write lock released for Service " + serviceName);
+         }
+     } else {
+         handleLockNotFound("Topology lock not found for Service " + serviceName);
+     }
+
+     // the Services-level read lock is released in either case
+     releaseReadLockForServices();
+ }
+
+ /**
+  * Read-locks a single Cluster: read-locks the owning Service (which also
+  * read-locks the Services level), then takes the read lock registered for
+  * {@code clusterId}.
+  * <p>
+  * If no lock is registered for the Cluster, a warning is logged and only the
+  * Service-level locks are held on return.
+  *
+  * @param serviceName name of the Service the Cluster belongs to
+  * @param clusterId   id of the Cluster to read-lock
+  */
+ public static void acquireReadLockForCluster (String serviceName, String clusterId) {
+
+     // parent levels first: all Services, then the owning Service
+     acquireReadLockForService(serviceName);
+
+     final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+     if (clusterLock == null) {
+         handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+         return;
+     }
+
+     // finally the Cluster itself
+     clusterLock.acquireReadLock();
+     if (log.isDebugEnabled()) {
+         log.debug("Read lock acquired for Cluster " + clusterId);
+     }
+ }
+
+ /**
+  * Undoes {@code acquireReadLockForCluster}: releases the read lock registered
+  * for {@code clusterId} (if one exists), then the read locks of the owning
+  * Service.
+  *
+  * @param serviceName name of the Service the Cluster belongs to
+  * @param clusterId   id of the Cluster to unlock
+  */
+ public static void releaseReadLockForCluster (String serviceName, String clusterId) {
+
+     final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+     if (clusterLock != null) {
+         // the Cluster goes first
+         clusterLock.releaseReadLock();
+         if (log.isDebugEnabled()) {
+             log.debug("Read lock released for Cluster " + clusterId);
+         }
+     } else {
+         handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+     }
+
+     // then the owning Service, in either case
+     releaseReadLockForService(serviceName);
+ }
+
+ /**
+  * Write-locks a single Cluster: read-locks the owning Service (which also
+  * read-locks the Services level), then takes the write lock registered for
+  * {@code clusterId}.
+  * <p>
+  * If no lock is registered for the Cluster, a warning is logged and only the
+  * Service-level read locks are held on return.
+  *
+  * @param serviceName name of the Service the Cluster belongs to
+  * @param clusterId   id of the Cluster to write-lock
+  */
+ public static void acquireWriteLockForCluster (String serviceName, String clusterId) {
+
+     // parent levels first: all Services, then the owning Service (read locks)
+     acquireReadLockForService(serviceName);
+
+     final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+     if (clusterLock == null) {
+         handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+         return;
+     }
+
+     clusterLock.acquireWriteLock();
+     if (log.isDebugEnabled()) {
+         log.debug("Write lock acquired for Cluster " + clusterId);
+     }
+ }
+
+ /**
+  * Undoes {@code acquireWriteLockForCluster}: releases the write lock registered
+  * for {@code clusterId} (if one exists), then the read locks of the owning
+  * Service.
+  *
+  * @param serviceName name of the Service the Cluster belongs to
+  * @param clusterId   id of the Cluster to unlock
+  */
+ public static void releaseWriteLockForCluster (String serviceName, String clusterId) {
+
+     final TopologyLock clusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+     if (clusterLock != null) {
+         clusterLock.releaseWritelock();
+         if (log.isDebugEnabled()) {
+             log.debug("Write lock released for Cluster " + clusterId);
+         }
+     } else {
+         handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+     }
+
+     // then the owning Service, in either case
+     releaseReadLockForService(serviceName);
+ }
+
+ /**
+  * Read-locks an Application together with every Cluster it references.
+  * <p>
+  * Acquisition order: the read lock covering all Applications, then a read lock
+  * for each Cluster of the Application (each of which also read-locks the owning
+  * Service), then the read lock registered for the Application itself.
+  * <p>
+  * If the Application is not found in the topology, or no lock is registered
+  * for it, a warning is logged and the corresponding step is skipped.
+  *
+  * @param appId id of the Application to read-lock
+  */
+ public static void acquireReadLockForApplication (String appId) {
+
+     // acquire read lock for all Applications
+     acquireReadLockForApplications();
+
+     // Use getTopology() instead of the raw static field: the field is
+     // initialised lazily and may still be null at this point.
+     Application application = getTopology().getApplication(appId);
+     if (application == null) {
+         log.warn("Application " + appId + " is not found in the Topology");
+
+     } else {
+         Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+         if (clusterData != null && !clusterData.isEmpty()) {
+             for (ClusterDataHolder clusterDataHolder : clusterData) {
+                 // acquire read locks for services and clusters
+                 acquireReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+             }
+
+         } else {
+             if (log.isDebugEnabled()) {
+                 log.debug("No Cluster Data found in Application " + appId);
+             }
+         }
+     }
+
+     TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+     if (topologyAppLock == null) {
+         handleLockNotFound("Topology lock not found for Application " + appId);
+
+     } else {
+         // now, lock Application
+         topologyAppLock.acquireReadLock();
+         if (log.isDebugEnabled()) {
+             log.debug("Read lock acquired for Application " + appId);
+         }
+     }
+ }
+
+ /**
+  * Releases the locks taken by {@code acquireReadLockForApplication}, in the
+  * reverse order of acquisition: the Application's own read lock, then the read
+  * locks of its Clusters (and their Services), and finally the read lock
+  * covering all Applications.
+  * <p>
+  * NOTE(review): if the Application has been removed from the topology since
+  * the locks were acquired, its Cluster locks cannot be looked up here and stay
+  * held; only a warning is logged.
+  *
+  * @param appId id of the Application to unlock
+  */
+ public static void releaseReadLockForApplication (String appId) {
+
+     TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+     if (topologyAppLock == null) {
+         handleLockNotFound("Topology lock not found for Application " + appId);
+
+     } else {
+         // release App lock
+         topologyAppLock.releaseReadLock();
+         if (log.isDebugEnabled()) {
+             log.debug("Read lock released for Application " + appId);
+         }
+     }
+
+     // Look the Application up while the Applications-level read lock is still
+     // held. getTopology() is used because the static field is lazily initialised.
+     Application application = getTopology().getApplication(appId);
+     if (application == null) {
+         log.warn("Application " + appId + " is not found in the Topology");
+
+     } else {
+         Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+         if (clusterData != null && !clusterData.isEmpty()) {
+             for (ClusterDataHolder clusterDataHolder : clusterData) {
+                 // release read locks for clusters and services
+                 releaseReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+             }
+         } else {
+             if (log.isDebugEnabled()) {
+                 log.debug("No Cluster Data found in Application " + appId);
+             }
+         }
+     }
+
+     // release read lock for all Applications last (it was acquired first)
+     releaseReadLockForApplications();
+ }
+
+ /**
+  * Write-locks an Application together with every Cluster it references.
+  * <p>
+  * Acquisition order: the read lock covering all Applications, then a write
+  * lock for each Cluster of the Application (each of which also read-locks the
+  * owning Service), then the write lock registered for the Application itself.
+  * <p>
+  * The method is {@code synchronized} on the class, so only one thread at a
+  * time runs this multi-lock acquisition.
+  * NOTE(review): a thread waiting for one of the locks in here keeps holding
+  * the class monitor; the matching release path must therefore not need that
+  * monitor - confirm against releaseWriteLockForApplication.
+  * <p>
+  * If the Application is not found in the topology, or no lock is registered
+  * for it, a warning is logged and the corresponding step is skipped.
+  *
+  * @param appId id of the Application to write-lock
+  */
+ public static synchronized void acquireWriteLockForApplication (String appId) {
+
+     // acquire read lock for all Applications
+     acquireReadLockForApplications();
+
+     // Use getTopology() instead of the raw static field: the field is
+     // initialised lazily and may still be null at this point.
+     Application application = getTopology().getApplication(appId);
+     if (application == null) {
+         log.warn("Application " + appId + " is not found in the Topology");
+
+     } else {
+         Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+         if (clusterData != null && !clusterData.isEmpty()) {
+             for (ClusterDataHolder clusterDataHolder : clusterData) {
+                 // write-lock the cluster (read-locks its service on the way)
+                 acquireWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+             }
+         } else {
+             if (log.isDebugEnabled()) {
+                 log.debug("No Cluster Data found in Application " + appId);
+             }
+         }
+     }
+
+     TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+     if (topologyAppLock == null) {
+         handleLockNotFound("Topology lock not found for Application " + appId);
+
+     } else {
+         // now, lock Application
+         topologyAppLock.acquireWriteLock();
+         if (log.isDebugEnabled()) {
+             log.debug("Write lock acquired for Application " + appId);
+         }
+     }
+ }
+
+ /**
+  * Releases the locks taken by {@code acquireWriteLockForApplication}, in the
+  * reverse order of acquisition: the Application's own write lock, then the
+  * write locks of its Clusters (and the read locks of their Services), and
+  * finally the read lock covering all Applications.
+  * <p>
+  * Deliberately NOT {@code synchronized}: acquireWriteLockForApplication is a
+  * static synchronized method, and a thread waiting for a lock inside it holds
+  * the class monitor. If releasing also required that monitor, the thread that
+  * owns the lock could never release it and both threads would wait forever.
+  * <p>
+  * NOTE(review): if the Application has been removed from the topology since
+  * the locks were acquired, its Cluster locks cannot be looked up here and stay
+  * held; only a warning is logged.
+  *
+  * @param appId id of the Application to unlock
+  */
+ public static void releaseWriteLockForApplication (String appId) {
+
+     TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+     if (topologyAppLock == null) {
+         handleLockNotFound("Topology lock not found for Application " + appId);
+
+     } else {
+         // release App lock
+         topologyAppLock.releaseWritelock();
+         if (log.isDebugEnabled()) {
+             log.debug("Write lock released for Application " + appId);
+         }
+     }
+
+     // Look the Application up while the Applications-level read lock is still
+     // held. getTopology() is used because the static field is lazily initialised.
+     Application application = getTopology().getApplication(appId);
+     if (application == null) {
+         log.warn("Application " + appId + " is not found in the Topology");
+
+     } else {
+         Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+         if (clusterData != null && !clusterData.isEmpty()) {
+             for (ClusterDataHolder clusterDataHolder : clusterData) {
+                 // release the cluster write lock and its service read locks
+                 releaseWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+             }
+         } else {
+             if (log.isDebugEnabled()) {
+                 log.debug("No Cluster Data found in Application " + appId);
+             }
+         }
+     }
+
+     // release read lock for all Applications last (it was acquired first)
+     releaseReadLockForApplications();
+ }
+
+ /**
+  * Reports that an expected lock is missing from the lock hierarchy.
+  * Does not throw: the problem is logged as a warning and the caller carries on.
+  *
+  * @param errorMsg description of the missing lock
+  */
+ private static void handleLockNotFound (String errorMsg) {
+     if (log.isWarnEnabled()) {
+         log.warn(errorMsg);
+     }
+ }
+
public static Topology getTopology() {
if (topology == null) {
synchronized (TopologyManager.class){
[3/4] initial changes for hierarchical topology locking
Posted by is...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index c007343..94b9650 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ClusterCreatedMessageProcessor extends MessageProcessor {
@@ -41,83 +42,97 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Topology topology = (Topology) object;
if (ClusterCreatedEvent.class.getName().equals(type)) {
// Return if topology has not been initialized
- if (!topology.isInitialized())
+ if (!topology.isInitialized()) {
return false;
+ }
// Parse complete message and build event
ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
- }
+ TopologyManager.acquireReadLockForServices();
+ TopologyManager.acquireWriteLockForService(event.getServiceName());
+ try {
+ return doProcess(event, topology);
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } finally {
+ TopologyManager.releaseWriteLockForService(event.getServiceName());
+ TopologyManager.releaseReadLockForServices();
}
- // Validate event properties
- Cluster cluster = event.getCluster();
- if(cluster == null) {
- String msg = "Cluster object of cluster created event is null.";
- log.error(msg);
- throw new RuntimeException(msg);
- }
- if (cluster.getHostNames().isEmpty()) {
- throw new RuntimeException("Host name/s not found in cluster created event");
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ }
+ }
+
+ private boolean doProcess (ClusterCreatedEvent event,Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- if (service.clusterExists(event.getClusterId())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
- event.getClusterId()));
- }
- } else {
-
- // Apply changes to the topology
- service.addCluster(cluster);
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster created: %s",
- cluster.toString()));
- }
- }
+ }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+ }
+ return false;
+ }
+ }
+ // Validate event properties
+ Cluster cluster = event.getCluster();
+ if(cluster == null) {
+ String msg = "Cluster object of cluster created event is null.";
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+ if (cluster.getHostNames().isEmpty()) {
+ throw new RuntimeException("Host name/s not found in cluster created event");
+ }
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ if (service.clusterExists(event.getClusterId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+ event.getClusterId()));
+ }
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+ // Apply changes to the topology
+ service.addCluster(cluster);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster created: %s",
+ cluster.toString()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
index f125c54..8629363 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
@@ -49,64 +50,77 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util.
jsonToObject(message, ClusterMaintenanceModeEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireReadLockForServices();
+ TopologyManager.acquireWriteLockForService(event.getServiceName());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForService(event.getServiceName());
+ TopologyManager.releaseReadLockForServices();
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (ClusterMaintenanceModeEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
+ }
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
- event.getClusterId()));
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
- } else {
- // Apply changes to the topology
- cluster.setStatus(Status.In_Maintenance);
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster updated as maintenance mode: %s",
- cluster.toString()));
- }
- }
+ return false;
+ }
+ }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
+ event.getClusterId()));
+ }
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ // Apply changes to the topology
+ cluster.setStatus(Status.In_Maintenance);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster updated as maintenance mode: %s",
+ cluster.toString()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 69ef5b0..1dfb929 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ClusterRemovedMessageProcessor extends MessageProcessor {
@@ -50,65 +51,79 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireReadLockForServices();
+ TopologyManager.acquireWriteLockForService(event.getServiceName());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForService(event.getServiceName());
+ TopologyManager.releaseReadLockForServices();
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (ClusterRemovedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
+ }
- // Notify event listeners before removing the cluster object
- notifyEventListeners(event);
-
- if (!service.clusterExists(event.getClusterId())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(),
- event.getClusterId()));
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
- } else {
-
- // Apply changes to the topology
- service.removeCluster(event.getClusterId());
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
+ return false;
+ }
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
}
+ return false;
+ }
+
+ // Notify event listeners before removing the cluster object
+ notifyEventListeners(event);
- return true;
+ if (!service.clusterExists(event.getClusterId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(),
+ event.getClusterId()));
+ }
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+ // Apply changes to the topology
+ service.removeCluster(event.getClusterId());
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
}
}
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 135bdae..6d5cb8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
import java.util.ArrayList;
@@ -49,102 +50,20 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
if (CompleteTopologyEvent.class.getName().equals(type)) {
// Parse complete message and build event
CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
-
- // if topology has not already initialized
- if (!topology.isInitialized()) {
-
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- // Add services included in service filter
- for (Service service : event.getTopology().getServices()) {
- if (TopologyServiceFilter.getInstance()
- .serviceNameIncluded(service.getServiceName())) {
- topology.addService(service);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Service is excluded: [service] %s",
- service.getServiceName()));
- }
- }
- }
- } else {
- // Add all services
- topology.addServices(event.getTopology().getServices());
- }
-
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- for (Service service : topology.getServices()) {
- List<Cluster> clustersToRemove = new ArrayList<Cluster>();
- for (Cluster cluster : service.getClusters()) {
- if (TopologyClusterFilter.getInstance()
- .clusterIdExcluded(cluster.getClusterId())) {
- clustersToRemove.add(cluster);
- }
- }
- for (Cluster cluster : clustersToRemove) {
- service.removeCluster(cluster);
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Cluster is excluded: [cluster] %s",
- cluster.getClusterId()));
- }
- }
- }
- }
-
- // Apply member filter
- if (TopologyMemberFilter.getInstance().isActive()) {
- for (Service service : topology.getServices()) {
- for (Cluster cluster : service.getClusters()) {
- List<Member> membersToRemove = new ArrayList<Member>();
- for (Member member : cluster.getMembers()) {
- if (TopologyMemberFilter.getInstance()
- .lbClusterIdExcluded(
- member.getLbClusterId())) {
- membersToRemove.add(member);
- }
- }
- for (Member member : membersToRemove) {
- cluster.removeMember(member);
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Member is excluded: [member] %s [lb-cluster-id] %s",
- member.getMemberId(),
- member.getLbClusterId()));
- }
- }
- }
- }
- }
-
- // add existing Applications to Topology
- Collection<Application> applications = event.getTopology().getApplications();
- if (applications != null && !applications.isEmpty()) {
- for (Application application : applications) {
- topology.addApplication(application);
- if (log.isDebugEnabled()) {
- log.debug("Application with id [ " + application.getId() + " ] added to Topology");
- }
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("No Application information found in Complete Topology event");
- }
- }
- if (log.isInfoEnabled()) {
- log.info("Topology initialized");
- }
+ if (!topology.isInitialized()) {
+ TopologyManager.acquireWriteLock();
+
+ try {
+ return doProcess(event, topology);
- // Set topology initialized
- topology.setInitialized(true);
- }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ } else {
+ return true;
+ }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
@@ -153,4 +72,99 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
return false;
}
}
+
+ private boolean doProcess (CompleteTopologyEvent event, Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ // Add services included in service filter
+ for (Service service : event.getTopology().getServices()) {
+ if (TopologyServiceFilter.getInstance()
+ .serviceNameIncluded(service.getServiceName())) {
+ topology.addService(service);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Service is excluded: [service] %s",
+ service.getServiceName()));
+ }
+ }
+ }
+ } else {
+ // Add all services
+ topology.addServices(event.getTopology().getServices());
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ for (Service service : topology.getServices()) {
+ List<Cluster> clustersToRemove = new ArrayList<Cluster>();
+ for (Cluster cluster : service.getClusters()) {
+ if (TopologyClusterFilter.getInstance()
+ .clusterIdExcluded(cluster.getClusterId())) {
+ clustersToRemove.add(cluster);
+ }
+ }
+ for (Cluster cluster : clustersToRemove) {
+ service.removeCluster(cluster);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Cluster is excluded: [cluster] %s",
+ cluster.getClusterId()));
+ }
+ }
+ }
+ }
+
+ // Apply member filter
+ if (TopologyMemberFilter.getInstance().isActive()) {
+ for (Service service : topology.getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ List<Member> membersToRemove = new ArrayList<Member>();
+ for (Member member : cluster.getMembers()) {
+ if (TopologyMemberFilter.getInstance()
+ .lbClusterIdExcluded(
+ member.getLbClusterId())) {
+ membersToRemove.add(member);
+ }
+ }
+ for (Member member : membersToRemove) {
+ cluster.removeMember(member);
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Member is excluded: [member] %s [lb-cluster-id] %s",
+ member.getMemberId(),
+ member.getLbClusterId()));
+ }
+ }
+ }
+ }
+ }
+
+ // add existing Applications to Topology
+ Collection<Application> applications = event.getTopology().getApplications();
+ if (applications != null && !applications.isEmpty()) {
+ for (Application application : applications) {
+ topology.addApplication(application);
+ if (log.isDebugEnabled()) {
+ log.debug("Application with id [ " + application.getId() + " ] added to Topology");
+ }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("No Application information found in Complete Topology event");
+ }
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Topology initialized");
+ }
+
+ // Set topology initialized
+ topology.setInitialized(true);
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
index 627d9a9..7200431 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
@@ -21,11 +21,9 @@ package org.apache.stratos.messaging.message.processor.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
/**
@@ -53,34 +51,16 @@ public class GroupActivatedProcessor extends MessageProcessor {
GroupActivatedEvent event = (GroupActivatedEvent) Util.
jsonToObject(message, GroupActivatedEvent.class);
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- }
- Group group = application.getGroup(event.getGroupId());
+ TopologyManager.acquireReadLockForApplications();
+ TopologyManager.acquireWriteLockForApplication(event.getAppId());
- if (group == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
- event.getGroupId()));
- }
- } else {
- // Apply changes to the topology
- group.setStatus(Status.Activated);
- if (log.isInfoEnabled()) {
- log.info(String.format("Group updated as activated : %s",
- group.toString()));
- }
- }
+ try {
+ return doProcess(event, topology);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ } finally {
+ TopologyManager.releaseWriteLockForApplication(event.getAppId());
+ TopologyManager.releaseReadLockForApplications();
+ }
} else {
if (nextProcessor != null) {
@@ -91,4 +71,36 @@ public class GroupActivatedProcessor extends MessageProcessor {
}
}
}
+
+ private boolean doProcess (GroupActivatedEvent event,Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroup(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ // Apply changes to the topology
+ group.setStatus(Status.Activated);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Group updated as activated : %s",
+ group.toString()));
+ }
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
index 8e4e1b1..2d3f8b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class InstanceSpawnedMessageProcessor extends MessageProcessor {
@@ -50,92 +51,103 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (InstanceSpawnedEvent event,Topology topology){
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
- }
- return false;
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
+ return false;
}
+ }
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
}
return false;
}
- if (cluster.memberExists(event.getMemberId())) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
- member.setStatus(MemberStatus.Created);
- member.setMemberPublicIp(event.getMemberPublicIp());
- member.setMemberIp(event.getMemberIp());
- member.setLbClusterId(event.getLbClusterId());
- member.setProperties(event.getProperties());
- cluster.addMember(member);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ if (cluster.memberExists(event.getMemberId())) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
+ member.setStatus(MemberStatus.Created);
+ member.setMemberPublicIp(event.getMemberPublicIp());
+ member.setMemberIp(event.getMemberIp());
+ member.setLbClusterId(event.getLbClusterId());
+ member.setProperties(event.getProperties());
+ cluster.addMember(member);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index a5d701d..ec1b5ec 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberActivatedMessageProcessor extends MessageProcessor {
@@ -54,111 +55,123 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (MemberActivatedEvent event,Topology topology) {
+
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
+ return false;
}
+ }
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
+ return false;
}
+ }
- // Validate event properties
- if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
- throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
+ // Validate event properties
+ if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+ throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+ throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
}
- if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
- throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
event.getServiceName(),
event.getClusterId(),
event.getMemberId()));
}
+ return false;
+ }
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
- }
- return false;
- }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
- }
- return false;
- }
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
return false;
}
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
- }
+ if (member.getStatus() == MemberStatus.Activated) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
- if (member.getStatus() == MemberStatus.Activated) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.addPorts(event.getPorts());
- member.setMemberIp(event.getMemberIp());
- member.setStatus(MemberStatus.Activated);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- }
+ // Apply changes to the topology
+ member.addPorts(event.getPorts());
+ member.setMemberIp(event.getMemberIp());
+ member.setStatus(MemberStatus.Activated);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
index b6dc489..b252a61 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberMaintenanceModeProcessor extends MessageProcessor {
@@ -51,98 +52,110 @@ public class MemberMaintenanceModeProcessor extends MessageProcessor {
MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) Util.
jsonToObject(message, MemberMaintenanceModeEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberMaintenanceModeEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.In_Maintenance) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already updated as In_Maintenance: " +
- "[service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.In_Maintenance);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.In_Maintenance) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already updated as In_Maintenance: " +
+ "[service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.In_Maintenance);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
index 92115aa..f0c3580 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
@@ -50,98 +51,111 @@ public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) Util.
jsonToObject(message, MemberReadyToShutdownEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberReadyToShutdownEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.ReadyToShutDown) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already updated as Ready to Shutdown: " +
- "[service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.ReadyToShutDown);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.ReadyToShutDown) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already updated as Ready to Shutdown: " +
+ "[service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.ReadyToShutDown);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index 4d93957..508ec39 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class MemberStartedMessageProcessor extends MessageProcessor {
@@ -54,97 +55,109 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
// Parse complete message and build event
MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (MemberStartedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
- event.getServiceName(), event.getClusterId()));
+ }
+
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
return false;
}
- Member member = cluster.getMember(event.getMemberId());
- if (member == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- return false;
+ }
+
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
}
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+ event.getServiceName(), event.getClusterId()));
+ }
+ return false;
+ }
+ Member member = cluster.getMember(event.getMemberId());
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
+ }
+ return false;
+ }
- // Apply member filter
- if(TopologyMemberFilter.getInstance().isActive()) {
- if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
- }
- return false;
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
}
+ return false;
}
+ }
- if (member.getStatus() == MemberStatus.Starting) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
- } else {
-
- // Apply changes to the topology
- member.setStatus(MemberStatus.Starting);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
- event.getServiceName(),
- event.getClusterId(),
- event.getMemberId()));
- }
+ if (member.getStatus() == MemberStatus.Starting) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
+ } else {
+ // Apply changes to the topology
+ member.setStatus(MemberStatus.Starting);
- // Notify event listeners
- notifyEventListeners(event);
- return true;
- } else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+ event.getServiceName(),
+ event.getClusterId(),
+ event.getMemberId()));
}
}
+
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
}
[4/4] git commit: initial changes for hierarchical topology locking
Posted by is...@apache.org.
initial changes for hierarchical topology locking
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4ace39c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4ace39c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4ace39c8
Branch: refs/heads/4.0.0-grouping
Commit: 4ace39c85ab1f989e888dcc48afd2c1092ff245f
Parents: 2536b30
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Tue Oct 7 10:21:48 2014 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Tue Oct 7 19:08:54 2014 +0530
----------------------------------------------------------------------
.../AutoscalerHealthStatEventReceiver.java | 18 +-
.../AutoscalerTopologyEventReceiver.java | 151 ++++---
.../controller/topology/TopologyBuilder.java | 7 +-
.../LoadBalancerTopologyEventReceiver.java | 77 +++-
.../StratosManagerTopologyEventReceiver.java | 78 +++-
.../messaging/domain/topology/Service.java | 9 +-
.../messaging/domain/topology/Topology.java | 11 +-
.../domain/topology/locking/TopologyLock.java | 49 +++
.../topology/locking/TopologyLockHierarchy.java | 147 +++++++
.../ApplicationActivatedMessageProcessor.java | 54 ++-
.../ApplicationCreatedMessageProcessor.java | 65 +--
.../ApplicationRemovedMessageProcessor.java | 73 ++--
.../topology/ClusterActivatedProcessor.java | 108 +++--
.../ClusterCreatedMessageProcessor.java | 131 +++---
.../ClusterMaintenanceModeMessageProcessor.java | 104 +++--
.../ClusterRemovedMessageProcessor.java | 107 +++--
.../CompleteTopologyMessageProcessor.java | 200 +++++----
.../topology/GroupActivatedProcessor.java | 70 +--
.../InstanceSpawnedMessageProcessor.java | 150 ++++---
.../MemberActivatedMessageProcessor.java | 183 ++++----
.../MemberMaintenanceModeProcessor.java | 159 +++----
.../MemberReadyToShutdownMessageProcessor.java | 160 +++----
.../topology/MemberStartedMessageProcessor.java | 157 +++----
.../MemberSuspendedMessageProcessor.java | 155 +++----
.../MemberTerminatedMessageProcessor.java | 139 +++---
.../ServiceCreatedMessageProcessor.java | 74 ++--
.../ServiceRemovedMessageProcessor.java | 71 ++--
.../topology/TopologyMessageProcessorChain.java | 10 +-
.../topology/TopologyEventMessageDelegator.java | 10 +-
.../receiver/topology/TopologyManager.java | 425 ++++++++++++++++++-
30 files changed, 2070 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index 26d3179..a1213f6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -593,8 +593,11 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
}
private Member findMember(String memberId) {
+
+ TopologyManager.acquireReadLockForServices();
+
try {
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
for(Service service : TopologyManager.getTopology().getServices()) {
for(Cluster cluster : service.getClusters()) {
if(cluster.memberExists(memberId)) {
@@ -605,7 +608,8 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
return null;
}
finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForServices();
}
}
@@ -613,8 +617,13 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
try {
AbstractClusterMonitor monitor = getMonitor(clusterId);
NetworkPartitionContext nwPartitionCtxt;
+
+ // TODO: the optimal way would be to add the service name to the member fault event and use it to acquire a
+ // hierarchical lock via TopologyManager.acquireReadLockForCluster(serviceName, clusterId)
+ TopologyManager.acquireReadLockForServices();
+
try{
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
Member member = findMember(memberId);
if(null == member){
@@ -634,7 +643,8 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
}
}finally{
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForServices();
}
// start a new member in the same Partition
String partitionId = monitor.getPartitionOfMember(memberId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 3e1bfe2..4064510 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -90,22 +90,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
- try {
+
+ if (!topologyInitialized) {
TopologyManager.acquireReadLock();
- if(!topologyInitialized) {
- topologyInitialized = true;
+
+ try {
for (Application application : TopologyManager.getTopology().getApplications()) {
startApplicationMonitor(application);
}
+
+ topologyInitialized = true;
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
}
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
}
}
-
-
});
@@ -118,12 +119,19 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
ApplicationCreatedEvent applicationCreatedEvent = (ApplicationCreatedEvent) event;
//acquire read lock
- TopologyManager.acquireReadLock();
- //start the application monitor
- //TODO catch exception by ApplicationMonitor
- startApplicationMonitor(applicationCreatedEvent.getApplication());
- //release read lock
- TopologyManager.releaseReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForApplication(applicationCreatedEvent.getApplication().getId());
+
+ try {
+ //start the application monitor
+ //TODO catch exception by ApplicationMonitor
+ startApplicationMonitor(applicationCreatedEvent.getApplication());
+
+ } finally {
+ //release read lock
+ TopologyManager.releaseReadLockForApplication(applicationCreatedEvent.getApplication().getId());
+ //TopologyManager.releaseReadLock();
+ }
}
});
@@ -197,7 +205,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
ApplicationRemovedEvent applicationRemovedEvent = (ApplicationRemovedEvent) event;
//acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForApplication(applicationRemovedEvent.getApplicationId());
try {
//TODO remove monitors as well as any starting or pending threads
@@ -222,7 +231,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
} finally {
//release read lock
- TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForApplication(applicationRemovedEvent.getApplicationId());
+ //TopologyManager.releaseReadLock();
}
}
@@ -283,26 +293,34 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() {
@Override
protected void onEvent(Event event) {
+
+ ClusterMaintenanceModeEvent clusterMaitenanceEvent = null;
+
try {
log.info("Event received: " + event);
- ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event;
- TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(e.getServiceName());
- Cluster cluster = service.getCluster(e.getClusterId());
+ clusterMaitenanceEvent = (ClusterMaintenanceModeEvent) event;
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireWriteLockForCluster(clusterMaitenanceEvent.getServiceName(),
+ clusterMaitenanceEvent.getClusterId());
+
+ Service service = TopologyManager.getTopology().getService(clusterMaitenanceEvent.getServiceName());
+ Cluster cluster = service.getCluster(clusterMaitenanceEvent.getClusterId());
if (AutoscalerContext.getInstance().monitorExist((cluster.getClusterId()))) {
- AutoscalerContext.getInstance().getMonitor(e.getClusterId()).
- setStatus(e.getStatus());
+ AutoscalerContext.getInstance().getMonitor(clusterMaitenanceEvent.getClusterId()).
+ setStatus(clusterMaitenanceEvent.getStatus());
} else if (AutoscalerContext.getInstance().
lbMonitorExist((cluster.getClusterId()))) {
- AutoscalerContext.getInstance().getLBMonitor(e.getClusterId()).
- setStatus(e.getStatus());
+ AutoscalerContext.getInstance().getLBMonitor(clusterMaitenanceEvent.getClusterId()).
+ setStatus(clusterMaitenanceEvent.getStatus());
} else {
log.error("cluster monitor not exists for the cluster: " + cluster.toString());
}
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseWriteLockForCluster(clusterMaitenanceEvent.getServiceName(),
+ clusterMaitenanceEvent.getClusterId());
}
}
@@ -312,16 +330,20 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ ClusterRemovedEvent clusterRemovedEvent = null;
try {
- ClusterRemovedEvent e = (ClusterRemovedEvent) event;
- TopologyManager.acquireReadLock();
+ clusterRemovedEvent = (ClusterRemovedEvent) event;
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(),
+ clusterRemovedEvent.getClusterId());
- String clusterId = e.getClusterId();
- String deploymentPolicy = e.getDeploymentPolicy();
+ String clusterId = clusterRemovedEvent.getClusterId();
+ String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
AbstractClusterMonitor monitor;
- if (e.isLbCluster()) {
+ if (clusterRemovedEvent.isLbCluster()) {
DeploymentPolicy depPolicy = PolicyManager.getInstance().
getDeploymentPolicy(deploymentPolicy);
if (depPolicy != null) {
@@ -362,7 +384,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(),
+ clusterRemovedEvent.getClusterId());
}
}
@@ -380,14 +404,19 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
@Override
protected void onEvent(Event event) {
+ MemberTerminatedEvent memberTerminatedEvent = null;
try {
- TopologyManager.acquireReadLock();
- MemberTerminatedEvent e = (MemberTerminatedEvent) event;
- String networkPartitionId = e.getNetworkPartitionId();
- String clusterId = e.getClusterId();
- String partitionId = e.getPartitionId();
+ //TopologyManager.acquireReadLock();
+
+ memberTerminatedEvent = (MemberTerminatedEvent) event;
+ String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+ String clusterId = memberTerminatedEvent.getClusterId();
+ String partitionId = memberTerminatedEvent.getPartitionId();
AbstractClusterMonitor monitor;
+ TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
+
if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
} else {
@@ -400,7 +429,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
PartitionContext partitionContext = networkPartitionContext.
getPartitionCtxt(partitionId);
- String memberId = e.getMemberId();
+ String memberId = memberTerminatedEvent.getMemberId();
partitionContext.removeMemberStatsContext(memberId);
if (partitionContext.removeTerminationPendingMember(memberId)) {
@@ -431,7 +460,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
}
}
@@ -441,14 +472,18 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
@Override
protected void onEvent(Event event) {
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
+
try {
- TopologyManager.acquireReadLock();
- MemberActivatedEvent e = (MemberActivatedEvent) event;
- String networkPartitionId = e.getNetworkPartitionId();
- String clusterId = e.getClusterId();
- String partitionId = e.getPartitionId();
- String memberId = e.getMemberId();
+ String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+ String clusterId = memberActivatedEvent.getClusterId();
+ String partitionId = memberActivatedEvent.getPartitionId();
+ String memberId = memberActivatedEvent.getMemberId();
AbstractClusterMonitor monitor;
@@ -476,12 +511,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
// partitionContext.incrementCurrentActiveMemberCount(1);
partitionContext.movePendingMemberToActiveMembers(memberId);
//triggering the status checker
- StatusChecker.getInstance().onMemberStatusChange(e.getClusterId());
+ StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId());
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
}
}
});
@@ -490,16 +527,20 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
@Override
protected void onEvent(Event event) {
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+ memberMaintenanceModeEvent.getClusterId());
+
try {
- TopologyManager.acquireReadLock();
- MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
- String memberId = e.getMemberId();
- String partitionId = e.getPartitionId();
- String networkPartitionId = e.getNetworkPartitionId();
+ String memberId = memberMaintenanceModeEvent.getMemberId();
+ String partitionId = memberMaintenanceModeEvent.getPartitionId();
+ String networkPartitionId = memberMaintenanceModeEvent.getNetworkPartitionId();
PartitionContext partitionContext;
- String clusterId = e.getClusterId();
+ String clusterId = memberMaintenanceModeEvent.getClusterId();
AbstractClusterMonitor monitor;
if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
@@ -521,7 +562,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+ memberMaintenanceModeEvent.getClusterId());
}
}
});
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 6d50f6c..30b1b00 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -608,7 +608,7 @@ public class TopologyBuilder {
}
}
- public static void handleApplicationDeployed(Application application,
+ public static synchronized void handleApplicationDeployed(Application application,
Set<ApplicationClusterContext> applicationClusterContexts,
Set<MetaDataHolder> metaDataHolders) {
@@ -644,6 +644,7 @@ public class TopologyBuilder {
// add to Topology and update
topology.addApplication(application);
TopologyManager.updateTopology(topology);
+
log.info("Application with id [ " + application.getId() + " ] added to Topology successfully");
TopologyEventPublisher.sendApplicationCreatedEvent(application ,clusters);
@@ -653,7 +654,8 @@ public class TopologyBuilder {
}
}
- public static void handleApplicationUndeployed(FasterLookUpDataHolder dataHolder, String applicationId, int tenantId, String tenantDomain) {
+ public static synchronized void handleApplicationUndeployed(FasterLookUpDataHolder dataHolder,
+ String applicationId, int tenantId, String tenantDomain) {
Topology topology = TopologyManager.getTopology();
@@ -672,6 +674,7 @@ public class TopologyBuilder {
if (service != null) {
// remove Cluster
service.removeCluster(clusterDataHolder.getClusterId());
+
if (log.isDebugEnabled()) {
log.debug("Removed cluster with id " + clusterDataHolder.getClusterId());
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
index a5b5e42..4222075 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
@@ -118,10 +118,15 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireWriteLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
+
try {
- TopologyManager.acquireReadLock();
- MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName());
if (service == null) {
if (log.isWarnEnabled()) {
@@ -167,16 +172,24 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseWriteLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
}
}
});
topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
+
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+
+ TopologyManager.acquireWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+ memberMaintenanceModeEvent.getClusterId());
+
try {
- TopologyManager.acquireReadLock();
- MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+ //TopologyManager.acquireReadLock();
+
Member member = findMember(memberMaintenanceModeEvent.getServiceName(),
memberMaintenanceModeEvent.getClusterId(), memberMaintenanceModeEvent.getMemberId());
@@ -186,16 +199,22 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+ memberMaintenanceModeEvent.getClusterId());
}
}
});
topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
+ TopologyManager.acquireWriteLockForCluster(memberSuspendedEvent.getServiceName(),
+ memberSuspendedEvent.getClusterId());
+
try {
- TopologyManager.acquireReadLock();
- MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
+ //TopologyManager.acquireReadLock();
Member member = findMember(memberSuspendedEvent.getServiceName(),
memberSuspendedEvent.getClusterId(), memberSuspendedEvent.getMemberId());
@@ -205,16 +224,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseWriteLockForCluster(memberSuspendedEvent.getServiceName(),
+ memberSuspendedEvent.getClusterId());
}
}
});
topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ //TopologyManager.acquireReadLock();
+ MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+
+ TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
+
try {
- TopologyManager.acquireReadLock();
- MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
Member member = findMember(memberTerminatedEvent.getServiceName(),
memberTerminatedEvent.getClusterId(), memberTerminatedEvent.getMemberId());
@@ -224,18 +250,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
}
}
});
topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
- // Remove cluster from context
- ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+ // Remove cluster from context
+ ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+ TopologyManager.acquireWriteLockForCluster(clusterRemovedEvent.getServiceName(),
+ clusterRemovedEvent.getClusterId());
+
+ try {
+ //TopologyManager.acquireReadLock();
Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
if (cluster != null) {
for (Member member : cluster.getMembers()) {
@@ -251,18 +282,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseWriteLockForCluster(clusterRemovedEvent.getServiceName(),
+ clusterRemovedEvent.getClusterId());
}
}
});
topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
+ TopologyManager.acquireWriteLockForServices();
+
try {
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
// Remove all clusters of given service from context
- ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
if (service != null) {
for (Cluster cluster : service.getClusters()) {
@@ -280,7 +316,8 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
} catch (Exception e) {
log.error("Error processing event", e);
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseWriteLockForServices();
}
}
});
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
index c3a5719..d61f474 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
@@ -23,8 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.manager.exception.ADCException;
import org.apache.stratos.manager.exception.ApplicationSubscriptionException;
-import org.apache.stratos.manager.exception.CompositeApplicationDefinitionException;
-import org.apache.stratos.manager.exception.CompositeApplicationException;
import org.apache.stratos.manager.manager.CartridgeSubscriptionManager;
import org.apache.stratos.manager.subscription.ApplicationSubscription;
import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel;
@@ -95,7 +93,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
String serviceType = clustercreatedEvent.getServiceName();
//acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(clustercreatedEvent.getServiceName(),
+ clustercreatedEvent.getClusterId());
try {
Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId());
@@ -103,7 +103,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
} finally {
//release read lock
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(clustercreatedEvent.getServiceName(),
+ clustercreatedEvent.getClusterId());
}
}
@@ -137,14 +139,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
String serviceType = instanceSpawnedEvent.getServiceName();
//acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(instanceSpawnedEvent.getServiceName(),
+ instanceSpawnedEvent.getClusterId());
try {
Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
TopologyClusterInformationModel.getInstance().addCluster(cluster);
} finally {
//release read lock
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(instanceSpawnedEvent.getServiceName(),
+ instanceSpawnedEvent.getClusterId());
}
}
});
@@ -162,14 +168,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
String serviceType = memberStartedEvent.getServiceName();
//acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberStartedEvent.getServiceName(),
+ memberStartedEvent.getClusterId());
try {
Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
TopologyClusterInformationModel.getInstance().addCluster(cluster);
} finally {
//release read lock
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberStartedEvent.getServiceName(),
+ memberStartedEvent.getClusterId());
}
}
@@ -188,14 +198,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
String serviceType = memberActivatedEvent.getServiceName();
//acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
try {
Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
TopologyClusterInformationModel.getInstance().addCluster(cluster);
} finally {
//release read lock
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
}
}
});
@@ -213,7 +227,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
String serviceType = memberSuspendedEvent.getServiceName();
//acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberSuspendedEvent.getServiceName(),
+ memberSuspendedEvent.getClusterId());
try {
Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
@@ -221,7 +237,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
} finally {
//release read lock
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberSuspendedEvent.getServiceName(),
+ memberSuspendedEvent.getClusterId());
}
}
});
@@ -239,7 +257,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
String serviceType = memberTerminatedEvent.getServiceName();
//acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
try {
Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
@@ -247,8 +267,12 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
// check and remove terminated member
if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
// release the read lock and acquire the write lock
- TopologyManager.releaseReadLock();
- TopologyManager.acquireWriteLock();
+// TopologyManager.releaseReadLock();
+// TopologyManager.acquireWriteLock();
+ TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
+ TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
try {
// re-check the state; another thread might have acquired the write lock and modified
@@ -263,17 +287,23 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
// downgrade to read lock - 1. acquire read lock, 2. release write lock
// acquire read lock
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
} finally {
// release the write lock
- TopologyManager.releaseWriteLock();
+ // TopologyManager.releaseWriteLock();
+ TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
}
}
TopologyClusterInformationModel.getInstance().addCluster(cluster);
} finally {
//release read lock
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
}
}
});
@@ -288,7 +318,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
log.info("[ApplicationCreatedEventListener] Received: " + event.getClass());
try {
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForApplication(appCreateEvent.getApplication().getId());
// create and persist Application subscritpion
CartridgeSubscriptionManager cartridgeSubscriptionManager = new CartridgeSubscriptionManager();
@@ -318,7 +349,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
PrivilegedCarbonContext.endTenantFlow();
}
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForApplication(appCreateEvent.getApplication().getId());
}
}
});
@@ -333,7 +365,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
log.info("[ApplicationRemovedEventListener] Received: " + event.getClass());
try {
- TopologyManager.acquireReadLock();
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForApplication(appRemovedEvent.getApplicationId());
// create and persist Application subscritpion
CartridgeSubscriptionManager cartridgeSubscriptionManager = new CartridgeSubscriptionManager();
@@ -360,7 +393,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
PrivilegedCarbonContext.endTenantFlow();
}
} finally {
- TopologyManager.releaseReadLock();
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForApplication(appRemovedEvent.getApplicationId());
}
}
});
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
index 46d46d4..d03cfda 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
@@ -19,6 +19,9 @@
package org.apache.stratos.messaging.domain.topology;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+
import java.io.Serializable;
import java.util.*;
@@ -59,14 +62,18 @@ public class Service implements Serializable{
+ /**
+ * Stores the cluster in this service's cluster map under its cluster id and registers a
+ * newly created TopologyLock for that id with the TopologyLockHierarchy.
+ * NOTE(review): a fresh lock object is passed on every call; whether an already registered
+ * lock for the same cluster id gets replaced depends on addClusterLock -- confirm.
+ *
+ * @param cluster the cluster to add
+ */
public void addCluster(Cluster cluster) {
this.clusterIdClusterMap.put(cluster.getClusterId(), cluster);
+ TopologyLockHierarchy.getInstance().addClusterLock(cluster.getClusterId(), new TopologyLock());
}
+ /**
+ * Removes the entry for the given cluster's id from this service's cluster map and then asks
+ * the TopologyLockHierarchy to remove the lock registered for that cluster id.
+ *
+ * @param cluster the cluster to remove; only its cluster id is used
+ */
public void removeCluster(Cluster cluster) {
this.clusterIdClusterMap.remove(cluster.getClusterId());
+ TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(cluster.getClusterId());
}
+ /**
+ * Removes the entry for the given cluster id from this service's cluster map, asks the
+ * TopologyLockHierarchy to remove the lock registered for that id, and returns whatever the
+ * map removal returned.
+ *
+ * @param clusterId id of the cluster to remove
+ * @return the value returned by the map's remove call (presumably null when no such cluster
+ * was mapped -- confirm against the map type)
+ */
public Cluster removeCluster(String clusterId) {
- return this.clusterIdClusterMap.remove(clusterId);
+ Cluster removedCluster = this.clusterIdClusterMap.remove(clusterId);
+ TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(clusterId);
+ return removedCluster;
}
public boolean clusterExists(String clusterId) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
index f8b535f..dabf611 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
@@ -26,8 +26,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.util.CompositeApplicationBuilder;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
/**
* Defines a topology of serviceMap in Stratos.
@@ -54,6 +54,7 @@ public class Topology implements Serializable {
+ /**
+ * Stores the application in the application map under its id and registers a newly created
+ * TopologyLock for that id with the TopologyLockHierarchy.
+ * NOTE(review): a fresh lock object is passed on every call; whether an already registered
+ * lock for the same application id gets replaced depends on addApplicationLock -- confirm.
+ *
+ * @param application the application to add
+ */
public void addApplication (Application application) {
this.applicationMap.put(application.getId(), application);
+ TopologyLockHierarchy.getInstance().addApplicationLock(application.getId(), new TopologyLock());
}
public Application getApplication (String applicationId) {
@@ -62,6 +63,7 @@ public class Topology implements Serializable {
+ /**
+ * Removes the entry for the given application id from the application map and then asks the
+ * TopologyLockHierarchy to remove the lock registered for that id.
+ *
+ * @param applicationId id of the application to remove
+ */
public void removeApplication (String applicationId) {
applicationMap.remove(applicationId);
+ TopologyLockHierarchy.getInstance().removeTopologyLockForApplication(applicationId);
}
public Collection<Application> getApplications () {
@@ -78,9 +80,10 @@ public class Topology implements Serializable {
+ /**
+ * Stores the service in the service map under its service name and registers a newly created
+ * TopologyLock for that name with the TopologyLockHierarchy.
+ * NOTE(review): a fresh lock object is passed on every call; whether an already registered
+ * lock for the same service name gets replaced depends on addServiceLock -- confirm.
+ *
+ * @param service the service to add
+ */
public void addService(Service service) {
this.serviceMap.put(service.getServiceName(), service);
+ TopologyLockHierarchy.getInstance().addServiceLock(service.getServiceName(), new TopologyLock());
}
- public void addServices(Collection<Service> services) {
+ public synchronized void addServices(Collection<Service> services) {
for (Service service : services) {
addService(service);
}
@@ -88,10 +91,12 @@ public class Topology implements Serializable {
+ /**
+ * Removes the entry for the given service's name from the service map and then asks the
+ * TopologyLockHierarchy to remove the lock registered for that service name.
+ *
+ * @param service the service to remove; only its service name is used
+ */
public void removeService(Service service) {
this.serviceMap.remove(service.getServiceName());
+ TopologyLockHierarchy.getInstance().removeTopologyLockForService(service.getServiceName());
}
+ /**
+ * Removes the entry for the given service name from the service map and then asks the
+ * TopologyLockHierarchy to remove the lock registered for that name.
+ *
+ * @param serviceName name of the service to remove
+ */
public void removeService(String serviceName) {
this.serviceMap.remove(serviceName);
+ TopologyLockHierarchy.getInstance().removeTopologyLockForService(serviceName);
}
public Service getService(String serviceName) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
new file mode 100644
index 0000000..e1b90ad
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.topology.locking;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Thin wrapper around a {@link ReentrantReadWriteLock} created with the fair ordering policy.
+ *
+ * <p>Both release methods are no-ops when the calling thread does not hold the corresponding
+ * lock, so callers may release unconditionally from a {@code finally} block even if the
+ * matching acquire was never reached.
+ */
+public class TopologyLock {
+
+    private final ReentrantReadWriteLock lock;
+
+    /** Creates the underlying read/write lock with fairness enabled. */
+    public TopologyLock () {
+        lock = new ReentrantReadWriteLock(true);
+    }
+
+    /** Acquires the write lock, blocking until it is available. */
+    public void acquireWriteLock() {
+        lock.writeLock().lock();
+    }
+
+    /**
+     * Releases one hold of the write lock if the calling thread currently holds it;
+     * does nothing otherwise.
+     */
+    public void releaseWritelock() {
+        if (lock.isWriteLockedByCurrentThread()) {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** Acquires the read lock, blocking until it is available. */
+    public void acquireReadLock() {
+        lock.readLock().lock();
+    }
+
+    /**
+     * Releases one hold of the read lock if the calling thread currently holds at least one;
+     * does nothing otherwise.
+     */
+    public void releaseReadLock() {
+        // Mirror the write-side guard: unlocking a read lock the thread does not hold
+        // would throw IllegalMonitorStateException
+        if (lock.getReadHoldCount() > 0) {
+            lock.readLock().unlock();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
new file mode 100644
index 0000000..aa0f7a1
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.topology.locking;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Singleton registry of the {@link TopologyLock} instances used to guard the
+ * topology: one lock for the set of Services, one for the set of Applications,
+ * and one lock per individual Service, Application and Cluster, looked up by
+ * name/id.
+ */
+public class TopologyLockHierarchy {
+
+    private static final Log log = LogFactory.getLog(TopologyLockHierarchy.class);
+
+    // lock for Services
+    private final TopologyLock serviceLock;
+
+    // lock for Applications
+    private final TopologyLock applicationLock;
+
+    // key = Service.name
+    private final ConcurrentHashMap<String, TopologyLock> serviceNameToTopologyLockMap;
+
+    // key = Application.id
+    private final ConcurrentHashMap<String, TopologyLock> applicationIdToTopologyLockMap;
+
+    // key = Cluster.id
+    private final ConcurrentHashMap<String, TopologyLock> clusterIdToTopologyLockMap;
+
+    private static volatile TopologyLockHierarchy topologyLockHierarchy;
+
+    private TopologyLockHierarchy () {
+
+        this.serviceLock = new TopologyLock();
+        this.applicationLock = new TopologyLock();
+        this.serviceNameToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+        this.applicationIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+        this.clusterIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+    }
+
+    /**
+     * Returns the single shared instance, creating it on first use.
+     *
+     * @return the shared TopologyLockHierarchy
+     */
+    public static TopologyLockHierarchy getInstance () {
+
+        if (topologyLockHierarchy == null) {
+            synchronized (TopologyLockHierarchy.class) {
+                if (topologyLockHierarchy == null) {
+                    topologyLockHierarchy = new TopologyLockHierarchy();
+                }
+            }
+        }
+
+        return topologyLockHierarchy;
+    }
+
+    /**
+     * Registers a lock for the given Application unless one is already registered.
+     *
+     * @param appId        Application id used as the key
+     * @param topologyLock lock to register
+     * @throws NullPointerException if either argument is null
+     */
+    public void addApplicationLock (String appId, final TopologyLock topologyLock) {
+        addLock(applicationIdToTopologyLockMap, "Application", appId, topologyLock);
+    }
+
+    /**
+     * @param appId Application id
+     * @return the lock registered for the Application, or null if there is none
+     */
+    public TopologyLock getTopologyLockForApplication (String appId) {
+        return applicationIdToTopologyLockMap.get(appId);
+    }
+
+    /**
+     * Registers a lock for the given Service unless one is already registered.
+     *
+     * @param serviceName  Service name used as the key
+     * @param topologyLock lock to register
+     * @throws NullPointerException if either argument is null
+     */
+    public void addServiceLock (String serviceName, final TopologyLock topologyLock) {
+        addLock(serviceNameToTopologyLockMap, "Service", serviceName, topologyLock);
+    }
+
+    /**
+     * @param serviceName Service name
+     * @return the lock registered for the Service, or null if there is none
+     */
+    public TopologyLock getTopologyLockForService (String serviceName) {
+        return serviceNameToTopologyLockMap.get(serviceName);
+    }
+
+    /**
+     * Registers a lock for the given Cluster unless one is already registered.
+     *
+     * @param clusterId    Cluster id used as the key
+     * @param topologyLock lock to register
+     * @throws NullPointerException if either argument is null
+     */
+    public void addClusterLock (String clusterId, final TopologyLock topologyLock) {
+        addLock(clusterIdToTopologyLockMap, "Cluster", clusterId, topologyLock);
+    }
+
+    /**
+     * @param clusterId Cluster id
+     * @return the lock registered for the Cluster, or null if there is none
+     */
+    public TopologyLock getTopologyLockForCluster (String clusterId) {
+        return clusterIdToTopologyLockMap.get(clusterId);
+    }
+
+    /**
+     * Removes the lock registered for the given Application, if any.
+     *
+     * @param appId Application id
+     */
+    public void removeTopologyLockForApplication (String appId) {
+        removeLock(applicationIdToTopologyLockMap, "Application", appId);
+    }
+
+    /**
+     * Removes the lock registered for the given Service, if any.
+     *
+     * @param serviceName Service name
+     */
+    public void removeTopologyLockForService (String serviceName) {
+        removeLock(serviceNameToTopologyLockMap, "Service", serviceName);
+    }
+
+    /**
+     * Removes the lock registered for the given Cluster, if any.
+     *
+     * @param clusterId Cluster id
+     */
+    public void removeTopologyLockForCluster (String clusterId) {
+        removeLock(clusterIdToTopologyLockMap, "Cluster", clusterId);
+    }
+
+    /**
+     * @return the lock guarding the set of Services
+     */
+    public TopologyLock getServiceLock() {
+        return serviceLock;
+    }
+
+    /**
+     * The method name is misspelled ("Applicatio"); it is kept as-is because
+     * it is part of the public interface.
+     *
+     * @return the lock guarding the set of Applications
+     */
+    public TopologyLock getApplicatioLock() {
+        return applicationLock;
+    }
+
+    // Atomically registers the lock under id; the first registration wins and
+    // every later attempt for the same id is reported with a warning.
+    private static void addLock (ConcurrentHashMap<String, TopologyLock> lockMap, String type,
+                                 String id, TopologyLock topologyLock) {
+
+        if (lockMap.putIfAbsent(id, topologyLock) == null) {
+            log.info("Added lock for " + type + " " + id);
+        } else {
+            log.warn("Topology Lock for " + type + " " + id + " already exists");
+        }
+    }
+
+    // Removes the lock registered under id and reports whether one was actually present.
+    private static void removeLock (ConcurrentHashMap<String, TopologyLock> lockMap, String type,
+                                    String id) {
+
+        if (lockMap.remove(id) != null) {
+            log.info("Removed lock for " + type + " " + id);
+        } else {
+            log.warn("Topology Lock for " + type + " " + id + " does not exist, nothing removed");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
index 8c1e66b..803a871 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Status;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
/**
@@ -56,26 +57,16 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
jsonToObject(message, ApplicationActivatedEvent.class);
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
- if (application == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Application does not exist: [service] %s",
- event.getAppId()));
- }
- return false;
- } else {
- // Apply changes to the topology
- application.setStatus(Status.Activated);
- if (log.isInfoEnabled()) {
- log.info(String.format("Application updated as activated : %s",
- application.toString()));
- }
- }
+ TopologyManager.acquireReadLockForApplications();
+ TopologyManager.acquireWriteLockForApplication(event.getAppId());
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForApplication(event.getAppId());
+ TopologyManager.releaseReadLockForApplications();
+ }
} else {
if (nextProcessor != null) {
@@ -86,4 +77,29 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
}
}
}
+
+ /**
+  * Marks the application referenced by the event as activated in the given
+  * topology and notifies the event listeners. The caller in this class
+  * invokes it while holding the read lock for Applications and the write
+  * lock for the specific application.
+  *
+  * @param event    the parsed application activated event
+  * @param topology the topology to update
+  * @return true if the application was found and updated, false if no
+  *         application with the event's id exists in the topology
+  */
+ private boolean doProcess (ApplicationActivatedEvent event, Topology topology) {
+
+     // Validate event against the existing topology
+     Application application = topology.getApplication(event.getAppId());
+     if (application == null) {
+         if (log.isWarnEnabled()) {
+             // the value logged is an application id, so label it accordingly
+             log.warn(String.format("Application does not exist: [application] %s",
+                     event.getAppId()));
+         }
+         return false;
+     }
+
+     // Apply changes to the topology
+     application.setStatus(Status.Activated);
+     if (log.isInfoEnabled()) {
+         log.info(String.format("Application updated as activated : %s",
+                 application.toString()));
+     }
+
+     // Notify event listeners
+     notifyEventListeners(event);
+     return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
index f02e4be..4368bd7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ApplicationCreatedMessageProcessor extends MessageProcessor {
@@ -47,40 +48,21 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
return false;
}
- ApplicationCreatedEvent appCreatedEvent = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
- if (appCreatedEvent == null) {
+ ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+ if (event == null) {
log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
return false;
}
- // check if required properties are available
- if (appCreatedEvent.getApplication() == null) {
- String errorMsg = "Application object of application created event is invalid";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- if (appCreatedEvent.getApplication().getId() == null || appCreatedEvent.getApplication().getId().isEmpty()) {
- String errorMsg = "App id of application created event is invalid: [ " + appCreatedEvent.getApplication().getId() + " ]";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
+ TopologyManager.acquireWriteLockForApplications();
- // check if an Application with same name exists in topology
- if (topology.applicationExists(appCreatedEvent.getApplication().getId())) {
- log.warn("Application with id [ " + appCreatedEvent.getApplication().getId() + " ] already exists in Topology");
+ try {
+ return doProcess(event, topology);
- } else {
- // add application and the clusters to Topology
- for(Cluster cluster: appCreatedEvent.getClusterList()) {
- topology.getService(cluster.getServiceName()).addCluster(cluster);
- }
- topology.addApplication(appCreatedEvent.getApplication());
+ } finally {
+ TopologyManager.releaseWriteLockForApplications();
}
- notifyEventListeners(appCreatedEvent);
- return true;
-
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
@@ -90,4 +72,35 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
}
}
}
+
+ /**
+  * Adds the application carried by the event, together with its clusters,
+  * to the given topology and notifies the event listeners. The caller in
+  * this class invokes it while holding the write lock for Applications.
+  * If an application with the same id already exists the topology is left
+  * unchanged, but listeners are still notified.
+  *
+  * @param event    the parsed application created event
+  * @param topology the topology to update
+  * @return always true when no exception is thrown
+  * @throws RuntimeException if the event has no application, the
+  *         application id is null or empty, or a cluster refers to a
+  *         service that does not exist in the topology
+  */
+ private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
+
+     // check if required properties are available
+     if (event.getApplication() == null) {
+         String errorMsg = "Application object of application created event is invalid";
+         log.error(errorMsg);
+         throw new RuntimeException(errorMsg);
+     }
+
+     if (event.getApplication().getId() == null || event.getApplication().getId().isEmpty()) {
+         String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getId() + " ]";
+         log.error(errorMsg);
+         throw new RuntimeException(errorMsg);
+     }
+
+     // check if an Application with same name exists in topology
+     if (topology.applicationExists(event.getApplication().getId())) {
+         log.warn("Application with id [ " + event.getApplication().getId() + " ] already exists in Topology");
+
+     } else {
+         // Make sure every referenced service exists before changing anything,
+         // so a missing service cannot leave the topology half updated.
+         for (Cluster cluster : event.getClusterList()) {
+             if (topology.getService(cluster.getServiceName()) == null) {
+                 String errorMsg = "Service [ " + cluster.getServiceName() + " ] referred by application [ "
+                         + event.getApplication().getId() + " ] does not exist in Topology";
+                 log.error(errorMsg);
+                 throw new RuntimeException(errorMsg);
+             }
+         }
+
+         // add application and the clusters to Topology
+         for (Cluster cluster : event.getClusterList()) {
+             topology.getService(cluster.getServiceName()).addCluster(cluster);
+         }
+         topology.addApplication(event.getApplication());
+     }
+
+     notifyEventListeners(event);
+     return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
index c9dbb07..ed6c2d4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
@@ -22,9 +22,9 @@ package org.apache.stratos.messaging.message.processor.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
import org.apache.stratos.messaging.event.topology.ApplicationRemovedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
public class ApplicationRemovedMessageProcessor extends MessageProcessor {
@@ -55,38 +55,20 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
return false;
}
- ApplicationRemovedEvent appRemovedEvent = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class);
- if (appRemovedEvent == null) {
+ ApplicationRemovedEvent event = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class);
+ if (event == null) {
log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
return false;
}
-
- // check if required properties are available
- if (appRemovedEvent.getApplicationId() == null) {
- String errorMsg = "Application Id of application removed event is invalid";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- if (appRemovedEvent.getTenantDomain()== null) {
- String errorMsg = "Application tenant domain of application removed event is invalid";
- log.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- // check if an Application with same name exists in topology
- String appId = appRemovedEvent.getApplicationId();
- if (topology.applicationExists(appId)) {
- log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
- topology.removeApplication(appId);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("ApplicationRemovedMessageProcessor notifying listener " + object);
- }
-
- notifyEventListeners(appRemovedEvent);
- return true;
+
+ TopologyManager.acquireWriteLockForApplications();
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForApplications();
+ }
} else {
if (nextProcessor != null) {
@@ -96,6 +78,35 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
-
+ }
+
+ private boolean doProcess (ApplicationRemovedEvent event, Topology topology) {
+
+ // check if required properties are available
+ if (event.getApplicationId() == null) {
+ String errorMsg = "Application Id of application removed event is invalid";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ if (event.getTenantDomain()== null) {
+ String errorMsg = "Application tenant domain of application removed event is invalid";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ // check if an Application with same name exists in topology
+ String appId = event.getApplicationId();
+ if (topology.applicationExists(appId)) {
+ log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
+ topology.removeApplication(appId);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("ApplicationRemovedMessageProcessor notifying listener ");
+ }
+
+ notifyEventListeners(event);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
index 52de45b..78f772b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
@@ -25,10 +25,10 @@ import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Status;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Util;
/**
@@ -45,75 +45,91 @@ public class ClusterActivatedProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
+
Topology topology = (Topology) object;
if (ClusterActivatedEvent.class.getName().equals(type)) {
// Return if topology has not been initialized
- if (!topology.isInitialized())
+ if (!topology.isInitialized()) {
return false;
+ }
// Parse complete message and build event
ClusterActivatedEvent event = (ClusterActivatedEvent) Util.
jsonToObject(message, ClusterActivatedEvent.class);
- // Apply service filter
- if (TopologyServiceFilter.getInstance().isActive()) {
- if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
- // Service is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
- }
- return false;
- }
+ TopologyManager.acquireReadLockForServices();
+ TopologyManager.acquireWriteLockForService(event.getServiceName());
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyManager.releaseWriteLockForService(event.getServiceName());
+ TopologyManager.releaseReadLockForServices();
}
- // Apply cluster filter
- if (TopologyClusterFilter.getInstance().isActive()) {
- if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
- // Cluster is excluded, do not update topology or fire event
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
- }
- return false;
- }
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
+ }
+ }
+
+ private boolean doProcess (ClusterActivatedEvent event,Topology topology) {
- // Validate event against the existing topology
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceName()));
+ // Apply service filter
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+ // Service is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
}
return false;
}
- Cluster cluster = service.getCluster(event.getClusterId());
+ }
- if (cluster == null) {
- if (log.isWarnEnabled()) {
- log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
- event.getClusterId()));
- }
- } else {
- // Apply changes to the topology
- cluster.setStatus(Status.Activated);
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster updated as activated : %s",
- cluster.toString()));
+ // Apply cluster filter
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+ // Cluster is excluded, do not update topology or fire event
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
}
+ return false;
}
+ }
- // Notify event listeners
- notifyEventListeners(event);
- return true;
+ // Validate event against the existing topology
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service does not exist: [service] %s",
+ event.getServiceName()));
+ }
+ return false;
+ }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
+ event.getClusterId()));
+ }
} else {
- if (nextProcessor != null) {
- // ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
- } else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ // Apply changes to the topology
+ cluster.setStatus(Status.Activated);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster updated as activated : %s",
+ cluster.toString()));
}
}
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
}
+
}