You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/10/31 11:37:26 UTC
[3/4] git commit: moving applications stuff to autoscaler - II
moving applications stuff to autoscaler - II
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c42d7c1d
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c42d7c1d
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c42d7c1d
Branch: refs/heads/4.0.0-grouping
Commit: c42d7c1dc232256e4bbb7a10bda7e8f110ddb795
Parents: d4f90be
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Fri Oct 31 15:55:27 2014 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Fri Oct 31 16:07:05 2014 +0530
----------------------------------------------------------------------
.../topic/ApplicationsEventPublisher.java | 56 ++++-----
.../AutoscalerTopologyEventReceiver.java | 7 +-
.../monitor/ApplicationMonitorFactory.java | 13 +-
.../status/checker/StatusChecker.java | 21 ++--
.../topology/TopologyEventPublisher.java | 2 +-
.../domain/applications/Applications.java | 2 +-
.../locking/ApplicationLockHierarchy.java | 4 +-
.../topology/locking/TopologyLockHierarchy.java | 42 -------
.../ApplicationActivatedMessageProcessor.java | 24 ++--
.../ApplicationCreatedMessageProcessor.java | 18 ++-
.../ApplicationInactivatedMessageProcessor.java | 24 ++--
.../ApplicationTerminatedMessageProcessor.java | 43 +++----
.../ApplicationTerminatingMessageProcessor.java | 24 ++--
.../ApplicationUndeployedMessageProcessor.java | 61 +++++-----
.../CompleteApplicationsMessageProcessor.java | 17 +--
.../applications/GroupActivatedProcessor.java | 20 ++--
.../applications/GroupCreatedProcessor.java | 24 ++--
.../applications/GroupInActivateProcessor.java | 22 ++--
.../applications/GroupTerminatedProcessor.java | 24 ++--
.../applications/GroupTerminatingProcessor.java | 24 ++--
.../updater/ApplicationsUpdater.java | 23 ++--
.../CompleteTopologyMessageProcessor.java | 15 ---
.../topology/updater/TopologyUpdater.java | 68 +----------
.../applications/ApplicationManager.java | 119 +++++++++++++++++++
.../receiver/topology/TopologyManager.java | 67 +----------
25 files changed, 351 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
index f8d989f..bdbffc0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
@@ -7,7 +7,7 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.applications.*;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.applications.*;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager;
import org.apache.stratos.messaging.util.Constants;
import java.util.Set;
@@ -21,8 +21,8 @@ public class ApplicationsEventPublisher {
public static void sendGroupCreatedEvent(String appId, String groupId) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
Group group = application.getGroupRecursively(groupId);
if (group.isStateTransitionValid(GroupStatus.Created)) {
@@ -39,14 +39,14 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
public static void sendGroupActivatedEvent(String appId, String groupId) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
Group group = application.getGroupRecursively(groupId);
if (group.isStateTransitionValid(GroupStatus.Active)) {
@@ -63,14 +63,14 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
public static void sendGroupInActivateEvent(String appId, String groupId) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
Group group = application.getGroupRecursively(groupId);
if (group.isStateTransitionValid(GroupStatus.Inactive)) {
@@ -87,14 +87,14 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
public static void sendGroupTerminatingEvent(String appId, String groupId) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
Group group = application.getGroupRecursively(groupId);
if (group.isStateTransitionValid(GroupStatus.Terminating)) {
@@ -110,7 +110,7 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
@@ -122,8 +122,8 @@ public class ApplicationsEventPublisher {
}
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
Group group = application.getGroupRecursively(groupId);
if (group.isStateTransitionValid(GroupStatus.Terminated)) {
@@ -135,7 +135,7 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
@@ -143,8 +143,8 @@ public class ApplicationsEventPublisher {
public static void sendApplicationActivatedEvent(String appId) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
if (application.isStateTransitionValid(ApplicationStatus.Active)) {
if (log.isInfoEnabled()) {
@@ -159,7 +159,7 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
@@ -169,8 +169,8 @@ public class ApplicationsEventPublisher {
}
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
if (application.isStateTransitionValid(ApplicationStatus.Inactive)) {
ApplicationInactivatedEvent applicationInActivatedEvent =
@@ -181,14 +181,14 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
public static void sendApplicationTerminatingEvent(String appId) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
if (application.isStateTransitionValid(ApplicationStatus.Terminating)) {
if (log.isInfoEnabled()) {
@@ -202,14 +202,14 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
public static void sendApplicationTerminatedEvent(String appId, Set<ClusterDataHolder> clusterData) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
if (application.isStateTransitionValid(ApplicationStatus.Terminated)) {
if (log.isInfoEnabled()) {
@@ -223,7 +223,7 @@ public class ApplicationsEventPublisher {
}
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 18929ed..dcf3a82 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -45,6 +45,7 @@ import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.topology.*;
import org.apache.stratos.messaging.listener.applications.ApplicationUndeployedEventListener;
import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -103,7 +104,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
TopologyManager.acquireReadLock();
try {
- for (Application application : TopologyManager.getTopology().getApplications()) {
+ for (Application application : ApplicationManager.getApplications().getApplications().values()) {
startApplicationMonitor(application.getUniqueIdentifier());
}
@@ -172,8 +173,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
log.info("[ClusterCreatedEvent] Received: " + event.getClass());
- ClusterResetEvent clusterCreatedEvent = (ClusterResetEvent) event;
- String clusterId = clusterCreatedEvent.getClusterId();
+ ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+ String clusterId = clusterCreatedEvent.getCluster().getClusterId();
AbstractClusterMonitor clusterMonitor =
(AbstractClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
index 1e64d78..f693310 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
@@ -47,6 +47,7 @@ import org.apache.stratos.cloud.controller.stub.pojo.Property;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.Group;
import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Constants;
@@ -102,10 +103,10 @@ public class ApplicationMonitorFactory {
throws DependencyBuilderException,
TopologyInConsistentException {
GroupMonitor groupMonitor;
- TopologyManager.acquireReadLockForApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
try {
- Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId());
+ Group group = ApplicationManager.getApplications().getApplication(appId).getGroupRecursively(context.getId());
groupMonitor = new GroupMonitor(group, appId);
groupMonitor.setAppId(appId);
if(parentMonitor != null) {
@@ -126,7 +127,7 @@ public class ApplicationMonitorFactory {
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
return groupMonitor;
@@ -146,9 +147,9 @@ public class ApplicationMonitorFactory {
throws DependencyBuilderException,
TopologyInConsistentException {
ApplicationMonitor applicationMonitor;
- TopologyManager.acquireReadLockForApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
try {
- Application application = TopologyManager.getTopology().getApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (application != null) {
applicationMonitor = new ApplicationMonitor(application);
applicationMonitor.setHasDependent(false);
@@ -158,7 +159,7 @@ public class ApplicationMonitorFactory {
throw new TopologyInConsistentException(msg);
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
return applicationMonitor;
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
index eafa269..5468f6d 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
@@ -30,6 +30,7 @@ import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.messaging.domain.applications.*;
import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import java.util.Map;
@@ -98,8 +99,8 @@ public class StatusChecker {
if (cluster != null) {
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (!clusterMonitorHasMembers && cluster.getStatus() == ClusterStatus.Terminating) {
if (application.getStatus() == ApplicationStatus.Terminating) {
@@ -123,7 +124,7 @@ public class StatusChecker {
}*/
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
}
}
@@ -233,22 +234,22 @@ public class StatusChecker {
Runnable group = new Runnable() {
public void run() {
try {
- TopologyManager.acquireReadLockForApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
ParentComponent component;
if (groupId.equals(appId)) {
//it is an application
- component = TopologyManager.getTopology().
+ component = ApplicationManager.getApplications().
getApplication(appId);
} else {
//it is a group
- component = TopologyManager.getTopology().
+ component = ApplicationManager.getApplications().
getApplication(appId).getGroupRecursively(groupId);
}
Map<String, ClusterDataHolder> clusterIds = component.getClusterDataMap();
Map<String, Group> groups = component.getAliasToGroupMap();
updateChildStatus(appId, idOfChild, groups, clusterIds, component);
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
@@ -286,8 +287,8 @@ public class StatusChecker {
clusterStatus = getClusterStatus(clusterData);
groupStatus = getGroupStatus(groups);
try {
- TopologyManager.acquireReadLockForApplication(appId);
- Application application = TopologyManager.getTopology().getApplication(appId);
+ ApplicationManager.acquireReadLockForApplication(appId);
+ Application application = ApplicationManager.getApplications().getApplication(appId);
if (groups.isEmpty() && getAllClusterInSameState(clusterData,ClusterStatus.Active) ||
clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Active) ||
@@ -352,7 +353,7 @@ public class StatusChecker {
log.warn("Clusters/groups not found in this [component] " + appId);
}
} finally {
- TopologyManager.releaseReadLockForApplication(appId);
+ ApplicationManager.releaseReadLockForApplication(appId);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index f96ba8e..2e0883b 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -78,7 +78,7 @@ public class TopologyEventPublisher {
}
public static void sendClusterCreatedEvent(String appId, String serviceName, String clusterId) {
- ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(appId,serviceName, clusterId);
+ ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(new Cluster());
if(log.isInfoEnabled()) {
log.info("Publishing cluster created event: " +clusterId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
index e5a7921..e1feb82 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
@@ -68,6 +68,6 @@ public class Applications implements Serializable {
public synchronized void removeApplication (String appId) {
this.applicationMap.remove(appId);
- ApplicationLockHierarchy.getInstance().removeLock(appId);
+ ApplicationLockHierarchy.getInstance().removeLockForApplication(appId);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
index cc31892..2b457d7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
@@ -71,11 +71,11 @@ public class ApplicationLockHierarchy {
}
}
- public ApplicationLock getLock (String appId) {
+ public ApplicationLock getLockForApplication(String appId) {
return appIdToApplicationLockMap.get(appId);
}
- public void removeLock (String appId) {
+ public void removeLockForApplication (String appId) {
if (appIdToApplicationLockMap.remove(appId) != null) {
log.info("Removed lock for Application " + appId);
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
index bb6b8fa..e89df3a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
@@ -35,15 +35,9 @@ public class TopologyLockHierarchy {
// lock for Services
private final TopologyLock serviceLock;
- // lock for Applications
- private final TopologyLock applicatioLock;
-
// key = Service.name
private final Map<String, TopologyLock> serviceNameToTopologyLockMap;
- // key = Application.id
- private final Map<String, TopologyLock> applicationIdToTopologyLockMap;
-
// key = Cluster.id
private final Map<String, TopologyLock> clusterIdToTopologyLockMap;
@@ -53,9 +47,7 @@ public class TopologyLockHierarchy {
this.completeTopologyLock = new TopologyLock();
this.serviceLock = new TopologyLock();
- this.applicatioLock = new TopologyLock();
this.serviceNameToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
- this.applicationIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
this.clusterIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
}
@@ -72,26 +64,6 @@ public class TopologyLockHierarchy {
return topologyLockHierarchy;
}
- public void addApplicationLock (String appId, final TopologyLock topologyLock) {
-
- if (!applicationIdToTopologyLockMap.containsKey(appId)) {
- synchronized (applicationIdToTopologyLockMap) {
- if (!applicationIdToTopologyLockMap.containsKey(appId)) {
- applicationIdToTopologyLockMap.put(appId, topologyLock);
- log.info("Added lock for Application " + appId);
- }
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Topology Lock for Application " + appId + " already exists");
- }
- }
- }
-
- public TopologyLock getTopologyLockForApplication (String appId) {
- return applicationIdToTopologyLockMap.get(appId);
- }
-
public void addServiceLock (String serviceName, final TopologyLock topologyLock) {
if (!serviceNameToTopologyLockMap.containsKey(serviceName)) {
@@ -132,16 +104,6 @@ public class TopologyLockHierarchy {
return clusterIdToTopologyLockMap.get(clusterId);
}
- public void removeTopologyLockForApplication (String appId) {
- if (applicationIdToTopologyLockMap.remove(appId) != null) {
- log.info("Removed lock for Application " + appId);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Lock already removed for Application " + appId);
- }
- }
- }
-
public void removeTopologyLockForService (String serviceName) {
if (serviceNameToTopologyLockMap.remove(serviceName) != null) {
log.info("Removed lock for Service " + serviceName);
@@ -166,10 +128,6 @@ public class TopologyLockHierarchy {
return serviceLock;
}
- public TopologyLock getApplicatioLock() {
- return applicatioLock;
- }
-
public TopologyLock getCompleteTopologyLock() {
return completeTopologyLock;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
index 6c82925..da75265 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java
@@ -22,9 +22,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -46,40 +48,40 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (ApplicationActivatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
jsonToObject(message, ApplicationActivatedEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess(ApplicationActivatedEvent event, Topology topology) {
+ private boolean doProcess(ApplicationActivatedEvent event, Applications applications) {
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
@@ -87,7 +89,7 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
}
return false;
} else {
- // Apply changes to the topology
+ // Apply changes to the applications
if (!application.isStateTransitionValid(ApplicationStatus.Active)) {
log.error("Invalid State transfer from [ " + application.getStatus() +
" ] to [ " + ApplicationStatus.Active + " ]");
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
index db8e3c8..cfe8500 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
@@ -22,15 +22,11 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.util.Util;
-import java.util.Set;
-
public class ApplicationCreatedMessageProcessor extends MessageProcessor {
private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
@@ -57,12 +53,12 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
return false;
}
- TopologyUpdater.acquireWriteLockForApplications();
+ ApplicationsUpdater.acquireWriteLockForApplications();
try {
return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplications();
+ ApplicationsUpdater.releaseWriteLockForApplications();
}
} else {
@@ -75,7 +71,7 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
}
}
- private boolean doProcess(ApplicationCreatedEvent event, Applications topology) {
+ private boolean doProcess(ApplicationCreatedEvent event, Applications applications) {
// check if required properties are available
if (event.getApplication() == null) {
@@ -90,13 +86,13 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
throw new RuntimeException(errorMsg);
}
- // check if an Application with same name exists in topology
- if (topology.applicationExists(event.getApplication().getUniqueIdentifier())) {
+ // check if an Application with same name exists in applications
+ if (applications.applicationExists(event.getApplication().getUniqueIdentifier())) {
log.warn("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] already exists in Topology");
} else {
// add application and the clusters to Topology
- topology.addApplication(event.getApplication());
+ applications.addApplication(event.getApplication());
}
notifyEventListeners(event);
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
index 91eae8c..e97e3fc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
@@ -22,9 +22,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -46,40 +48,40 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (ApplicationInactivatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
jsonToObject(message, ApplicationInactivatedEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
+ private boolean doProcess (ApplicationInactivatedEvent event, Applications applications) {
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
@@ -87,7 +89,7 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
}
return false;
} else {
- // Apply changes to the topology
+ // Apply changes to the applications
if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) {
log.error("Invalid State transfer from [ " + application.getStatus() +
" ] to [ " + ApplicationStatus.Inactive + " ]");
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
index 8cd2182..99d08fe 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
@@ -20,10 +20,12 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -47,18 +49,18 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (ApplicationTerminatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util.
jsonToObject(message, ApplicationTerminatedEvent.class);
- TopologyUpdater.acquireWriteLockForApplications();
+ ApplicationsUpdater.acquireWriteLockForApplications();
Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
if (clusterDataHolders != null) {
for (ClusterDataHolder clusterData : clusterDataHolders) {
@@ -67,10 +69,10 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
}
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplications();
+ ApplicationsUpdater.releaseWriteLockForApplications();
if (clusterDataHolders != null) {
for (ClusterDataHolder clusterData : clusterDataHolders) {
TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
@@ -81,14 +83,14 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) {
+ private boolean doProcess (ApplicationTerminatedEvent event, Applications applications) {
// check if required properties are available
if (event.getAppId() == null) {
@@ -103,25 +105,26 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
throw new RuntimeException(errorMsg);
}
- // check if an Application with same name exists in topology
+ // check if an Application with same name exists in applications
String appId = event.getAppId();
- if (topology.applicationExists(appId)) {
+ if (applications.applicationExists(appId)) {
log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
- topology.removeApplication(appId);
+ applications.removeApplication(appId);
}
if (event.getClusterData() != null) {
// remove the Clusters from the Topology
for (ClusterDataHolder clusterData : event.getClusterData()) {
- Service service = topology.getService(clusterData.getServiceType());
- if (service != null) {
- service.removeCluster(clusterData.getClusterId());
- if (log.isDebugEnabled()) {
- log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
- }
- } else {
- log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
- }
+ log.info("################################ TODO ################################");
+// Service service = applications.getService(clusterData.getServiceType());
+// if (service != null) {
+// service.removeCluster(clusterData.getClusterId());
+// if (log.isDebugEnabled()) {
+// log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
+// }
+// } else {
+// log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
+// }
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
index 057d013..633e080 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
@@ -22,9 +22,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -46,40 +48,40 @@ public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (ApplicationTerminatingEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util.
jsonToObject(message, ApplicationTerminatingEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) {
+ private boolean doProcess (ApplicationTerminatingEvent event, Applications applications) {
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
@@ -87,7 +89,7 @@ public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
}
return false;
} else {
- // Apply changes to the topology
+ // Apply changes to the applications
if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
log.error("Invalid State transfer from [ " + application.getStatus() +
" ] to [ " + ApplicationStatus.Terminating + " ]");
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
index 7e91ab8..e911b77 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
@@ -23,10 +23,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -46,10 +48,10 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (ApplicationUndeployedEvent.class.getName().equals(type)) {
- if (!topology.isInitialized()) {
+ if (!applications.isInitialized()) {
return false;
}
@@ -61,7 +63,7 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
}
// get write lock for the application and relevant Clusters
- TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getApplicationId());
Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
if (clusterDataHolders != null) {
for (ClusterDataHolder clusterData : clusterDataHolders) {
@@ -71,7 +73,7 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
}
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
// remove locks
@@ -81,13 +83,13 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
clusterData.getClusterId());
}
}
- TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getApplicationId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format
("Failed to process message using available message processors: [type] %s [body] %s", type, message));
@@ -95,10 +97,10 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
}
}
- private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) {
+ private boolean doProcess (ApplicationUndeployedEvent event, Applications applications) {
// update the application status to Terminating
- Application application = topology.getApplication(event.getApplicationId());
+ Application application = applications.getApplication(event.getApplicationId());
// check and update application status to 'Terminating'
if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating);
@@ -110,27 +112,28 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
// update the Cluster statuses to Terminating
for (ClusterDataHolder clusterDataHolder : clusterData) {
- Service service = topology.getService(clusterDataHolder.getServiceType());
- if (service != null) {
- Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId());
- if (aCluster != null) {
- // validate state transition
- if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) {
- log.error("Invalid state transfer from " + aCluster.getStatus() + " to "
- + ClusterStatus.Terminating);
- }
- // for now anyway update the status forcefully
- aCluster.setStatus(ClusterStatus.Terminating);
-
- } else {
- log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() +
- " in Topology");
- }
-
- } else {
- log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " +
- " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found");
- }
+ log.info("############################### TODO ###############################");
+// Service service = applications.getService(clusterDataHolder.getServiceType());
+// if (service != null) {
+// Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId());
+// if (aCluster != null) {
+// // validate state transition
+// if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) {
+// log.error("Invalid state transfer from " + aCluster.getStatus() + " to "
+// + ClusterStatus.Terminating);
+// }
+// // for now anyway update the status forcefully
+// aCluster.setStatus(ClusterStatus.Terminating);
+//
+// } else {
+// log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() +
+// " in Topology");
+// }
+//
+// } else {
+// log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " +
+// " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found");
+// }
}
notifyEventListeners(event);
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java
index 53c469b..c9af67a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java
@@ -22,25 +22,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
-import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
import org.apache.stratos.messaging.event.applications.CompleteApplicationsEvent;
-import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
public class CompleteApplicationsMessageProcessor extends MessageProcessor {
@@ -62,13 +49,13 @@ public class CompleteApplicationsMessageProcessor extends MessageProcessor {
jsonToObject(message, CompleteApplicationsEvent.class);
if (!applications.isInitialized()) {
- ApplicationsUpdater.acquireWriteLock();
+ ApplicationsUpdater.acquireWriteLockForApplications();
try {
doProcess(event, applications);
} finally {
- ApplicationsUpdater.releaseWriteLock();
+ ApplicationsUpdater.releaseWriteLockForApplications();
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
index 845e933..5c8d477 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.Group;
import org.apache.stratos.messaging.domain.applications.GroupStatus;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -43,40 +45,40 @@ public class GroupActivatedProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (GroupActivatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
GroupActivatedEvent event = (GroupActivatedEvent) Util.
jsonToObject(message, GroupActivatedEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess (GroupActivatedEvent event,Topology topology) {
+ private boolean doProcess (GroupActivatedEvent event, Applications applications) {
// Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
index 47d4457..67861e4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.Group;
import org.apache.stratos.messaging.domain.applications.GroupStatus;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.GroupCreatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -43,40 +45,40 @@ public class GroupCreatedProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (GroupCreatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
GroupCreatedEvent event = (GroupCreatedEvent) Util.
jsonToObject(message, GroupCreatedEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess (GroupCreatedEvent event,Topology topology) {
+ private boolean doProcess (GroupCreatedEvent event,Applications applications) {
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
@@ -92,7 +94,7 @@ public class GroupCreatedProcessor extends MessageProcessor {
event.getGroupId()));
}
} else {
- // Apply changes to the topology
+ // Apply changes to the applications
if (!group.isStateTransitionValid(GroupStatus.Created)) {
log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created + " " +
"for Group " + group.getAlias());
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
index 063a3de..4f2e581 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.Group;
import org.apache.stratos.messaging.domain.applications.GroupStatus;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.GroupInactivateEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -43,40 +45,40 @@ public class GroupInActivateProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (GroupInactivateEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
GroupInactivateEvent event = (GroupInactivateEvent) Util.
jsonToObject(message, GroupInactivateEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess(GroupInactivateEvent event, Topology topology) {
+ private boolean doProcess(GroupInactivateEvent event, Applications applications) {
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
index 3de0914..6e985b7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.Group;
import org.apache.stratos.messaging.domain.applications.GroupStatus;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -43,40 +45,40 @@ public class GroupTerminatedProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (GroupTerminatedEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
GroupTerminatedEvent event = (GroupTerminatedEvent) Util.
jsonToObject(message, GroupTerminatedEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess (GroupTerminatedEvent event,Topology topology) {
+ private boolean doProcess (GroupTerminatedEvent event, Applications applications) {
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
@@ -92,7 +94,7 @@ public class GroupTerminatedProcessor extends MessageProcessor {
event.getGroupId()));
}
} else {
- // Apply changes to the topology
+ // Apply changes to the applications
if (!group.isStateTransitionValid(GroupStatus.Terminated)) {
log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
index e124b7b..c9a136f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.Group;
import org.apache.stratos.messaging.domain.applications.GroupStatus;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
import org.apache.stratos.messaging.util.Util;
@@ -43,40 +45,40 @@ public class GroupTerminatingProcessor extends MessageProcessor {
@Override
public boolean process(String type, String message, Object object) {
- Topology topology = (Topology) object;
+ Applications applications = (Applications) object;
if (GroupTerminatingEvent.class.getName().equals(type)) {
- // Return if topology has not been initialized
- if (!topology.isInitialized())
+ // Return if applications has not been initialized
+ if (!applications.isInitialized())
return false;
// Parse complete message and build event
GroupTerminatingEvent event = (GroupTerminatingEvent) Util.
jsonToObject(message, GroupTerminatingEvent.class);
- TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId());
try {
- return doProcess(event, topology);
+ return doProcess(event, applications);
} finally {
- TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId());
}
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
- return nextProcessor.process(type, message, topology);
+ return nextProcessor.process(type, message, applications);
} else {
throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
}
}
}
- private boolean doProcess (GroupTerminatingEvent event,Topology topology) {
+ private boolean doProcess (GroupTerminatingEvent event,Applications applications) {
- // Validate event against the existing topology
- Application application = topology.getApplication(event.getAppId());
+ // Validate event against the existing applications
+ Application application = applications.getApplication(event.getAppId());
if (application == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Application does not exist: [service] %s",
@@ -92,7 +94,7 @@ public class GroupTerminatingProcessor extends MessageProcessor {
event.getGroupId()));
}
} else {
- // Apply changes to the topology
+ // Apply changes to the applications
if (!group.isStateTransitionValid(GroupStatus.Terminating)) {
log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
}