You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/11 14:20:21 UTC
[1/3] Implemented topology member filter
Updated Branches:
refs/heads/master 0bfb3bea7 -> 3483dec7e
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
index f8d52d6..c19c3cb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -26,9 +26,10 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class MemberTerminatedMessageProcessor extends MessageProcessor {
@@ -54,8 +55,8 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -65,8 +66,8 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
// Cluster is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -75,6 +76,16 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
}
}
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+ }
+ return false;
+ }
+ }
+
// Validate event against the existing topology
Service service = topology.getService(event.getServiceName());
if (service == null) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
index 8a8a452..02fb87c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -23,8 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class ServiceCreatedMessageProcessor extends MessageProcessor {
@@ -50,8 +50,8 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
index 5f31ec2..3b9e88f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -23,8 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class ServiceRemovedMessageProcessor extends MessageProcessor {
@@ -50,8 +50,8 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index a92413d..49e6094 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -28,4 +28,18 @@ public class Constants {
public static final String TENANT_RANGE_DELIMITER = "-";
public static final String EVENT_CLASS_NAME = "event-class-name";
+
+ /* Topology filter constants */
+ public static final String FILTER_VALUE_ASSIGN_OPERATOR="=";
+ public static final String FILTER_KEY_VALUE_SEPARATOR = ";";
+ public static final String FILTER_VALUE_SEPARATOR = ",";
+
+ public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter";
+ public static final String TOPOLOGY_SERVICE_FILTER_SERVICE_NAME = "service.name";
+
+ public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter";
+ public static final String TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID = "cluster.id";
+
+ public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter";
+ public static final String TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID = "lb.cluster.id";
}
[3/3] git commit: Merge remote-tracking branch 'origin/master'
Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/3483dec7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/3483dec7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/3483dec7
Branch: refs/heads/master
Commit: 3483dec7edc6a66a445bc3e6264c9fd10fac16da
Parents: 7a5a6b8 0bfb3be
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 11 18:50:11 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Dec 11 18:50:11 2013 +0530
----------------------------------------------------------------------
.../manager/CartridgeSubscriptionManager.java | 114 +++++++++++++++++
.../apache/stratos/adc/mgt/payload/Payload.java | 12 +-
.../ApplicationCartridgeSubscription.java | 7 +-
.../mgt/subscription/CartridgeSubscription.java | 28 +++--
.../subscription/DataCartridgeSubscription.java | 5 +-
.../FrameworkCartridgeSubscription.java | 10 +-
.../subscription/LBCartridgeSubscription.java | 5 +-
.../factory/CartridgeSubscriptionFactory.java | 6 +-
.../ServiceDeploymentMultiTenantBehaviour.java | 121 +++++++++++++++++++
.../SubscriptionMultiTenantBehaviour.java | 2 +-
.../definition/ServiceDefinitionBean.java | 78 ++++++++++++
.../rest/endpoint/services/AbstractAdmin.java | 4 +
.../rest/endpoint/services/ServiceUtils.java | 30 +++++
.../rest/endpoint/services/StratosAdmin.java | 22 ++++
.../src/main/resources/addTenantRequest.txt | 2 +-
.../src/main/resources/getTenantList.txt | 2 +-
16 files changed, 419 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
[2/3] git commit: Implemented topology member filter
Posted by im...@apache.org.
Implemented topology member filter
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7a5a6b81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7a5a6b81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7a5a6b81
Branch: refs/heads/master
Commit: 7a5a6b815914dcf7eaeacf7ed8ae4a6251ef3de2
Parents: a10d95f
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 11 18:49:56 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Dec 11 18:49:56 2013 +0530
----------------------------------------------------------------------
.../controller/topology/TopologyBuilder.java | 112 +++++--------------
.../topology/TopologyEventMessageDelegator.java | 16 +--
.../topology/TopologyEventSender.java | 56 ++--------
.../conf/LoadBalancerConfiguration.java | 2 +-
.../load/balancer/conf/util/Constants.java | 1 +
.../internal/LoadBalancerServiceComponent.java | 12 +-
.../balancer/test/RoundRobinAlgorithmTest.java | 6 +-
components/org.apache.stratos.messaging/pom.xml | 5 +
.../messaging/domain/topology/Member.java | 23 ++--
.../event/topology/InstanceSpawnedEvent.java | 8 ++
.../event/topology/MemberStartedEvent.java | 9 ++
.../event/topology/MemberSuspendedEvent.java | 9 ++
.../event/topology/MemberTerminatedEvent.java | 9 ++
.../messaging/message/filter/MessageFilter.java | 94 ++++++++++++++++
.../message/filter/topology/ClusterFilter.java | 83 --------------
.../message/filter/topology/ServiceFilter.java | 83 --------------
.../filter/topology/TopologyClusterFilter.java | 65 +++++++++++
.../filter/topology/TopologyMemberFilter.java | 46 ++++++++
.../filter/topology/TopologyServiceFilter.java | 65 +++++++++++
.../ClusterCreatedMessageProcessor.java | 12 +-
.../ClusterRemovedMessageProcessor.java | 13 +--
.../CompleteTopologyMessageProcessor.java | 12 +-
.../InstanceSpawnedMessageProcessor.java | 27 +++--
.../MemberActivatedMessageProcessor.java | 23 +++-
.../topology/MemberStartedMessageProcessor.java | 23 +++-
.../MemberSuspendedMessageProcessor.java | 23 +++-
.../MemberTerminatedMessageProcessor.java | 23 +++-
.../ServiceCreatedMessageProcessor.java | 6 +-
.../ServiceRemovedMessageProcessor.java | 6 +-
.../stratos/messaging/util/Constants.java | 14 +++
30 files changed, 508 insertions(+), 378 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 39c7608..fb1fe5a 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -29,8 +29,8 @@ import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.instance.status.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.instance.status.MemberStartedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
import java.util.List;
import java.util.Properties;
@@ -75,58 +75,6 @@ public class TopologyBuilder {
}
-// public static void handlePartitionCreated(Partition partition) {
-//
-// Topology topology = TopologyManager.getInstance().getTopology();
-// if (partition == null) {
-// throw new RuntimeException(String.format("Partition is empty"));
-// }
-// try {
-// TopologyManager.getInstance().acquireWriteLock();
-// topology.addPartition(partition);
-// TopologyManager.getInstance().updateTopology(topology);
-// } finally {
-// TopologyManager.getInstance().releaseWriteLock();
-// }
-// TopologyEventSender.sendPartitionCreatedEvent(partition);
-//
-// }
-
-// public static void handlePartitionUpdated(Partition newPartition, Partition oldPartition) {
-//
-// Topology topology = TopologyManager.getInstance().getTopology();
-// if (newPartition == null || oldPartition == null) {
-// throw new RuntimeException(String.format("Partition is empty"));
-// }
-// try {
-// TopologyManager.getInstance().acquireWriteLock();
-// topology.removePartition(oldPartition);
-// topology.addPartition(newPartition);
-// TopologyManager.getInstance().updateTopology(topology);
-// } finally {
-// TopologyManager.getInstance().releaseWriteLock();
-// }
-// TopologyEventSender.sendPartitionUpdatedEvent(newPartition, oldPartition.getId());
-//
-// }
-//
-// public static void handlePartitionRemoved(Partition partition) {
-//
-// Topology topology = TopologyManager.getInstance().getTopology();
-// if (partition == null) {
-// throw new RuntimeException(String.format("Partition is empty"));
-// }
-// try {
-// TopologyManager.getInstance().acquireWriteLock();
-// topology.removePartition(partition);
-// TopologyManager.getInstance().updateTopology(topology);
-// } finally {
-// TopologyManager.getInstance().releaseWriteLock();
-// }
-// TopologyEventSender.sendPartitionRemovedEvent(partition);
-// }
-
-
public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
Topology topology = TopologyManager.getInstance().getTopology();
@@ -240,8 +188,7 @@ public class TopologyBuilder {
try {
TopologyManager.getInstance().acquireWriteLock();
- Member member = new Member(serviceName, clusterId, memberId);
- member.setPartitionId(partitionId);
+ Member member = new Member(serviceName, clusterId, partitionId, memberId);
member.setStatus(MemberStatus.Created);
member.setMemberIp(privateIp);
cluster.addMember(member);
@@ -249,28 +196,28 @@ public class TopologyBuilder {
} finally {
TopologyManager.getInstance().releaseWriteLock();
}
- TopologyEventSender.sendInstanceSpawnedEvent(serviceName, clusterId, memberId, partitionId);
+ TopologyEventSender.sendInstanceSpawnedEvent(serviceName, clusterId, partitionId, memberId);
}
- public static void handleMemberStarted(MemberStartedEvent memberStartedEvent) {
+ public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
Topology topology = TopologyManager.getInstance().getTopology();
- Service service = topology.getService(memberStartedEvent.getServiceName());
+ Service service = topology.getService(instanceStartedEvent.getServiceName());
if (service == null) {
throw new RuntimeException(String.format("Service %s does not exist",
- memberStartedEvent.getServiceName()));
+ instanceStartedEvent.getServiceName()));
}
- if (!service.clusterExists(memberStartedEvent.getClusterId())) {
+ if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
throw new RuntimeException(String.format("Cluster %s does not exist in service %s",
- memberStartedEvent.getClusterId(),
- memberStartedEvent.getServiceName()));
+ instanceStartedEvent.getClusterId(),
+ instanceStartedEvent.getServiceName()));
}
- Member member = service.getCluster(memberStartedEvent.getClusterId()).
- getMember(memberStartedEvent.getMemberId());
+ Member member = service.getCluster(instanceStartedEvent.getClusterId()).
+ getMember(instanceStartedEvent.getMemberId());
if (member == null) {
throw new RuntimeException(String.format("Member %s does not exist",
- memberStartedEvent.getMemberId()));
+ instanceStartedEvent.getMemberId()));
}
try {
TopologyManager.getInstance().acquireWriteLock();
@@ -282,39 +229,39 @@ public class TopologyBuilder {
TopologyManager.getInstance().releaseWriteLock();
}
//memberStartedEvent.
- TopologyEventSender.sendMemberStartedEvent(memberStartedEvent);
+ TopologyEventSender.sendMemberStartedEvent(instanceStartedEvent);
}
- public static void handleMemberActivated(MemberActivatedEvent memberActivatedEvent) {
+ public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
Topology topology = TopologyManager.getInstance().getTopology();
- Service service = topology.getService(memberActivatedEvent.getServiceName());
+ Service service = topology.getService(instanceActivatedEvent.getServiceName());
if (service == null) {
throw new RuntimeException(String.format("Service %s does not exist",
- memberActivatedEvent.getServiceName()));
+ instanceActivatedEvent.getServiceName()));
}
- Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId());
+ Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
if (cluster == null) {
throw new RuntimeException(String.format("Cluster %s does not exist",
- memberActivatedEvent.getClusterId()));
+ instanceActivatedEvent.getClusterId()));
}
- Member member = cluster.getMember(memberActivatedEvent.getMemberId());
+ Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
if (member == null) {
throw new RuntimeException(String.format("Member %s does not exist",
- memberActivatedEvent.getMemberId()));
+ instanceActivatedEvent.getMemberId()));
}
- org.apache.stratos.messaging.event.topology.MemberActivatedEvent memberActivatedEventTopology =
- new org.apache.stratos.messaging.event.topology.MemberActivatedEvent(memberActivatedEvent.getServiceName(),
- memberActivatedEvent.getClusterId(), memberActivatedEvent.getMemberId());
+ org.apache.stratos.messaging.event.topology.MemberActivatedEvent memberActivatedEvent =
+ new org.apache.stratos.messaging.event.topology.MemberActivatedEvent(instanceActivatedEvent.getServiceName(),
+ instanceActivatedEvent.getClusterId(), instanceActivatedEvent.getPartitionId(), instanceActivatedEvent.getMemberId());
try {
TopologyManager.getInstance().acquireWriteLock();
member.setStatus(MemberStatus.Activated);
log.info("member started event adding status activated");
Cartridge cartridge = FasterLookUpDataHolder.getInstance().
- getCartridge(memberActivatedEvent.getServiceName());
+ getCartridge(instanceActivatedEvent.getServiceName());
List<PortMapping> portMappings = cartridge.getPortMappings();
Port port;
@@ -324,17 +271,16 @@ public class TopologyBuilder {
Integer.parseInt(portMapping.getPort()),
Integer.parseInt(portMapping.getProxyPort()));
member.addPort(port);
- memberActivatedEventTopology.addPort(port);
+ memberActivatedEvent.addPort(port);
}
-
- memberActivatedEventTopology.setPartitionId(member.getPartitionId());
- memberActivatedEventTopology.setMemberIp(member.getMemberIp());
+
+ memberActivatedEvent.setMemberIp(member.getMemberIp());
TopologyManager.getInstance().updateTopology(topology);
} finally {
TopologyManager.getInstance().releaseWriteLock();
}
- TopologyEventSender.sendMemberActivatedEvent(memberActivatedEventTopology);
+ TopologyEventSender.sendMemberActivatedEvent(memberActivatedEvent);
}
public static void handleMemberTerminated(String serviceName, String clusterId, String memberId) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
index 9a975bb..098f7d5 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
@@ -20,8 +20,8 @@ package org.apache.stratos.cloud.controller.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.instance.status.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.instance.status.MemberStartedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
import org.apache.stratos.messaging.util.Constants;
import org.apache.stratos.messaging.util.Util;
@@ -45,12 +45,12 @@ public class TopologyEventMessageDelegator implements Runnable {
log.info(String.format("Event message received from queue: %s", type));
- if(MemberStartedEvent.class.getName().equals(type)) {
- TopologyBuilder.handleMemberStarted((MemberStartedEvent)Util.
- jsonToObject(json, MemberStartedEvent.class));
- } else if(MemberActivatedEvent.class.getName().equals(type)) {
- TopologyBuilder.handleMemberActivated((MemberActivatedEvent) Util.
- jsonToObject(json, MemberActivatedEvent.class));
+ if(InstanceStartedEvent.class.getName().equals(type)) {
+ TopologyBuilder.handleMemberStarted((InstanceStartedEvent)Util.
+ jsonToObject(json, InstanceStartedEvent.class));
+ } else if(InstanceActivatedEvent.class.getName().equals(type)) {
+ TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util.
+ jsonToObject(json, InstanceActivatedEvent.class));
}
if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
index 59a9e55..8b8137c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.domain.topology.Port;
import org.apache.stratos.messaging.domain.topology.ServiceType;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
import org.apache.stratos.messaging.event.topology.*;
import java.util.List;
@@ -64,43 +65,10 @@ public class TopologyEventSender {
}
}
-// public static void sendPartitionCreatedEvent(Partition partition) {
-// PartitionCreatedEvent partitionCreatedEvent =
-// new PartitionCreatedEvent(partition);
-//
-// if(log.isInfoEnabled()) {
-// log.info(String.format("Publishing partition created event: [partition] %s", partition.getId()));
-// }
-// publishEvent(partitionCreatedEvent);
-// }
-
-// public static void sendPartitionUpdatedEvent(Partition partition, String oldPartitionId) {
-// PartitionUpdatedEvent partitionUpdatedEvent =
-// new PartitionUpdatedEvent(partition.getId(),
-// partition.getScope(),
-// oldPartitionId);
-// partitionUpdatedEvent.setProperties(partition.getProperties());
-//
-// if(log.isInfoEnabled()) {
-// log.info(String.format("Publishing partition updated event: [partition] %s", partition.getId()));
-// }
-// publishEvent(partitionUpdatedEvent);
-// }
-//
-// public static void sendPartitionRemovedEvent(Partition partition) {
-// PartitionRemovedEvent partitionRemovedEvent = new PartitionRemovedEvent(partition.getId());
-//
-// if(log.isInfoEnabled()) {
-// log.info(String.format("Publishing partition removed event: [partition] %s", partition.getId()));
-// }
-// publishEvent(partitionRemovedEvent);
-// }
-
public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) {
ServiceRemovedEvent serviceRemovedEvent;
for(Cartridge cartridge : cartridgeList) {
- serviceRemovedEvent = new ServiceRemovedEvent();
- serviceRemovedEvent.setServiceName(cartridge.getType());
+ serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType());
if(log.isInfoEnabled()) {
log.info(String.format("Publishing service removed event: [service] %s", serviceRemovedEvent.getServiceName()));
}
@@ -139,23 +107,20 @@ public class TopologyEventSender {
}
- public static void sendInstanceSpawnedEvent(String serviceName, String clusterId, String memberId, String nodeId) {
- InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(serviceName,
- clusterId,
- memberId,
- nodeId);
+ public static void sendInstanceSpawnedEvent(String serviceName, String clusterId, String partitionId, String memberId) {
+ InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(serviceName, clusterId, partitionId, memberId);
if(log.isInfoEnabled()) {
- log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s [member] %s [node] %s", serviceName, clusterId, memberId, nodeId));
+ log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s [partition] %s [member] %s", serviceName, clusterId, partitionId, memberId));
}
publishEvent(instanceSpawnedEvent);
}
- public static void sendMemberStartedEvent(org.apache.stratos.messaging.event.instance.status.MemberStartedEvent memberStartedEvent) {
- MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(memberStartedEvent.getServiceName(),
- memberStartedEvent.getClusterId(), memberStartedEvent.getMemberId());
+ public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) {
+ MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(),
+ instanceStartedEvent.getClusterId(), instanceStartedEvent.getMemberId());
if(log.isInfoEnabled()) {
- log.info(String.format("Publishing member started event: [service] %s [cluster] %s [member] %s", memberStartedEvent.getServiceName(), memberStartedEvent.getClusterId(), memberStartedEvent.getMemberId()));
+ log.info(String.format("Publishing member started event: [service] %s [cluster] %s [member] %s", instanceStartedEvent.getServiceName(), instanceStartedEvent.getClusterId(), instanceStartedEvent.getMemberId()));
}
publishEvent(memberStartedEventTopology);
}
@@ -178,8 +143,7 @@ public class TopologyEventSender {
}
public static void sendCompleteTopologyEvent(Topology topology) {
- CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent();
- completeTopologyEvent.setTopology(topology);
+ CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
if(log.isInfoEnabled()) {
log.info(String.format("Publishing complete topology event"));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index adbb60e..58d9163 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -414,7 +414,7 @@ public class LoadBalancerConfiguration {
for (Node memberNode : membersNode.getChildNodes()) {
String memberId = memberNode.getName();
- Member member = new Member(cluster.getServiceName(), cluster.getClusterId(), memberId);
+ Member member = new Member(cluster.getServiceName(), cluster.getClusterId(), Constants.STATIC_PARTITION, memberId);
String ip = memberNode.getProperty(Constants.CONF_PROPERTY_IP);
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_IP, ip, String.format("member %s", memberId));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
index e858858..caf0d33 100755
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
@@ -58,6 +58,7 @@ public class Constants {
public static final String CONF_DELIMITER_HOSTS = ",";
public static final long DEFAULT_SESSION_TIMEOUT = 90000;
+ public static final String STATIC_PARTITION = "static-partition";
/* Nginx format related constants */
public static final String NGINX_COMMENT = "#";
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index be7133e..48387c4 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -32,8 +32,8 @@ import org.apache.stratos.load.balancer.conf.configurator.CEPConfigurator;
import org.apache.stratos.load.balancer.conf.configurator.JndiConfigurator;
import org.apache.stratos.load.balancer.conf.configurator.SynapseConfigurator;
import org.apache.stratos.load.balancer.context.LoadBalancerContext;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
import org.apache.synapse.core.SynapseEnvironment;
@@ -141,9 +141,9 @@ public class LoadBalancerServiceComponent {
}
if (log.isInfoEnabled()) {
- if (ServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
StringBuilder sb = new StringBuilder();
- for (String serviceName : ServiceFilter.getInstance().getIncludedServiceNames()) {
+ for (String serviceName : TopologyServiceFilter.getInstance().getIncludedServiceNames()) {
if (sb.length() > 0) {
sb.append(", ");
}
@@ -151,9 +151,9 @@ public class LoadBalancerServiceComponent {
}
log.info(String.format("Service filter activated: [services] %s", sb.toString()));
}
- if (ClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
StringBuilder sb = new StringBuilder();
- for (String clusterId : ClusterFilter.getInstance().getIncludedClusterIds()) {
+ for (String clusterId : TopologyClusterFilter.getInstance().getIncludedClusterIds()) {
if (sb.length() > 0) {
sb.append(", ");
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
index ad90335..a2a877f 100755
--- a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
+++ b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
@@ -39,15 +39,15 @@ public class RoundRobinAlgorithmTest {
@Test
public final void testRoundRobinAlgorithm() {
List<Member> members = new ArrayList<Member>();
- Member member = new Member("service1", "cluster1", "m1");
+ Member member = new Member("service1", "cluster1", "p1", "m1");
member.setStatus(MemberStatus.Activated);
members.add(member);
- member = new Member("service1", "cluster1", "m2");
+ member = new Member("service1", "cluster1", "p1", "m2");
member.setStatus(MemberStatus.Activated);
members.add(member);
- member = new Member("service1", "cluster1", "m3");
+ member = new Member("service1", "cluster1", "p1", "m3");
member.setStatus(MemberStatus.Activated);
members.add(member);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/pom.xml b/components/org.apache.stratos.messaging/pom.xml
index 183c322..cb227b3 100644
--- a/components/org.apache.stratos.messaging/pom.xml
+++ b/components/org.apache.stratos.messaging/pom.xml
@@ -55,6 +55,11 @@
<version>3.1</version>
</dependency>
<dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.logging</artifactId>
<version>${wso2carbon.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
index a82b7a1..bb14ae3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
@@ -36,20 +36,24 @@ import java.util.Properties;
@XmlRootElement
public class Member implements Serializable {
private static final long serialVersionUID = 4179661867903664661L;
- private String serviceName;
- private String clusterId;
- private String memberId;
+
+ private final String serviceName;
+ private final String clusterId;
+ private final String partitionId;
+ private final String memberId;
+
private MemberStatus status;
private String memberIp;
@XmlJavaTypeAdapter(MapAdapter.class)
- private Map<String, Port> portMap;
+ private final Map<String, Port> portMap;
@XmlJavaTypeAdapter(MapAdapter.class)
private Properties properties;
- private String partitionId;
+ private String lbClusterId;
- public Member(String serviceName, String clusterId, String memberId) {
+ public Member(String serviceName, String clusterId, String partitionId, String memberId) {
this.serviceName = serviceName;
this.clusterId = clusterId;
+ this.partitionId = partitionId;
this.memberId = memberId;
this.portMap = new HashMap<String, Port>();
}
@@ -128,9 +132,12 @@ public class Member implements Serializable {
return partitionId;
}
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
+ public void setLbClusterId(String lbClusterId) {
+ this.lbClusterId = lbClusterId;
}
+ public String getLbClusterId() {
+ return lbClusterId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
index 3817b12..db88940 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
@@ -32,6 +32,7 @@ public class InstanceSpawnedEvent extends TopologyEvent implements Serializable
private final String clusterId;
private final String memberId;
private final String partitionId;
+ private String lbClusterId;
public InstanceSpawnedEvent(String serviceName, String clusterId, String partitionId, String memberId) {
this.serviceName = serviceName;
@@ -56,4 +57,11 @@ public class InstanceSpawnedEvent extends TopologyEvent implements Serializable
return memberId;
}
+ public String getLbClusterId() {
+ return lbClusterId;
+ }
+
+ public void setLbClusterId(String lbClusterId) {
+ this.lbClusterId = lbClusterId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
index 5d57b16..380a369 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
@@ -35,6 +35,7 @@ public class MemberStartedEvent extends TopologyEvent implements Serializable {
private MemberStatus status;
private Properties properties;
private String partitionId;
+ private String lbClusterId;
public MemberStartedEvent(String serviceName, String clusterId, String memberId) {
this.serviceName = serviceName;
@@ -83,4 +84,12 @@ public class MemberStartedEvent extends TopologyEvent implements Serializable {
public void setPartitionId(String partitionId) {
this.partitionId = partitionId;
}
+
+ public String getLbClusterId() {
+ return lbClusterId;
+ }
+
+ public void setLbClusterId(String lbClusterId) {
+ this.lbClusterId = lbClusterId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
index 91d13c6..a58d3c7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
@@ -30,6 +30,7 @@ public class MemberSuspendedEvent extends TopologyEvent implements Serializable
private final String serviceName;
private final String clusterId;
private final String memberId;
+ private String lbClusterId;
public MemberSuspendedEvent(String serviceName, String clusterId, String memberId) {
this.serviceName = serviceName;
@@ -48,4 +49,12 @@ public class MemberSuspendedEvent extends TopologyEvent implements Serializable
public String getMemberId() {
return memberId;
}
+
+ public String getLbClusterId() {
+ return lbClusterId;
+ }
+
+ public void setLbClusterId(String lbClusterId) {
+ this.lbClusterId = lbClusterId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
index 1ae6e46..8a50b0b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
@@ -30,6 +30,7 @@ public class MemberTerminatedEvent extends TopologyEvent implements Serializable
private final String serviceName;
private final String clusterId;
private final String memberId;
+ private String lbClusterId;
public MemberTerminatedEvent(String serviceName, String clusterId, String memberId) {
this.serviceName = serviceName;
@@ -48,4 +49,12 @@ public class MemberTerminatedEvent extends TopologyEvent implements Serializable
public String getMemberId() {
return memberId;
}
+
+ public String getLbClusterId() {
+ return lbClusterId;
+ }
+
+ public void setLbClusterId(String lbClusterId) {
+ this.lbClusterId = lbClusterId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java
new file mode 100644
index 0000000..d8e0481
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java
@@ -0,0 +1,94 @@
+package org.apache.stratos.messaging.message.filter;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Message filter for filtering incoming messages in message processors.
+ */
+public class MessageFilter {
+
+ private static final Log log = LogFactory.getLog(TopologyServiceFilter.class);
+
+ private String filterName;
+ private Map<String, Map<String, Boolean>> filterMap;
+
+ public MessageFilter(String filterName) {
+ this.filterName = filterName;
+ this.filterMap = new HashMap<String, Map<String, Boolean>>();
+ init();
+ }
+
+ private Map<String, String> splitToMap(String filter) {
+ HashMap<String, String> keyValuePairMap = new HashMap<String, String>();
+ String[] keyValuePairArray = filter.split(Constants.FILTER_KEY_VALUE_SEPARATOR);
+ for(String keyValuePair : keyValuePairArray) {
+ String [] keyValueArray = keyValuePair.split(Constants.FILTER_VALUE_ASSIGN_OPERATOR);
+ if(keyValueArray.length == 2) {
+ keyValuePairMap.put(keyValueArray[0].trim(), keyValueArray[1].trim());
+ }
+ }
+ return keyValuePairMap;
+ }
+
+ /**
+ * Initialize message filter using system property.
+ */
+ public void init() {
+ String filter = System.getProperty(filterName);
+ if(StringUtils.isNotBlank(filter)) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Initializing filter: %s", filterName));
+ }
+
+ String propertyValue;
+ Map<String, Boolean> propertyValueMap;
+ String[] propertyValueArray;
+ Map<String, String> keyValuePairMap = splitToMap(filter);
+
+ for(String propertyName : keyValuePairMap.keySet()) {
+ propertyValue = keyValuePairMap.get(propertyName);
+ propertyValueMap = new HashMap<String, Boolean>();
+ propertyValueArray = propertyValue.split(Constants.FILTER_VALUE_SEPARATOR);
+ for(String value : propertyValueArray) {
+ propertyValueMap.put(value, true);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Filter property value found: [property] %s [value] %s", propertyName, value));
+ }
+ }
+ filterMap.put(propertyName, propertyValueMap);
+ }
+ }
+ }
+
+ public boolean isActive() {
+ return filterMap.size() > 0;
+ }
+
+ public boolean included(String propertyName, String propertyValue) {
+ if(filterMap.containsKey(propertyName)) {
+ Map<String, Boolean> propertyValueMap = filterMap.get(propertyName);
+ return propertyValueMap.containsKey(propertyValue);
+ }
+ return false;
+ }
+
+ public boolean excluded(String propertyName, String propertyValue) {
+ return !included(propertyName, propertyValue);
+ }
+
+ public Collection<String> getIncludedPropertyValues(String propertyName) {
+ if(filterMap.containsKey(propertyName)) {
+ return filterMap.get(propertyName).keySet();
+ }
+ return CollectionUtils.EMPTY_COLLECTION;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
deleted file mode 100644
index ba8e23c..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.filter.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A filter to discard topology events which are not in a given cluster id list.
- */
-public class ClusterFilter {
- private static final Log log = LogFactory.getLog(ClusterFilter.class);
- private static volatile ClusterFilter instance;
-
- private Map<String, Boolean> clusterIdMap;
-
- private ClusterFilter() {
- this.clusterIdMap = new HashMap<String, Boolean>();
-
- String filter = System.getProperty("stratos.messaging.topology.cluster.filter");
- if(StringUtils.isNotBlank(filter)) {
- String[] array = filter.split(",");
- for(String item : array) {
- clusterIdMap.put(item, true);
- }
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster filter initialized: [included] %s", filter));
- }
- }
- }
-
- public static synchronized ClusterFilter getInstance() {
- if (instance == null) {
- synchronized (ClusterFilter.class){
- if (instance == null) {
- instance = new ClusterFilter();
- if(log.isDebugEnabled()) {
- log.debug("Cluster filter object created");
- }
- }
- }
- }
- return instance;
- }
-
- public boolean isActive() {
- return clusterIdMap.size() > 0;
- }
-
- public boolean included(String clusterId) {
- return clusterIdMap.containsKey(clusterId);
- }
-
- public boolean excluded(String clusterId) {
- return !clusterIdMap.containsKey(clusterId);
- }
-
- public Collection<String> getIncludedClusterIds() {
- return clusterIdMap.keySet();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
deleted file mode 100644
index 2c78a25..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.filter.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A filter to discard topology events which are not in a given service name list.
- */
-public class ServiceFilter {
- private static final Log log = LogFactory.getLog(ServiceFilter.class);
- private static volatile ServiceFilter instance;
-
- private Map<String, Boolean> serviceNameMap;
-
- private ServiceFilter() {
- this.serviceNameMap = new HashMap<String, Boolean>();
-
- String filter = System.getProperty("stratos.messaging.topology.service.filter");
- if(StringUtils.isNotBlank(filter)) {
- String[] array = filter.split(",");
- for(String item : array) {
- serviceNameMap.put(item, true);
- }
- if(log.isDebugEnabled()) {
- log.debug(String.format("Service filter initialized: [included] %s", filter));
- }
- }
- }
-
- public static synchronized ServiceFilter getInstance() {
- if (instance == null) {
- synchronized (ServiceFilter.class){
- if (instance == null) {
- instance = new ServiceFilter();
- if(log.isDebugEnabled()) {
- log.debug("Service filter object created");
- }
- }
- }
- }
- return instance;
- }
-
- public boolean isActive() {
- return serviceNameMap.size() > 0;
- }
-
- public boolean included(String serviceName) {
- return serviceNameMap.containsKey(serviceName);
- }
-
- public boolean excluded(String serviceName) {
- return !serviceNameMap.containsKey(serviceName);
- }
-
- public Collection<String> getIncludedServiceNames() {
- return serviceNameMap.keySet();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
new file mode 100644
index 0000000..5416744
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+
+/**
+ * A filter to discard topology events which are not in a given cluster id list.
+ */
+public class TopologyClusterFilter extends MessageFilter {
+ private static final Log log = LogFactory.getLog(TopologyServiceFilter.class);
+ private static volatile TopologyClusterFilter instance;
+
+ public TopologyClusterFilter() {
+ super(Constants.TOPOLOGY_CLUSTER_FILTER);
+ }
+
+ public static synchronized TopologyClusterFilter getInstance() {
+ if (instance == null) {
+ synchronized (TopologyClusterFilter.class) {
+ if (instance == null) {
+ instance = new TopologyClusterFilter();
+ if (log.isDebugEnabled()) {
+ log.debug("Topology cluster filter instance created");
+ }
+ }
+ }
+ }
+ return instance;
+ }
+
+ public boolean clusterIdIncluded(String value) {
+ return included(Constants.TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID, value);
+ }
+
+ public boolean clusterIdExcluded(String value) {
+ return excluded(Constants.TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID, value);
+ }
+
+ public Collection<String> getIncludedClusterIds() {
+ return getIncludedPropertyValues(Constants.TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
new file mode 100644
index 0000000..1da4413
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
@@ -0,0 +1,46 @@
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+
+/**
+ * A filter to discard topology events which are not in a load balancer cluster.
+ */
+public class TopologyMemberFilter extends MessageFilter {
+ private static final Log log = LogFactory.getLog(TopologyServiceFilter.class);
+ private static volatile TopologyMemberFilter instance;
+
+ public TopologyMemberFilter() {
+ super(Constants.TOPOLOGY_MEMBER_FILTER);
+ }
+
+ public static synchronized TopologyMemberFilter getInstance() {
+ if (instance == null) {
+ synchronized (TopologyMemberFilter.class){
+ if (instance == null) {
+ instance = new TopologyMemberFilter();
+ if(log.isDebugEnabled()) {
+ log.debug("Topology member filter instance created");
+ }
+ }
+ }
+ }
+ return instance;
+ }
+
+ public boolean lbClusterIdIncluded(String value) {
+ return included(Constants.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID, value);
+ }
+
+ public boolean lbClusterIdExcluded(String value) {
+ return excluded(Constants.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID, value);
+ }
+
+ public Collection<String> getIncludedLbClusterIds() {
+ return getIncludedPropertyValues(Constants.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
new file mode 100644
index 0000000..6ddc6e0
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+
+/**
+ * A filter to discard topology events which are not in a given service name list.
+ */
+public class TopologyServiceFilter extends MessageFilter {
+ private static final Log log = LogFactory.getLog(TopologyServiceFilter.class);
+ private static volatile TopologyServiceFilter instance;
+
+ public TopologyServiceFilter() {
+ super(Constants.TOPOLOGY_SERVICE_FILTER);
+ }
+
+ public static synchronized TopologyServiceFilter getInstance() {
+ if (instance == null) {
+ synchronized (TopologyServiceFilter.class) {
+ if (instance == null) {
+ instance = new TopologyServiceFilter();
+ if (log.isDebugEnabled()) {
+ log.debug("Topology service filter instance created");
+ }
+ }
+ }
+ }
+ return instance;
+ }
+
+ public boolean serviceNameIncluded(String value) {
+ return included(Constants.TOPOLOGY_SERVICE_FILTER_SERVICE_NAME, value);
+ }
+
+ public boolean serviceNameExcluded(String value) {
+ return excluded(Constants.TOPOLOGY_SERVICE_FILTER_SERVICE_NAME, value);
+ }
+
+ public Collection<String> getIncludedServiceNames() {
+ return getIncludedPropertyValues(Constants.TOPOLOGY_SERVICE_FILTER_SERVICE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index 8a8d394..d1795b6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -25,9 +25,9 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class ClusterCreatedMessageProcessor extends MessageProcessor {
@@ -53,8 +53,8 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -64,8 +64,8 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
// Cluster is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index eb1d98a..edbfed1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -18,15 +18,14 @@
*/
package org.apache.stratos.messaging.message.processor.topology;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class ClusterRemovedMessageProcessor extends MessageProcessor {
@@ -52,8 +51,8 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -63,8 +62,8 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
// Cluster is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 5cea64e..3fa8890 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -24,9 +24,9 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class CompleteTopologyMessageProcessor extends MessageProcessor {
@@ -53,10 +53,10 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
// Add services included in service filter
for (Service service : event.getTopology().getServices()) {
- if (ServiceFilter.getInstance().included(service.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(service.getServiceName())) {
topology.addService(service);
} else {
if (log.isDebugEnabled()) {
@@ -70,10 +70,10 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
for (Service service : topology.getServices()) {
for (Cluster cluster : service.getClusters()) {
- if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(cluster.getClusterId())) {
service.removeCluster(cluster.getClusterId());
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
index f614782..d362cbc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -22,9 +22,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class InstanceSpawnedMessageProcessor extends MessageProcessor {
@@ -50,8 +51,8 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -61,8 +62,8 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
// Cluster is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -71,6 +72,16 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
}
}
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+ }
+ return false;
+ }
+ }
+
// Validate event against the existing topology
Service service = topology.getService(event.getServiceName());
if (service == null) {
@@ -99,9 +110,9 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
}
// Apply changes to the topology
- Member member = new Member(event.getServiceName(), event.getClusterId(), event.getMemberId());
+ Member member = new Member(event.getServiceName(), event.getClusterId(), event.getPartitionId(), event.getMemberId());
member.setStatus(MemberStatus.Created);
- member.setPartitionId(event.getPartitionId());
+ member.setLbClusterId(event.getLbClusterId());
cluster.addMember(member);
if (log.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index e299b7d..71853d9 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -26,9 +26,10 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class MemberActivatedMessageProcessor extends MessageProcessor {
@@ -54,8 +55,8 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -65,8 +66,8 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
// Cluster is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -75,6 +76,16 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
}
}
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+ }
+ return false;
+ }
+ }
+
// Validate event properties
if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index eeae3d3..2753379 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -26,9 +26,10 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class MemberStartedMessageProcessor extends MessageProcessor {
@@ -54,8 +55,8 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -65,8 +66,8 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
// Cluster is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -75,6 +76,16 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
}
}
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+ }
+ return false;
+ }
+ }
+
// Validate event against the existing topology
Service service = topology.getService(event.getServiceName());
if (service == null) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
index e72d62c..1aa7fef 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -26,9 +26,10 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.stratos.messaging.util.Util;
public class MemberSuspendedMessageProcessor extends MessageProcessor {
@@ -54,8 +55,8 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
// Apply service filter
- if (ServiceFilter.getInstance().isActive()) {
- if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
// Service is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -65,8 +66,8 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
}
// Apply cluster filter
- if (ClusterFilter.getInstance().isActive()) {
- if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
// Cluster is excluded, do not update topology or fire event
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -75,6 +76,16 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
}
}
+ // Apply member filter
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+ }
+ return false;
+ }
+ }
+
// Validate event against the existing topology
Service service = topology.getService(event.getServiceName());
if (service == null) {