You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/28 21:16:22 UTC
[1/4] Introduced new functionality to add domains to cartridge
subscriptions
Repository: incubator-stratos
Updated Branches:
refs/heads/4.0.0-incubating 57bffdf61 -> 33471692b
refs/heads/master 5eee96dbb -> f888f854c
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java
new file mode 100644
index 0000000..aeea6bf
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainsAddedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Tenant subscribed message processor for adding domains to tenant subscriptions.
+ */
+public class SubscriptionDomainsAddedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(SubscriptionDomainsAddedMessageProcessor.class);
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (SubscriptionDomainsAddedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has not initialized
+ if(!TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
+ // Parse complete message and build event
+ SubscriptionDomainsAddedEvent event = (SubscriptionDomainsAddedEvent) Util.jsonToObject(message, TenantSubscribedEvent.class);
+
+ try {
+ TenantManager.acquireWriteLock();
+ Tenant tenant = TenantManager.getInstance().getTenant(event.getTenantId());
+ if(tenant == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Tenant not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ Subscription subscription = tenant.getSubscription(event.getServiceName());
+ if(subscription == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Subscription not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ subscription.addDomains(event.getDomains());
+ if(log.isInfoEnabled()) {
+ log.info(String.format("Domains added to tenant subscription: [tenant-id] %d [tenant-domain] %s [service] %s [domains] %s",
+ tenant.getTenantId(), tenant.getTenantDomain(), event.getServiceName(), event.getDomains()));
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+ finally {
+ TenantManager.releaseWriteLock();
+ }
+ }
+ else {
+ if(nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ }
+ else {
+ throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java
new file mode 100644
index 0000000..fc08357
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainsAddedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Tenant subscribed message processor for removing domains from tenant subscriptions.
+ */
+public class SubscriptionDomainsRemovedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(SubscriptionDomainsRemovedMessageProcessor.class);
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (SubscriptionDomainsAddedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has not initialized
+ if(!TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
+ // Parse complete message and build event
+ SubscriptionDomainsAddedEvent event = (SubscriptionDomainsAddedEvent) Util.jsonToObject(message, TenantSubscribedEvent.class);
+
+ try {
+ TenantManager.acquireWriteLock();
+ Tenant tenant = TenantManager.getInstance().getTenant(event.getTenantId());
+ if(tenant == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Tenant not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ Subscription subscription = tenant.getSubscription(event.getServiceName());
+ if(subscription == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Subscription not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ subscription.removeDomains(event.getDomains());
+ if(log.isInfoEnabled()) {
+ log.info(String.format("Domains removed from tenant subscription: [tenant-id] %d [tenant-domain] %s [service] %s [domains] %s",
+ tenant.getTenantId(), tenant.getTenantDomain(), event.getServiceName(), event.getDomains()));
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+ finally {
+ TenantManager.releaseWriteLock();
+ }
+ }
+ else {
+ if(nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ }
+ else {
+ throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
index d4c008e..725ad0f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantMessageProcessorChain.java
@@ -37,6 +37,8 @@ public class TenantMessageProcessorChain extends MessageProcessorChain {
private TenantRemovedMessageProcessor tenantRemovedMessageProcessor;
private TenantSubscribedMessageProcessor tenantSubscribedMessageProcessor;
private TenantUnSubscribedMessageProcessor tenantUnSubscribedMessageProcessor;
+ private SubscriptionDomainsAddedMessageProcessor subscriptionDomainsAddedMessageProcessor;
+ private SubscriptionDomainsRemovedMessageProcessor subscriptionDomainsRemovedMessageProcessor;
public void initialize() {
// Add tenant event processors
@@ -58,6 +60,12 @@ public class TenantMessageProcessorChain extends MessageProcessorChain {
tenantUnSubscribedMessageProcessor = new TenantUnSubscribedMessageProcessor();
add(tenantUnSubscribedMessageProcessor);
+ subscriptionDomainsAddedMessageProcessor = new SubscriptionDomainsAddedMessageProcessor();
+ add(subscriptionDomainsAddedMessageProcessor);
+
+ subscriptionDomainsRemovedMessageProcessor = new SubscriptionDomainsRemovedMessageProcessor();
+ add(subscriptionDomainsRemovedMessageProcessor);
+
if (log.isDebugEnabled()) {
log.debug("Tenant message processor chain initialized");
}
@@ -76,6 +84,10 @@ public class TenantMessageProcessorChain extends MessageProcessorChain {
tenantSubscribedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof TenantUnSubscribedEventListener) {
tenantUnSubscribedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof SubscriptionDomainsAddedEventListener) {
+ subscriptionDomainsAddedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof SubscriptionDomainsRemovedEventListener) {
+ subscriptionDomainsRemovedMessageProcessor.addEventListener(eventListener);
}
else {
throw new RuntimeException("Unknown event listener");
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantSubscribedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantSubscribedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantSubscribedMessageProcessor.java
index aae074b..147ebde 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantSubscribedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantSubscribedMessageProcessor.java
@@ -21,6 +21,7 @@ package org.apache.stratos.messaging.message.processor.tenant;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -62,10 +63,11 @@ public class TenantSubscribedMessageProcessor extends MessageProcessor {
}
return false;
}
- tenant.addServiceSubscription(event.getServiceName());
+ Subscription subscription = new Subscription(event.getServiceName(), event.getClusterIds(), event.getDomains());
+ tenant.addSubscription(subscription);
if(log.isInfoEnabled()) {
- log.info(String.format("Tenant subscribed to service: [tenant-id] %d [tenant-domain] %s [service] %s",
- tenant.getTenantId(), tenant.getTenantDomain(), event.getServiceName()));
+ log.info(String.format("Tenant subscribed to service: [tenant-id] %d [tenant-domain] %s [service] %s [domains] %s",
+ tenant.getTenantId(), tenant.getTenantDomain(), event.getServiceName(), event.getDomains()));
}
// Notify event listeners
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUnSubscribedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUnSubscribedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUnSubscribedMessageProcessor.java
index 6c4157c..ee929a3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUnSubscribedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUnSubscribedMessageProcessor.java
@@ -62,7 +62,7 @@ public class TenantUnSubscribedMessageProcessor extends MessageProcessor {
}
return false;
}
- tenant.removeServiceSubscription(event.getServiceName());
+ tenant.removeSubscription(event.getServiceName());
if(log.isInfoEnabled()) {
log.info(String.format("Tenant un-subscribed from service: [tenant-id] %d [tenant-domain] %s [service] %s",
tenant.getTenantId(), tenant.getTenantDomain(), event.getServiceName()));
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java
new file mode 100644
index 0000000..adb11e1
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.test;
+
+import junit.framework.Assert;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.HashSet;
+
+/**
+ * Tenant domain model test.
+ */
+@RunWith(JUnit4.class)
+public class TenantDomainTest {
+ @Test
+ public void testSubscriptionModel() {
+ Tenant tenant = new Tenant(1, "domain.org");
+ Subscription subscription = new Subscription("subscription1", new HashSet<String>(), new HashSet<String>());
+ tenant.addSubscription(subscription);
+ Assert.assertTrue("Subscription not added", tenant.isSubscribed("subscription1"));
+ tenant.removeSubscription("subscription1");
+ Assert.assertTrue("Subscription not removed", !tenant.isSubscribed("subscription1"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/CartridgeInfoBean.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/CartridgeInfoBean.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/CartridgeInfoBean.java
index e29d028..48fd5ce 100644
--- a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/CartridgeInfoBean.java
+++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/CartridgeInfoBean.java
@@ -19,6 +19,8 @@
package org.apache.stratos.rest.endpoint.bean;
import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
@XmlRootElement
public class CartridgeInfoBean {
@@ -38,6 +40,11 @@ public class CartridgeInfoBean {
private String size;
private boolean removeOnTermination;
private String serviceGroup;
+ private List<String> domains;
+
+ public CartridgeInfoBean() {
+ this.domains = new ArrayList<String>();
+ }
public String getCartridgeType() {
return cartridgeType;
@@ -158,5 +165,8 @@ public class CartridgeInfoBean {
public void setServiceGroup(String serviceGroup) {
this.serviceGroup = serviceGroup;
}
-
+
+ public List<String> getDomains() { return domains; }
+
+ public void setDomains(List<String> domains) { this.domains = domains; }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
index 44b2c26..554accf 100644
--- a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
+++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
@@ -52,8 +52,7 @@ import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Constants;
-import org.apache.stratos.rest.endpoint.bean.CartridgeInfoBean;
-import org.apache.stratos.rest.endpoint.bean.StratosAdminResponse;
+import org.apache.stratos.rest.endpoint.bean.*;
import org.apache.stratos.rest.endpoint.bean.autoscaler.partition.Partition;
import org.apache.stratos.rest.endpoint.bean.autoscaler.partition.PartitionGroup;
import org.apache.stratos.rest.endpoint.bean.autoscaler.policy.autoscale.AutoscalePolicy;
@@ -992,6 +991,7 @@ public class ServiceUtils {
subscriptionData.setRepositoryPassword(cartridgeInfoBean.getRepoPassword());
subscriptionData.setCommitsEnabled(cartridgeInfoBean.isCommitsEnabled());
subscriptionData.setServiceGroup(cartridgeInfoBean.getServiceGroup());
+ subscriptionData.addDomains(new HashSet<String>(cartridgeInfoBean.getDomains()));
if (cartridgeInfoBean.isPersistanceRequired()) {
// Add persistence related properties to PersistenceContext
@@ -1187,4 +1187,44 @@ public class ServiceUtils {
return stratosAdminResponse;
}
+ public static StratosAdminResponse addSubscriptionDomains(ConfigurationContext configurationContext, String cartridgeType,
+ String subscriptionAlias, List<String> domains) throws RestAPIException {
+ try {
+ int tenantId = ApplicationManagementUtil.getTenantId(configurationContext);
+ cartridgeSubsciptionManager.addSubscriptionDomains(tenantId, subscriptionAlias, domains);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ throw new RestAPIException(e.getMessage(), e);
+ }
+
+ StratosAdminResponse stratosAdminResponse = new StratosAdminResponse();
+ stratosAdminResponse.setMessage("Successfully added domains to cartridge subscription");
+ return stratosAdminResponse;
+ }
+
+ public static List<String> getSubscriptionDomains(ConfigurationContext configurationContext, String cartridgeType,
+ String subscriptionAlias) throws RestAPIException {
+ try {
+ int tenantId = ApplicationManagementUtil.getTenantId(configurationContext);
+ return cartridgeSubsciptionManager.getSubscriptionDomains(tenantId, subscriptionAlias);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ throw new RestAPIException(e.getMessage(), e);
+ }
+ }
+
+ public static StratosAdminResponse removeSubscriptionDomains(ConfigurationContext configurationContext, String cartridgeType,
+ String subscriptionAlias, List<String> domains) throws RestAPIException {
+ try {
+ int tenantId = ApplicationManagementUtil.getTenantId(configurationContext);
+ cartridgeSubsciptionManager.removeSubscriptionDomains(tenantId, subscriptionAlias, domains);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ throw new RestAPIException(e.getMessage(), e);
+ }
+
+ StratosAdminResponse stratosAdminResponse = new StratosAdminResponse();
+ stratosAdminResponse.setMessage("Successfully removed domains from cartridge subscription");
+ return stratosAdminResponse;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
index 7b744a5..c5db7ff 100644
--- a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
+++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
@@ -33,6 +33,7 @@ import org.apache.stratos.rest.endpoint.annotation.AuthorizationAction;
import org.apache.stratos.rest.endpoint.annotation.SuperTenantService;
import org.apache.stratos.rest.endpoint.bean.CartridgeInfoBean;
import org.apache.stratos.rest.endpoint.bean.StratosAdminResponse;
+import org.apache.stratos.rest.endpoint.bean.SubscriptionDomainRequest;
import org.apache.stratos.rest.endpoint.bean.autoscaler.partition.Partition;
import org.apache.stratos.rest.endpoint.bean.autoscaler.partition.PartitionGroup;
import org.apache.stratos.rest.endpoint.bean.autoscaler.policy.autoscale.AutoscalePolicy;
@@ -40,7 +41,6 @@ import org.apache.stratos.rest.endpoint.bean.autoscaler.policy.deployment.Deploy
import org.apache.stratos.rest.endpoint.bean.cartridge.definition.CartridgeDefinitionBean;
import org.apache.stratos.rest.endpoint.bean.cartridge.definition.ServiceDefinitionBean;
import org.apache.stratos.rest.endpoint.bean.repositoryNotificationInfoBean.Payload;
-import org.apache.stratos.rest.endpoint.bean.repositoryNotificationInfoBean.Repository;
import org.apache.stratos.rest.endpoint.bean.topology.Cluster;
import org.apache.stratos.rest.endpoint.exception.RestAPIException;
import org.apache.stratos.tenant.mgt.core.TenantPersistor;
@@ -1018,4 +1018,36 @@ public class StratosAdmin extends AbstractAdmin {
}
return tenantList;
}
+
+ @POST
+ @Path("/cartridge/{cartridgeType}/subscription/{subscriptionAlias}/domain/")
+ @Consumes("application/json")
+ @AuthorizationAction("/permission/protected/manage/monitor/tenants")
+ public StratosAdminResponse addSubscriptionDomains(@PathParam("cartridgeType") String cartridgeType,
+ @PathParam("subscriptionAlias") String subscriptionAlias,
+ SubscriptionDomainRequest request) throws RestAPIException {
+
+ return ServiceUtils.addSubscriptionDomains(getConfigContext(), cartridgeType, subscriptionAlias, request.getDomains());
+ }
+
+ @GET
+ @Path("/cartridge/{cartridgeType}/subscription/{subscriptionAlias}/domain/")
+ @Consumes("application/json")
+ @AuthorizationAction("/permission/protected/manage/monitor/tenants")
+ public String[] getSubscriptionDomains(@PathParam("cartridgeType") String cartridgeType,
+ @PathParam("subscriptionAlias") String subscriptionAlias) throws RestAPIException {
+
+ return (String[]) ServiceUtils.getSubscriptionDomains(getConfigContext(), cartridgeType, subscriptionAlias).toArray();
+ }
+
+ @DELETE
+ @Path("/cartridge/{cartridgeType}/subscription/{subscriptionAlias}/domain/")
+ @Consumes("application/json")
+ @AuthorizationAction("/permission/protected/manage/monitor/tenants")
+ public StratosAdminResponse removeSubscriptionDomains(@PathParam("cartridgeType") String cartridgeType,
+ @PathParam("subscriptionAlias") String subscriptionAlias,
+ SubscriptionDomainRequest request) throws RestAPIException {
+
+ return ServiceUtils.removeSubscriptionDomains(getConfigContext(), cartridgeType, subscriptionAlias, request.getDomains());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.rest.endpoint/src/main/webapp/stratos/WEB-INF/cxf-servlet.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/webapp/stratos/WEB-INF/cxf-servlet.xml b/components/org.apache.stratos.rest.endpoint/src/main/webapp/stratos/WEB-INF/cxf-servlet.xml
index fbb8b10..fac0fb9 100644
--- a/components/org.apache.stratos.rest.endpoint/src/main/webapp/stratos/WEB-INF/cxf-servlet.xml
+++ b/components/org.apache.stratos.rest.endpoint/src/main/webapp/stratos/WEB-INF/cxf-servlet.xml
@@ -68,6 +68,7 @@
<value>hostNames</value>
<value>portMappings</value>
<value>volumes</value>
+ <value>domains</value>
</list>
</property>
</bean>
[3/4] git commit: Adding SubscriptionDomainRequest.java
Posted by im...@apache.org.
Adding SubscriptionDomainRequest.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/33471692
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/33471692
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/33471692
Branch: refs/heads/4.0.0-incubating
Commit: 33471692bce9f0cd927705db03839fe8cd2d4e12
Parents: 6c34420
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sun Apr 27 22:47:38 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sun Apr 27 22:47:38 2014 +0530
----------------------------------------------------------------------
.../bean/SubscriptionDomainRequest.java | 39 ++++++++++++++++++++
1 file changed, 39 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/33471692/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/SubscriptionDomainRequest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/SubscriptionDomainRequest.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/SubscriptionDomainRequest.java
new file mode 100644
index 0000000..9b066cb
--- /dev/null
+++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/bean/SubscriptionDomainRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.rest.endpoint.bean;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+
+/**
+ * Defines subscription domain post request.
+ */
+@XmlRootElement(name = "subscriptionDomainRequest")
+public class SubscriptionDomainRequest {
+ private List<String> domains;
+
+ public List<String> getDomains() {
+ return domains;
+ }
+
+ public void setDomains(List<String> domains) {
+ this.domains = domains;
+ }
+}
[2/4] git commit: Introduced new functionality to add domains to
cartridge subscriptions
Posted by im...@apache.org.
Introduced new functionality to add domains to cartridge subscriptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/6c34420d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/6c34420d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/6c34420d
Branch: refs/heads/4.0.0-incubating
Commit: 6c34420de9aca70f8282ddff3d59fd56c1ece4bb
Parents: 57bffdf
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sun Apr 27 21:54:39 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sun Apr 27 21:54:39 2014 +0530
----------------------------------------------------------------------
.../LoadBalancerTenantEventReceiver.java | 180 +++++----
.../LoadBalancerTopologyEventReceiver.java | 8 +-
.../conf/LoadBalancerConfiguration.java | 2 +-
.../balancer/context/LoadBalancerContext.java | 4 +-
.../context/LoadBalancerContextUtil.java | 364 ++++++++++++++++++-
.../context/map/MultiTenantClusterMap.java | 13 +-
.../TenantAwareLoadBalanceEndpoint.java | 5 +-
.../manager/CartridgeSubscriptionManager.java | 94 ++++-
.../publisher/TenantSynzhronizerTask.java | 15 +-
.../DataInsertionAndRetrievalManager.java | 25 ++
.../subscription/CartridgeSubscription.java | 26 +-
.../manager/subscription/SubscriptionData.java | 27 +-
.../utils/CartridgeSubscriptionUtils.java | 22 +-
.../messaging/domain/tenant/Subscription.java | 65 ++++
.../stratos/messaging/domain/tenant/Tenant.java | 29 +-
.../tenant/SubscriptionDomainsAddedEvent.java | 60 +++
.../tenant/SubscriptionDomainsRemovedEvent.java | 60 +++
.../event/tenant/TenantSubscribedEvent.java | 15 +-
.../event/tenant/TenantUnSubscribedEvent.java | 10 +-
.../SubscriptionDomainsAddedEventListener.java | 28 ++
...SubscriptionDomainsRemovedEventListener.java | 28 ++
...ubscriptionDomainsAddedMessageProcessor.java | 96 +++++
...scriptionDomainsRemovedMessageProcessor.java | 96 +++++
.../tenant/TenantMessageProcessorChain.java | 12 +
.../TenantSubscribedMessageProcessor.java | 8 +-
.../TenantUnSubscribedMessageProcessor.java | 2 +-
.../messaging/test/TenantDomainTest.java | 45 +++
.../rest/endpoint/bean/CartridgeInfoBean.java | 12 +-
.../rest/endpoint/services/ServiceUtils.java | 44 ++-
.../rest/endpoint/services/StratosAdmin.java | 34 +-
.../main/webapp/stratos/WEB-INF/cxf-servlet.xml | 1 +
31 files changed, 1267 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java
index ad1907a..52886d1 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java
@@ -21,24 +21,17 @@ package org.apache.stratos.load.balancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
import org.apache.stratos.messaging.domain.tenant.Tenant;
-import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.ServiceType;
import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
-import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
-import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
-import org.apache.stratos.messaging.listener.tenant.CompleteTenantEventListener;
-import org.apache.stratos.messaging.listener.tenant.TenantSubscribedEventListener;
-import org.apache.stratos.messaging.listener.tenant.TenantUnSubscribedEventListener;
+import org.apache.stratos.messaging.event.tenant.*;
+import org.apache.stratos.messaging.listener.tenant.*;
import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* Load balancer tenant receiver updates load balancer context according to
* incoming tenant events.
@@ -60,11 +53,22 @@ public class LoadBalancerTenantEventReceiver implements Runnable {
@Override
protected void onEvent(Event event) {
CompleteTenantEvent completeTenantEvent = (CompleteTenantEvent) event;
+ if (log.isDebugEnabled()) {
+ log.debug("Complete tenant event received");
+ }
for (Tenant tenant : completeTenantEvent.getTenants()) {
- for (String serviceName : tenant.getServiceSubscriptions()) {
- if(isMultiTenantService(serviceName)) {
- addTenantSubscriptionToLbContext(serviceName, tenant.getTenantId());
+ for (Subscription subscription : tenant.getSubscriptions()) {
+ if (isMultiTenantService(subscription.getServiceName())) {
+ LoadBalancerContextUtil.addClustersAgainstHostNamesAndTenantIds(
+ subscription.getServiceName(),
+ tenant.getTenantId(),
+ subscription.getClusterIds());
}
+
+ LoadBalancerContextUtil.addClustersAgainstDomains(
+ subscription.getServiceName(),
+ subscription.getClusterIds(),
+ subscription.getDomains());
}
}
}
@@ -73,103 +77,97 @@ public class LoadBalancerTenantEventReceiver implements Runnable {
@Override
protected void onEvent(Event event) {
TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event;
- if(log.isDebugEnabled()) {
- log.debug(String.format("Tenant subscribed event received: [tenant-id] %d [service] %s",
- tenantSubscribedEvent.getTenantId(), tenantSubscribedEvent.getServiceName()));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant subscribed event received: [tenant-id] %d [service] %s [cluster-ids] %s",
+ tenantSubscribedEvent.getTenantId(),
+ tenantSubscribedEvent.getServiceName(),
+ tenantSubscribedEvent.getClusterIds()));
}
- if(isMultiTenantService(tenantSubscribedEvent.getServiceName())) {
- addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId());
+
+ if (isMultiTenantService(tenantSubscribedEvent.getServiceName())) {
+ LoadBalancerContextUtil.addClustersAgainstHostNamesAndTenantIds(
+ tenantSubscribedEvent.getServiceName(),
+ tenantSubscribedEvent.getTenantId(),
+ tenantSubscribedEvent.getClusterIds());
}
+
+ LoadBalancerContextUtil.addClustersAgainstDomains(
+ tenantSubscribedEvent.getServiceName(),
+ tenantSubscribedEvent.getClusterIds(),
+ tenantSubscribedEvent.getDomains());
}
});
tenantEventReceiver.addEventListener(new TenantUnSubscribedEventListener() {
@Override
protected void onEvent(Event event) {
TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event;
- if(log.isDebugEnabled()) {
- log.debug(String.format("Tenant un-subscribed event received: [tenant-id] %d [service] %s",
- tenantUnSubscribedEvent.getTenantId(), tenantUnSubscribedEvent.getServiceName()));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant un-subscribed event received: [tenant-id] %d [service] %s [cluster-ids] %s",
+ tenantUnSubscribedEvent.getTenantId(),
+ tenantUnSubscribedEvent.getServiceName(),
+ tenantUnSubscribedEvent.getClusterIds()));
}
- if(isMultiTenantService(tenantUnSubscribedEvent.getServiceName())) {
- removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId());
+
+ if (isMultiTenantService(tenantUnSubscribedEvent.getServiceName())) {
+ LoadBalancerContextUtil.removeClustersAgainstHostNamesAndTenantIds(
+ tenantUnSubscribedEvent.getServiceName(),
+ tenantUnSubscribedEvent.getTenantId(),
+ tenantUnSubscribedEvent.getClusterIds()
+ );
}
- }
- });
- }
- private boolean isMultiTenantService(String serviceName) {
- try {
- TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(serviceName);
- if(service != null) {
- return (service.getServiceType() == ServiceType.MultiTenant);
+ LoadBalancerContextUtil.removeClustersAgainstAllDomains(
+ tenantUnSubscribedEvent.getServiceName(),
+ tenantUnSubscribedEvent.getTenantId(),
+ tenantUnSubscribedEvent.getClusterIds());
}
- return false;
- }
- finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- private void addTenantSubscriptionToLbContext(String serviceName, int tenantId) {
- // Find cluster of tenant
- Cluster cluster = findCluster(serviceName, tenantId);
- if (cluster != null) {
- for (String hostName : cluster.getHostNames()) {
- // Add hostName, tenantId, cluster to multi-tenant map
- Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusterMap().getClusters(hostName);
- if (clusterMap == null) {
- clusterMap = new HashMap<Integer, Cluster>();
- clusterMap.put(tenantId, cluster);
- LoadBalancerContext.getInstance().getMultiTenantClusterMap().addClusters(hostName, clusterMap);
- } else {
- clusterMap.put(tenantId, cluster);
- }
+ });
+ tenantEventReceiver.addEventListener(new SubscriptionDomainsAddedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ SubscriptionDomainsAddedEvent subscriptionDomainsAddedEvent = (SubscriptionDomainsAddedEvent) event;
if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster added to multi-tenant cluster map: [host-name] %s [tenant-id] %d [cluster] %s",
- hostName, tenantId, cluster.getClusterId()));
+ log.debug(String.format("Tenant subscription domains added event received: [tenant-id] %d " +
+ "[service] %s [cluster-ids] %s [domains] %s",
+ subscriptionDomainsAddedEvent.getTenantId(),
+ subscriptionDomainsAddedEvent.getServiceName(),
+ subscriptionDomainsAddedEvent.getClusterIds(),
+ subscriptionDomainsAddedEvent.getDomains()));
}
+ LoadBalancerContextUtil.addClustersAgainstDomains(
+ subscriptionDomainsAddedEvent.getServiceName(),
+ subscriptionDomainsAddedEvent.getClusterIds(),
+ subscriptionDomainsAddedEvent.getDomains());
}
- } else {
- if (log.isErrorEnabled()) {
- log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
- serviceName, tenantId));
- }
- }
- }
-
- private void removeTenantSubscriptionFromLbContext(String serviceName, int tenantId) {
- // Find cluster of tenant
- Cluster cluster = findCluster(serviceName, tenantId);
- if (cluster != null) {
- for (String hostName : cluster.getHostNames()) {
- LoadBalancerContext.getInstance().getMultiTenantClusterMap().removeClusters(hostName);
+ });
+ tenantEventReceiver.addEventListener(new SubscriptionDomainsRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ SubscriptionDomainsRemovedEvent subscriptionDomainsRemovedEvent = (SubscriptionDomainsRemovedEvent) event;
if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
- hostName, tenantId, cluster.getClusterId()));
+ log.debug(String.format("Tenant subscription domains removed event received: [tenant-id] %d " +
+ "[service] %s [cluster-ids] %s [domains] %s",
+ subscriptionDomainsRemovedEvent.getTenantId(),
+ subscriptionDomainsRemovedEvent.getServiceName(),
+ subscriptionDomainsRemovedEvent.getClusterIds(),
+ subscriptionDomainsRemovedEvent.getDomains()));
}
+ LoadBalancerContextUtil.removeClustersAgainstDomains(
+ subscriptionDomainsRemovedEvent.getServiceName(),
+ subscriptionDomainsRemovedEvent.getClusterIds(),
+ subscriptionDomainsRemovedEvent.getDomains());
}
- } else {
- if (log.isErrorEnabled()) {
- log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
- serviceName, tenantId));
- }
- }
+ });
}
- private Cluster findCluster(String serviceName, int tenantId) {
+ private boolean isMultiTenantService(String serviceName) {
try {
TopologyManager.acquireReadLock();
Service service = TopologyManager.getTopology().getService(serviceName);
- if (service == null) {
- throw new RuntimeException(String.format("Service not found: %s", serviceName));
- }
- for (Cluster cluster : service.getClusters()) {
- if (cluster.tenantIdInRange(tenantId)) {
- return cluster;
- }
+ if (service != null) {
+ return (service.getServiceType() == ServiceType.MultiTenant);
}
- return null;
+ return false;
} finally {
TopologyManager.releaseReadLock();
}
@@ -182,10 +180,10 @@ public class LoadBalancerTenantEventReceiver implements Runnable {
// Keep the thread live until terminated
while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
}
if (log.isInfoEnabled()) {
log.info("Load balancer tenant receiver thread terminated");
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
index 7efaa1c..0796945 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
@@ -84,7 +84,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
for (Service service : TopologyManager.getTopology().getServices()) {
for (Cluster cluster : service.getClusters()) {
if (clusterHasActiveMembers(cluster)) {
- LoadBalancerContextUtil.addClusterToLbContext(cluster);
+ LoadBalancerContextUtil.addClusterAgainstHostNames(cluster);
} else {
if (log.isDebugEnabled()) {
log.debug("Cluster does not have any active members");
@@ -128,7 +128,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
if (service != null) {
Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId());
if (cluster != null) {
- LoadBalancerContextUtil.addClusterToLbContext(cluster);
+ LoadBalancerContextUtil.addClusterAgainstHostNames(cluster);
} else {
if (log.isErrorEnabled()) {
log.error(String.format("Cluster not found in topology: [service] %s [cluster] %s",
@@ -157,7 +157,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
if (cluster != null) {
- LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
+ LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId());
} else {
if (log.isWarnEnabled()) {
log.warn(String.format("Cluster not found in load balancer context: [service] %s [cluster] %s",
@@ -182,7 +182,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
if (service != null) {
for (Cluster cluster : service.getClusters()) {
- LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
+ LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId());
}
} else {
if (log.isWarnEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index 65f7857..a246c1f 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -515,7 +515,7 @@ public class LoadBalancerConfiguration {
}
// Add cluster to load balancer context
- LoadBalancerContextUtil.addClusterToLbContext(cluster);
+ LoadBalancerContextUtil.addClusterAgainstHostNames(cluster);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
index 90c8e9a..fe341d7 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
@@ -59,7 +59,7 @@ public class LoadBalancerContext {
// Map<ClusterId, Cluster>
// Keep track of all clusters
private ClusterIdClusterMap clusterIdClusterMap;
- // Map<HostName, Cluster>
+ // Map<Host/Domain-Name, Cluster>
// Keep tack of all clusters
private HostNameClusterMap hostNameClusterMap;
// Map<HostName, Map<TenantId, Cluster>>
@@ -183,6 +183,6 @@ public class LoadBalancerContext {
}
public MultiTenantClusterMap getMultiTenantClusterMap() {
- return multiTenantClusterMap;
+ return multiTenantClusterMap;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
index 037a7c3..c65d29e 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
@@ -21,17 +21,29 @@ package org.apache.stratos.load.balancer.context;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
/**
* Load balancer context utility class.
*/
public class LoadBalancerContextUtil {
private static final Log log = LogFactory.getLog(LoadBalancerContextUtil.class);
- public static void addClusterToLbContext(Cluster cluster) {
+ /**
+ * Add cluster against its host names.
+ *
+ * @param cluster
+ */
+ public static void addClusterAgainstHostNames(Cluster cluster) {
if (cluster == null)
return;
@@ -48,16 +60,21 @@ public class LoadBalancerContextUtil {
// Add cluster to HostNameClusterMap
for (String hostName : cluster.getHostNames()) {
- if (!LoadBalancerContext.getInstance().getHostNameClusterMap().containsCluster((hostName))) {
- LoadBalancerContext.getInstance().getHostNameClusterMap().addCluster(hostName, cluster);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster added to hostname -> cluster map: [hostname] %s [cluster] %s ", hostName, cluster.getClusterId()));
- }
+ addClusterToHostNameClusterMap(hostName, cluster);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster added to host/domain name -> cluster map: [hostName] %s [cluster] %s",
+ hostName, cluster.getClusterId()));
}
}
}
- public static void removeClusterFromLbContext(String clusterId) {
+ /**
+ * Remove cluster mapped against its host names.
+ *
+ * @param clusterId
+ */
+ public static void removeClusterAgainstHostNames(String clusterId) {
Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterId);
if (cluster == null) {
return;
@@ -70,11 +87,10 @@ public class LoadBalancerContextUtil {
// Remove cluster from HostNameClusterMap
for (String hostName : cluster.getHostNames()) {
- if (LoadBalancerContext.getInstance().getHostNameClusterMap().containsCluster(hostName)) {
- LoadBalancerContext.getInstance().getHostNameClusterMap().removeCluster(hostName);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster removed from hostname -> cluster map: [hostname] %s [cluster] %s ", hostName, cluster.getClusterId()));
- }
+ removeClusterFromHostNameClusterMap(hostName, cluster);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster removed from host/domain name -> clusters map: [host-name] %s [cluster] %s",
+ hostName, cluster.getClusterId()));
}
}
@@ -84,4 +100,328 @@ public class LoadBalancerContextUtil {
log.debug(String.format("Cluster removed from cluster-id -> cluster map: [cluster] %s ", cluster.getClusterId()));
}
}
+
+ /**
+ * Add clusters against host names, tenant id for the given service, cluster ids.
+ *
+ * @param serviceName
+ * @param tenantId
+ * @param clusterIds
+ */
+ public static void addClustersAgainstHostNamesAndTenantIds(String serviceName, int tenantId, Set<String> clusterIds) {
+ try {
+ TopologyManager.acquireReadLock();
+
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Service not found in topology: [service] %s", serviceName));
+ }
+ return;
+ }
+ Cluster cluster;
+ for (String clusterId : clusterIds) {
+ cluster = service.getCluster(clusterId);
+ if (cluster != null) {
+ // Add cluster against host names and tenant id
+ addClusterAgainstHostNamesAndTenantId(serviceName, tenantId, cluster);
+ } else {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not found in service: [service] %s [cluster] %s", serviceName, clusterId));
+ }
+ }
+ }
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ /**
+ * Remove clusters mapped against host names and tenant id.
+ *
+ * @param serviceName
+ * @param tenantId
+ */
+ public static void removeClustersAgainstHostNamesAndTenantIds(String serviceName, int tenantId, Set<String> clusterIds) {
+ try {
+ TopologyManager.acquireReadLock();
+
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Service not found in topology: [service] %s", serviceName));
+ }
+ return;
+ }
+ Cluster cluster;
+ for (String clusterId : clusterIds) {
+ cluster = service.getCluster(clusterId);
+ if (cluster != null) {
+ // Remove cluster mapped against host names and tenant id
+ removeClusterAgainstHostNamesAndTenantId(serviceName, tenantId, cluster);
+ } else {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not found in service: [service] %s [cluster] %s", serviceName, clusterId));
+ }
+ }
+ }
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ /**
+ * Add clusters against domains for the given service, cluster ids.
+ *
+ * @param serviceName
+ * @param clusterIds
+ * @param domains
+ */
+ public static void addClustersAgainstDomains(String serviceName, Set<String> clusterIds, Set<String> domains) {
+ try {
+ TopologyManager.acquireReadLock();
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Service not found in topology: [service] %s", serviceName));
+ }
+ return;
+ }
+ Cluster cluster;
+ for (String clusterId : clusterIds) {
+ cluster = service.getCluster(clusterId);
+ if (cluster != null) {
+ addClusterAgainstDomains(serviceName, cluster, domains);
+ } else {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not found in service: [service] %s [cluster] %s", serviceName, clusterId));
+ }
+ }
+ }
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ /**
+ * Remove clusters mapped against domains for the given service, cluster ids.
+ *
+ * @param serviceName
+ * @param clusterIds
+ * @param domains
+ */
+ public static void removeClustersAgainstDomains(String serviceName, Set<String> clusterIds, Set<String> domains) {
+ try {
+ TopologyManager.acquireReadLock();
+
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Service not found in topology: [service] %s", serviceName));
+ }
+ return;
+ }
+ Cluster cluster;
+ for (String clusterId : clusterIds) {
+ cluster = service.getCluster(clusterId);
+ if (cluster != null) {
+ // Remove clusters mapped against domain names
+ removeClusterAgainstDomains(cluster, domains);
+ } else {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not found in service: [service] %s [cluster] %s", serviceName, clusterId));
+ }
+ }
+ }
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ /**
+ * Find cluster from service name, tenant id.
+ * Acquire a topology manager read lock appropriately.
+ *
+ * @param serviceName
+ * @param tenantId
+ * @return
+ */
+ private static Cluster findCluster(String serviceName, int tenantId) {
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service == null) {
+ throw new RuntimeException(String.format("Service not found: %s", serviceName));
+ }
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.tenantIdInRange(tenantId)) {
+ return cluster;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Add clusters against host names and tenant id to load balancer context.
+ *
+ * @param serviceName
+ * @param tenantId
+ * @param cluster
+ */
+ private static void addClusterAgainstHostNamesAndTenantId(String serviceName, int tenantId, Cluster cluster) {
+ // Add clusters against host names
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Adding cluster to multi-tenant cluster map against host names: [service] %s " +
+ "[tenant-id] %d [cluster] %s", serviceName, tenantId, cluster.getClusterId()));
+ }
+ for (String hostName : cluster.getHostNames()) {
+ addClusterToMultiTenantClusterMap(hostName, tenantId, cluster);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster added to multi-tenant cluster map: [host-name] %s [tenant-id] %d [cluster] %s",
+ hostName, tenantId, cluster.getClusterId()));
+ }
+ }
+ }
+
+ /**
+ * Remove clusters mapped against host names and tenant id from load balancer context.
+ *
+ * @param serviceName
+ * @param tenantId
+ * @param cluster
+ */
+ private static void removeClusterAgainstHostNamesAndTenantId(String serviceName, int tenantId, Cluster cluster) {
+ // Remove clusters mapped against host names
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Removing cluster from multi-tenant cluster map against host names: [service] %s " +
+ "[tenant-id] %d [cluster] %s", serviceName, tenantId, cluster.getClusterId()));
+ }
+ for (String hostName : cluster.getHostNames()) {
+ LoadBalancerContext.getInstance().getMultiTenantClusterMap().removeCluster(hostName, tenantId);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+ hostName, tenantId, cluster.getClusterId()));
+ }
+ }
+ }
+
+
+ /**
+ * Add clusters against domains to load balancer context.
+ *
+ * @param serviceName
+ * @param cluster
+ * @param domains
+ */
+ private static void addClusterAgainstDomains(String serviceName, Cluster cluster, Set<String> domains) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Adding cluster to host/domain name -> cluster map against domain names: [service] %s " +
+ "[domains] %s [cluster] %s", serviceName, domains, cluster.getClusterId()));
+ }
+ if ((domains != null) && (domains.size() > 0)) {
+ for (String domain : domains) {
+ addClusterToHostNameClusterMap(domain, cluster);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster added to host/domain name -> cluster map: [domain-name] %s [cluster] %s",
+ domain, cluster.getClusterId()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove clusters mapped against all subscription domain names for the given service, tenant, cluster ids.
+ *
+ * @param serviceName
+ * @param tenantId
+ * @param clusterIds
+ */
+ public static void removeClustersAgainstAllDomains(String serviceName, int tenantId, Set<String> clusterIds) {
+ try {
+ TenantManager.acquireReadLock();
+ TopologyManager.acquireReadLock();
+
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Service not found in topology: [service] %s", serviceName));
+ }
+ return;
+ }
+ for (String clusterId : clusterIds) {
+ Cluster cluster = service.getCluster(clusterId);
+ Tenant tenant = TenantManager.getInstance().getTenant(tenantId);
+ if (tenant != null) {
+ for (Subscription subscription : tenant.getSubscriptions()) {
+ if (subscription.getServiceName().equals(serviceName)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Removing cluster from host/domain name -> cluster map against domain names: [service] %s " +
+ "[tenant-id] %d [domains] %s", serviceName, tenantId, subscription.getDomains()));
+ }
+ removeClusterAgainstDomains(cluster, subscription.getDomains());
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant not subscribed to service: %s", serviceName));
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ TopologyManager.releaseReadLock();
+ TenantManager.releaseReadLock();
+ }
+ }
+
+ private static void removeClusterAgainstDomains(Cluster cluster, Set<String> domains) {
+ for (String domain : domains) {
+ removeClusterFromHostNameClusterMap(domain, cluster);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster removed from host/domain name -> cluster map: [domain-name] %s [cluster] %s",
+ domain, cluster.getClusterId()));
+ }
+ }
+ }
+
+ /**
+ * Add cluster to host/domain name cluster map.
+ *
+ * @param hostName
+ * @param cluster
+ */
+ private static void addClusterToHostNameClusterMap(String hostName, Cluster cluster) {
+ if (!LoadBalancerContext.getInstance().getHostNameClusterMap().containsCluster((hostName))) {
+ LoadBalancerContext.getInstance().getHostNameClusterMap().addCluster(hostName, cluster);
+ }
+ }
+
+ /**
+ * Remove cluseter from host/domain names cluster map.
+ *
+ * @param hostName
+ * @param cluster
+ */
+ private static void removeClusterFromHostNameClusterMap(String hostName, Cluster cluster) {
+ if (LoadBalancerContext.getInstance().getHostNameClusterMap().containsCluster(hostName)) {
+ LoadBalancerContext.getInstance().getHostNameClusterMap().removeCluster(hostName);
+ }
+ }
+
+ /**
+ * Add cluster to multi-tenant cluster map.
+ *
+ * @param hostName
+ * @param tenantId
+ * @param cluster
+ */
+ private static void addClusterToMultiTenantClusterMap(String hostName, int tenantId, Cluster cluster) {
+ // Add hostName, tenantId, cluster to multi-tenant map
+ Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusterMap().getClusters(hostName);
+ if (clusterMap == null) {
+ clusterMap = new HashMap<Integer, Cluster>();
+ clusterMap.put(tenantId, cluster);
+ LoadBalancerContext.getInstance().getMultiTenantClusterMap().addClusters(hostName, clusterMap);
+ } else {
+ clusterMap.put(tenantId, cluster);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/MultiTenantClusterMap.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/MultiTenantClusterMap.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/MultiTenantClusterMap.java
index 339ad02..d038fbc 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/MultiTenantClusterMap.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/MultiTenantClusterMap.java
@@ -39,7 +39,9 @@ public class MultiTenantClusterMap {
public Cluster getCluster(String hostName, int tenantId) {
Map<Integer, Cluster> clusterMap = getClusters(hostName);
if (clusterMap != null) {
- return clusterMap.get(tenantId);
+ if(clusterMap.containsKey(tenantId)) {
+ return clusterMap.get(tenantId);
+ }
}
return null;
}
@@ -63,4 +65,13 @@ public class MultiTenantClusterMap {
public void clear() {
concurrentHashMap.clear();
}
+
+ public void removeCluster(String hostName, int tenantId) {
+ Map<Integer, Cluster> clusterMap = getClusters(hostName);
+ if(clusterMap != null) {
+ if(clusterMap.containsKey(tenantId)) {
+ clusterMap.remove(tenantId);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 3d71a0a..364e869 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -213,13 +213,14 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
String url = extractUrl(synCtx);
int tenantId = scanUrlForTenantId(url);
if (tenantExists(tenantId)) {
+ // Tenant found, find member from hostname and tenant id
member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId);
} else {
- // Multi-tenant cluster not found, try single tenant
+ // Tenant id not found in URL, find member from host name
member = requestDelegator.findNextMemberFromHostName(targetHost);
}
} else {
- // Find next member from single tenant cluster map
+ // Find next member from host name
member = requestDelegator.findNextMemberFromHostName(targetHost);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
index e7058f5..053f234 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
@@ -46,14 +46,17 @@ import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel
import org.apache.stratos.manager.utils.ApplicationManagementUtil;
import org.apache.stratos.manager.utils.CartridgeConstants;
import org.apache.stratos.manager.utils.RepoPasswordMgtUtil;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainsAddedEvent;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainsRemovedEvent;
import org.apache.stratos.messaging.util.Constants;
import org.wso2.carbon.context.CarbonContext;
import org.apache.stratos.manager.publisher.CartridgeSubscriptionDataPublisher;
-import java.util.Collection;
-import java.util.Random;
+import java.util.*;
/**
* Manager class for the purpose of managing CartridgeSubscriptionInfo subscriptions, groupings, etc.
@@ -195,7 +198,7 @@ public class CartridgeSubscriptionManager {
if (lbDataContext.getLbCategory() == null || lbDataContext.getLbCategory().equals(Constants.NO_LOAD_BALANCER)) {
- // no load balancer subscription requiredgenerateSubscriptionKey
+ // no load balancer subscription required generate SubscriptionKey
log.info("No LB subscription required for the Subscription with alias: " + subscriptionData.getCartridgeAlias() + ", type: " +
subscriptionData.getCartridgeType());
return null;
@@ -246,7 +249,7 @@ public class CartridgeSubscriptionManager {
// create subscription
cartridgeSubscription.createSubscription(subscriber, lbAlias, lbDataContext.getAutoscalePolicy(),
- lbDataContext.getDeploymentPolicy(), repository);
+ lbDataContext.getDeploymentPolicy(), repository, new HashSet<String>());
// add LB category to the payload
if (cartridgeSubscription.getPayloadData() != null) {
@@ -315,7 +318,7 @@ public class CartridgeSubscriptionManager {
//create subscription
cartridgeSubscription.createSubscription(subscriber, subscriptionData.getCartridgeAlias(), subscriptionData.getAutoscalingPolicyName(),
- subscriptionData.getDeploymentPolicyName(), repository);
+ subscriptionData.getDeploymentPolicyName(), repository, subscriptionData.getDomains());
// publishing to bam
CartridgeSubscriptionDataPublisher.publish(
@@ -376,12 +379,85 @@ public class CartridgeSubscriptionManager {
// Publish tenant subscribed event to message broker
CartridgeSubscriptionUtils.publishTenantSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
- cartridgeSubscription.getCartridgeInfo().getType());
+ cartridgeSubscription.getCartridgeInfo().getType(), new HashSet<String>(cartridgeSubscription.getCluster().getId()), cartridgeSubscription.getDomains());
return ApplicationManagementUtil.
createSubscriptionResponse(cartridgeSubscriptionInfo, cartridgeSubscription.getRepository());
}
+ public void addSubscriptionDomains(int tenantId, String subscriptionAlias, List<String> domains)
+ throws ADCException {
+
+ CartridgeSubscription cartridgeSubscription;
+ try {
+ cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+ if(cartridgeSubscription == null) {
+ throw new ADCException("Cartridge subscription not found");
+ }
+ cartridgeSubscription.addDomains(new HashSet<String>(domains));
+ new DataInsertionAndRetrievalManager().cacheAndUpdateSubscription(cartridgeSubscription);
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Could not add domains to cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domains] " + domains;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+
+ log.info("Successfully added domains to cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domains] " + domains);
+
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
+ SubscriptionDomainsAddedEvent event = new SubscriptionDomainsAddedEvent(tenantId, cartridgeSubscription.getType(),
+ new HashSet<String>(cartridgeSubscription.getCluster().getId()),
+ new HashSet<String>(domains));
+ eventPublisher.publish(event);
+ }
+
+ public void removeSubscriptionDomains(int tenantId, String subscriptionAlias, List<String> domains)
+ throws ADCException {
+
+ CartridgeSubscription cartridgeSubscription;
+ try {
+ cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+ if(cartridgeSubscription == null) {
+ throw new ADCException("Cartridge subscription not found");
+ }
+ cartridgeSubscription.removeDomains(new HashSet<String>(domains));
+ new DataInsertionAndRetrievalManager().cacheAndUpdateSubscription(cartridgeSubscription);
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Could not remove domains from cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domains] " + domains;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+
+ log.info("Successfully removed domains from cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domains] " + domains);
+
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
+ SubscriptionDomainsRemovedEvent event = new SubscriptionDomainsRemovedEvent(tenantId, cartridgeSubscription.getType(),
+ new HashSet<String>(cartridgeSubscription.getCluster().getId()),
+ new HashSet<String>(domains));
+ eventPublisher.publish(event);
+ }
+
+ public List<String> getSubscriptionDomains(int tenantId, String subscriptionAlias)
+ throws ADCException {
+
+ try {
+ CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+ if(cartridgeSubscription == null) {
+ throw new ADCException("Cartridge subscription not found");
+ }
+ Set<String> domains = cartridgeSubscription.getDomains();
+ return new ArrayList<String>(domains != null ? domains : new ArrayList<String>());
+ } catch (Exception e) {
+ String errorMsg = "Could not get domains of cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+ }
+
public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId, String type) throws ADCException {
if (type == null || type.isEmpty()) {
@@ -430,8 +506,10 @@ public class CartridgeSubscriptionManager {
}
// Publish tenant un-subscribed event to message broker
- CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
- cartridgeSubscription.getCartridgeInfo().getType());
+ CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(
+ cartridgeSubscription.getSubscriber().getTenantId(),
+ cartridgeSubscription.getCartridgeInfo().getType(),
+ new HashSet<String>(cartridgeSubscription.getCluster().getId()));
// publishing to the unsubscribed event details to bam
CartridgeSubscriptionDataPublisher.publish(cartridgeSubscription
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
index 3eac3f5..a8f0cf0 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
@@ -26,16 +26,14 @@ import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.manager.subscription.CartridgeSubscription;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
import org.apache.stratos.messaging.util.Constants;
import org.wso2.carbon.ntask.core.Task;
import org.wso2.carbon.user.core.tenant.TenantManager;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* Tenant synchronizer task for publishing complete tenant event periodically
@@ -71,12 +69,15 @@ public class TenantSynzhronizerTask implements Task {
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Collection<CartridgeSubscription> cartridgeSubscriptions = new DataInsertionAndRetrievalManager().getCartridgeSubscriptions(tenant.getTenantId());
if (cartridgeSubscriptions != null && !cartridgeSubscriptions.isEmpty()) {
- for (CartridgeSubscription subscription : cartridgeSubscriptions) {
+ for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) {
if(log.isDebugEnabled()) {
log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s",
- carbonTenant.getId(), carbonTenant.getDomain(), subscription.getType()));
+ carbonTenant.getId(), carbonTenant.getDomain(), cartridgeSubscription.getType()));
}
- tenant.addServiceSubscription(subscription.getType());
+ Subscription subscription = new Subscription(cartridgeSubscription.getType(),
+ new HashSet<String>(cartridgeSubscription.getCluster().getId()),
+ cartridgeSubscription.getDomains());
+ tenant.addSubscription(subscription);
}
}
tenants.add(tenant);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java
index 386758f..0504fd3 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java
@@ -68,6 +68,31 @@ public class DataInsertionAndRetrievalManager {
}
}
+ public void cacheAndUpdateSubscription(CartridgeSubscription cartridgeSubscription) throws PersistenceManagerException {
+
+ // get the write lock
+ LookupDataHolder.getInstance().acquireWriteLock();
+
+ try {
+ // store in LookupDataHolder
+ LookupDataHolder.getInstance().putSubscription(cartridgeSubscription);
+
+ try {
+ // store in Persistence Manager
+ persistenceManager.persistCartridgeSubscription(cartridgeSubscription);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in updating cartridge subscription in persistence manager";
+ log.error(errorMsg, e);
+ throw e;
+ }
+
+ } finally {
+ // release the write lock
+ LookupDataHolder.getInstance().releaseWriteLock();
+ }
+ }
+
public void removeSubscription (int tenantId, String subscriptionAlias) throws PersistenceManagerException {
CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/CartridgeSubscription.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/CartridgeSubscription.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/CartridgeSubscription.java
index e143bc3..f32cd2f 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/CartridgeSubscription.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/CartridgeSubscription.java
@@ -34,7 +34,7 @@ import org.apache.stratos.manager.utils.ApplicationManagementUtil;
import org.apache.stratos.manager.utils.CartridgeConstants;
import java.io.Serializable;
-import java.util.Map;
+import java.util.*;
public abstract class CartridgeSubscription implements Serializable {
@@ -58,7 +58,7 @@ public abstract class CartridgeSubscription implements Serializable {
//private List<String> connectedSubscriptionAliases;
private String subscriptionKey;
private SubscriptionTenancyBehaviour subscriptionTenancyBehaviour;
-
+ private Set<String> domains;
/**
* Constructor
@@ -79,6 +79,7 @@ public abstract class CartridgeSubscription implements Serializable {
//this.setSubscriptionStatus(CartridgeConstants.SUBSCRIBED);
//this.connectedSubscriptionAliases = new ArrayList<String>();
this.setSubscriptionTenancyBehaviour(subscriptionTenancyBehaviour);
+ this.domains = new HashSet<String>();
}
/**
@@ -102,7 +103,7 @@ public abstract class CartridgeSubscription implements Serializable {
* @throws org.apache.stratos.manager.exception.RepositoryTransportException
*/
public void createSubscription (Subscriber subscriber, String alias, String autoscalingPolicy,
- String deploymentPolicyName, Repository repository)
+ String deploymentPolicyName, Repository repository, Set<String> domains)
throws ADCException, PolicyException, UnregisteredCartridgeException, InvalidCartridgeAliasException,
DuplicateCartridgeAliasException, RepositoryRequiredException, AlreadySubscribedException,
RepositoryCredentialsRequiredException, InvalidRepositoryException, RepositoryTransportException {
@@ -112,11 +113,28 @@ public abstract class CartridgeSubscription implements Serializable {
setAutoscalingPolicyName(autoscalingPolicy);
setDeploymentPolicyName(deploymentPolicyName);
setRepository(repository);
+ addDomains(domains);
setPayloadData(getSubscriptionTenancyBehaviour().create(getAlias(), getCluster(), getSubscriber(), getRepository(), getCartridgeInfo(),
getSubscriptionKey(), getCustomPayloadEntries()));
}
+ public void addDomains(Set<String> domains) {
+ domains.addAll(domains);
+ }
+
+ public void removeDomain(String domain) {
+ domains.remove(domain);
+ }
+
+ public void removeDomains(Set<String> domains) {
+ domains.removeAll(domains);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
+
/**
* Unsubscribe from this cartridge subscription
*
@@ -387,7 +405,7 @@ public abstract class CartridgeSubscription implements Serializable {
", alias=" + alias + ", autoscalingPolicyName=" + autoscalingPolicyName +
", deploymentPolicyName=" + deploymentPolicyName + ", subscriber=" + subscriber +
", repository=" + repository + ", cartridgeInfo=" + cartridgeInfo + ", payload=" +
- payloadData + ", cluster=" + cluster + "]";
+ payloadData + ", cluster=" + cluster + "]" + ", domains=" + domains.toString();
}
public String getLbClusterId() {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/SubscriptionData.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/SubscriptionData.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/SubscriptionData.java
index e267c24..cf4d91b 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/SubscriptionData.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/SubscriptionData.java
@@ -20,12 +20,15 @@ package org.apache.stratos.manager.subscription;
import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* This holds the data that are gathered at the time of subscription. This is usefull when passing subscription details to the method calls.
*/
public class SubscriptionData {
-
private String cartridgeType;
private String cartridgeAlias;
private String autoscalingPolicyName;
@@ -42,6 +45,11 @@ public class SubscriptionData {
private PersistenceContext persistanceCtxt;
private boolean isCommitsEnabled;
private String serviceGroup;
+ private Set<String> domains;
+
+ public SubscriptionData() {
+ this.domains = new HashSet<String>();
+ }
public String getCartridgeType() {
return cartridgeType;
@@ -170,5 +178,20 @@ public class SubscriptionData {
public void setServiceGroup(String serviceGroup) {
this.serviceGroup = serviceGroup;
}
-
+
+ public void addDomains(Set<String> domains) {
+ domains.addAll(domains);
+ }
+
+ public void removeDomain(String domain) {
+ domains.remove(domain);
+ }
+
+ public void removeDomains(Set<String> domains) {
+ domains.removeAll(domains);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
index a5c5517..20e23e1 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
@@ -44,6 +44,8 @@ import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
import org.apache.stratos.messaging.util.Constants;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.regex.Pattern;
@@ -153,12 +155,16 @@ public class CartridgeSubscriptionUtils {
static class TenantSubscribedEventPublisher implements Runnable {
- int tenantId;
- String serviceName;
+ private int tenantId;
+ private String serviceName;
+ private Set<String> clusterIds;
+ private Set<String> domains;
- public TenantSubscribedEventPublisher(int tenantId, String service) {
+ public TenantSubscribedEventPublisher(int tenantId, String service, Set<String> clusterIds, Set<String> domains) {
this.tenantId = tenantId;
this.serviceName = service;
+ this.clusterIds = clusterIds;
+ this.domains = domains;
}
@Override
public void run() {
@@ -166,7 +172,7 @@ public class CartridgeSubscriptionUtils {
if(log.isInfoEnabled()) {
log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
}
- TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName);
+ TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName, clusterIds, domains);
EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(subscribedEvent);
} catch (Exception e) {
@@ -178,7 +184,7 @@ public class CartridgeSubscriptionUtils {
}
}
- public static void publishTenantSubscribedEvent(int tenantId, String serviceName) {
+ public static void publishTenantSubscribedEvent(int tenantId, String serviceName, Set<String> clusterIds, Set<String> domains) {
Executor exec = new Executor() {
@@ -188,15 +194,15 @@ public class CartridgeSubscriptionUtils {
}
};
- exec.execute(new TenantSubscribedEventPublisher(tenantId, serviceName));
+ exec.execute(new TenantSubscribedEventPublisher(tenantId, serviceName, clusterIds, domains));
}
- public static void publishTenantUnSubscribedEvent(int tenantId, String serviceName) {
+ public static void publishTenantUnSubscribedEvent(int tenantId, String serviceName, Set<String> clusterIds) {
try {
if(log.isInfoEnabled()) {
log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
}
- TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName);
+ TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName, clusterIds);
EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java
new file mode 100644
index 0000000..36fca2b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.tenant;
+
+import java.util.*;
+
+/**
+ * Tenant's service subscription.
+ */
+public class Subscription {
+ private final String serviceName;
+ private final Set<String> clusterIds;
+ private final Set<String> domains;
+
+ public Subscription(String serviceName, Set<String> clusterIds, Set<String> domains) {
+ this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
+ this.domains = (domains != null) ? domains : new HashSet<String>();
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
+
+ public void addDomain(String domain) {
+ domains.add(domain);
+ }
+
+ public void addDomains(Set<String> domains) {
+ domains.addAll(domains);
+ }
+
+ public void removeDomain(String domain) {
+ domains.remove(domain);
+ }
+
+ public void removeDomains(Set<String> domains) {
+ domains.removeAll(domains);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
index bc4244a..ed20bd0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
@@ -32,13 +32,13 @@ public class Tenant implements Serializable{
private int tenantId;
private String tenantDomain;
- // Map<ServiceName, Subscribed>
- private Map<String, Boolean> serviceNameMap;
+ // Map<ServiceName, Subscription>
+ private Map<String, Subscription> serviceNameSubscriptionMap;
public Tenant(int tenantId, String tenantDomain) {
this.tenantId = tenantId;
this.tenantDomain = tenantDomain;
- this.serviceNameMap = new HashMap<String, Boolean>();
+ this.serviceNameSubscriptionMap = new HashMap<String, Subscription>();
}
public int getTenantId() {
@@ -53,19 +53,26 @@ public class Tenant implements Serializable{
this.tenantDomain = tenantDomain;
}
- public Collection<String> getServiceSubscriptions() {
- return serviceNameMap.keySet();
+ public Subscription getSubscription(String serviceName) {
+ if(serviceNameSubscriptionMap.containsKey(serviceName)) {
+ return serviceNameSubscriptionMap.get(serviceName);
+ }
+ return null;
}
- public boolean isServiceSubscribed(String serviceName) {
- return serviceNameMap.containsKey(serviceName);
+ public Collection<Subscription> getSubscriptions() {
+ return serviceNameSubscriptionMap.values();
}
- public void addServiceSubscription(String serviceName) {
- serviceNameMap.put(serviceName, true);
+ public boolean isSubscribed(String serviceName) {
+ return serviceNameSubscriptionMap.containsKey(serviceName);
}
- public void removeServiceSubscription(String serviceName) {
- serviceNameMap.remove(serviceName);
+ public void addSubscription(Subscription subscription) {
+ serviceNameSubscriptionMap.put(subscription.getServiceName(), subscription);
+ }
+
+ public void removeSubscription(String serviceName) {
+ serviceNameSubscriptionMap.remove(serviceName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java
new file mode 100644
index 0000000..312571a
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.tenant;
+
+import org.apache.stratos.messaging.event.Event;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * This event is fired when domains are added to a tenant subscription.
+ */
+public class SubscriptionDomainsAddedEvent extends Event implements Serializable {
+ private static final long serialVersionUID = 3457484382856403382L;
+
+ private final int tenantId;
+ private final String serviceName;
+ private final Set<String> clusterIds;
+ private final Set<String> domains;
+
+ public SubscriptionDomainsAddedEvent(int tenantId, String serviceName, Set<String> clusterIds, Set<String> domains) {
+ this.tenantId = tenantId;
+ this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
+ this.domains = (domains != null) ? domains : new HashSet<String>();
+ }
+
+ public int getTenantId() {
+ return tenantId;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java
new file mode 100644
index 0000000..3cc5664
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.tenant;
+
+import org.apache.stratos.messaging.event.Event;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * This event is fired when domains are removed from a tenant subscription.
+ */
+public class SubscriptionDomainsRemovedEvent extends Event implements Serializable {
+ private static final long serialVersionUID = -8837521344795740210L;
+
+ private final int tenantId;
+ private final String serviceName;
+ private final Set<String> clusterIds;
+ private Set<String> domains;
+
+ public SubscriptionDomainsRemovedEvent(int tenantId, String serviceName, Set<String> clusterIds, Set<String> domains) {
+ this.tenantId = tenantId;
+ this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
+ this.domains = (domains != null) ? domains : new HashSet<String>();
+ }
+
+ public int getTenantId() {
+ return tenantId;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantSubscribedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantSubscribedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantSubscribedEvent.java
index 2be1ad3..bf0e288 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantSubscribedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantSubscribedEvent.java
@@ -20,6 +20,7 @@
package org.apache.stratos.messaging.event.tenant;
import java.io.Serializable;
+import java.util.*;
/**
* This event is fired when a tenant is subscribed to a service.
@@ -29,10 +30,14 @@ public class TenantSubscribedEvent extends TenantEvent implements Serializable {
private final int tenantId;
private final String serviceName;
+ private final Set<String> clusterIds;
+ private final Set<String> domains;
- public TenantSubscribedEvent(int tenantId, String serviceName) {
+ public TenantSubscribedEvent(int tenantId, String serviceName, Set<String> clusterIds, Set<String> domains) {
this.tenantId = tenantId;
this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
+ this.domains = (domains != null) ? domains : new HashSet<String>();
}
public int getTenantId() {
@@ -42,4 +47,12 @@ public class TenantSubscribedEvent extends TenantEvent implements Serializable {
public String getServiceName() {
return serviceName;
}
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantUnSubscribedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantUnSubscribedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantUnSubscribedEvent.java
index 2262cd4..5cfcfdf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantUnSubscribedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantUnSubscribedEvent.java
@@ -20,6 +20,8 @@
package org.apache.stratos.messaging.event.tenant;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Set;
/**
* This event is fired when a tenant is un-subscribed from a service.
@@ -29,10 +31,12 @@ public class TenantUnSubscribedEvent extends TenantEvent implements Serializable
private final int tenantId;
private final String serviceName;
+ private final Set<String> clusterIds;
- public TenantUnSubscribedEvent(int tenantId, String serviceName) {
+ public TenantUnSubscribedEvent(int tenantId, String serviceName, Set<String> clusterIds) {
this.tenantId = tenantId;
this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
}
public int getTenantId() {
@@ -42,4 +46,8 @@ public class TenantUnSubscribedEvent extends TenantEvent implements Serializable
public String getServiceName() {
return serviceName;
}
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java
new file mode 100644
index 0000000..3f169b7
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.tenant;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Tenant subscription domains added event listener.
+ */
+public abstract class SubscriptionDomainsAddedEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6c34420d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java
new file mode 100644
index 0000000..5c81ad6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.tenant;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Tenant subscription domains removed event listener.
+ */
+public abstract class SubscriptionDomainsRemovedEventListener extends EventListener {
+}
[4/4] git commit: Adding missing files of previous commit
Posted by im...@apache.org.
Adding missing files of previous commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f888f854
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f888f854
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f888f854
Branch: refs/heads/master
Commit: f888f854c44198bcf3e2d110367ef60d69ca8fd4
Parents: 5eee96d
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Apr 29 00:45:30 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Apr 29 00:45:30 2014 +0530
----------------------------------------------------------------------
.../messaging/domain/tenant/Subscription.java | 65 +++++++++++++
.../tenant/SubscriptionDomainsAddedEvent.java | 60 ++++++++++++
.../tenant/SubscriptionDomainsRemovedEvent.java | 60 ++++++++++++
.../SubscriptionDomainsAddedEventListener.java | 28 ++++++
...SubscriptionDomainsRemovedEventListener.java | 28 ++++++
...ubscriptionDomainsAddedMessageProcessor.java | 96 ++++++++++++++++++++
...scriptionDomainsRemovedMessageProcessor.java | 96 ++++++++++++++++++++
.../messaging/test/TenantDomainTest.java | 45 +++++++++
8 files changed, 478 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java
new file mode 100644
index 0000000..36fca2b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Subscription.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.tenant;
+
+import java.util.*;
+
+/**
+ * Tenant's service subscription.
+ */
+public class Subscription {
+ private final String serviceName;
+ private final Set<String> clusterIds;
+ private final Set<String> domains;
+
+ public Subscription(String serviceName, Set<String> clusterIds, Set<String> domains) {
+ this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
+ this.domains = (domains != null) ? domains : new HashSet<String>();
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
+
+ public void addDomain(String domain) {
+ domains.add(domain);
+ }
+
+ public void addDomains(Set<String> domains) {
+ domains.addAll(domains);
+ }
+
+ public void removeDomain(String domain) {
+ domains.remove(domain);
+ }
+
+ public void removeDomains(Set<String> domains) {
+ domains.removeAll(domains);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java
new file mode 100644
index 0000000..312571a
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsAddedEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.tenant;
+
+import org.apache.stratos.messaging.event.Event;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * This event is fired when domains are added to a tenant subscription.
+ */
+public class SubscriptionDomainsAddedEvent extends Event implements Serializable {
+ private static final long serialVersionUID = 3457484382856403382L;
+
+ private final int tenantId;
+ private final String serviceName;
+ private final Set<String> clusterIds;
+ private final Set<String> domains;
+
+ public SubscriptionDomainsAddedEvent(int tenantId, String serviceName, Set<String> clusterIds, Set<String> domains) {
+ this.tenantId = tenantId;
+ this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
+ this.domains = (domains != null) ? domains : new HashSet<String>();
+ }
+
+ public int getTenantId() {
+ return tenantId;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java
new file mode 100644
index 0000000..3cc5664
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/SubscriptionDomainsRemovedEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.tenant;
+
+import org.apache.stratos.messaging.event.Event;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * This event is fired when domains are removed from a tenant subscription.
+ */
+public class SubscriptionDomainsRemovedEvent extends Event implements Serializable {
+ private static final long serialVersionUID = -8837521344795740210L;
+
+ private final int tenantId;
+ private final String serviceName;
+ private final Set<String> clusterIds;
+ private Set<String> domains;
+
+ public SubscriptionDomainsRemovedEvent(int tenantId, String serviceName, Set<String> clusterIds, Set<String> domains) {
+ this.tenantId = tenantId;
+ this.serviceName = serviceName;
+ this.clusterIds = clusterIds;
+ this.domains = (domains != null) ? domains : new HashSet<String>();
+ }
+
+ public int getTenantId() {
+ return tenantId;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public Set<String> getClusterIds() {
+ return Collections.unmodifiableSet(clusterIds);
+ }
+
+ public Set<String> getDomains() {
+ return Collections.unmodifiableSet(domains);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java
new file mode 100644
index 0000000..3f169b7
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsAddedEventListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.tenant;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Tenant subscription domains added event listener.
+ */
+public abstract class SubscriptionDomainsAddedEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java
new file mode 100644
index 0000000..5c81ad6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/tenant/SubscriptionDomainsRemovedEventListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.tenant;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Tenant subscription domains removed event listener.
+ */
+public abstract class SubscriptionDomainsRemovedEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java
new file mode 100644
index 0000000..aeea6bf
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsAddedMessageProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainsAddedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Tenant subscribed message processor for adding domains to tenant subscriptions.
+ */
+public class SubscriptionDomainsAddedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(SubscriptionDomainsAddedMessageProcessor.class);
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (SubscriptionDomainsAddedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has not initialized
+ if(!TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
+ // Parse complete message and build event
+ SubscriptionDomainsAddedEvent event = (SubscriptionDomainsAddedEvent) Util.jsonToObject(message, TenantSubscribedEvent.class);
+
+ try {
+ TenantManager.acquireWriteLock();
+ Tenant tenant = TenantManager.getInstance().getTenant(event.getTenantId());
+ if(tenant == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Tenant not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ Subscription subscription = tenant.getSubscription(event.getServiceName());
+ if(subscription == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Subscription not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ subscription.addDomains(event.getDomains());
+ if(log.isInfoEnabled()) {
+ log.info(String.format("Domains added to tenant subscription: [tenant-id] %d [tenant-domain] %s [service] %s [domains] %s",
+ tenant.getTenantId(), tenant.getTenantDomain(), event.getServiceName(), event.getDomains()));
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+ finally {
+ TenantManager.releaseWriteLock();
+ }
+ }
+ else {
+ if(nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ }
+ else {
+ throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java
new file mode 100644
index 0000000..fc08357
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/SubscriptionDomainsRemovedMessageProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.tenant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainsAddedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Tenant subscribed message processor for removing domains from tenant subscriptions.
+ */
+public class SubscriptionDomainsRemovedMessageProcessor extends MessageProcessor {
+
+ private static final Log log = LogFactory.getLog(SubscriptionDomainsRemovedMessageProcessor.class);
+
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (SubscriptionDomainsAddedEvent.class.getName().equals(type)) {
+ // Return if tenant manager has not initialized
+ if(!TenantManager.getInstance().isInitialized()) {
+ return false;
+ }
+
+ // Parse complete message and build event
+ SubscriptionDomainsAddedEvent event = (SubscriptionDomainsAddedEvent) Util.jsonToObject(message, TenantSubscribedEvent.class);
+
+ try {
+ TenantManager.acquireWriteLock();
+ Tenant tenant = TenantManager.getInstance().getTenant(event.getTenantId());
+ if(tenant == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Tenant not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ Subscription subscription = tenant.getSubscription(event.getServiceName());
+ if(subscription == null) {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Subscription not found: [tenant-id] %d", event.getTenantId()));
+ }
+ return false;
+ }
+ subscription.removeDomains(event.getDomains());
+ if(log.isInfoEnabled()) {
+ log.info(String.format("Domains removed from tenant subscription: [tenant-id] %d [tenant-domain] %s [service] %s [domains] %s",
+ tenant.getTenantId(), tenant.getTenantDomain(), event.getServiceName(), event.getDomains()));
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+ finally {
+ TenantManager.releaseWriteLock();
+ }
+ }
+ else {
+ if(nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ }
+ else {
+ throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f888f854/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java
new file mode 100644
index 0000000..adb11e1
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/TenantDomainTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.test;
+
+import junit.framework.Assert;
+import org.apache.stratos.messaging.domain.tenant.Subscription;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.HashSet;
+
+/**
+ * Tenant domain model test.
+ */
+@RunWith(JUnit4.class)
+public class TenantDomainTest {
+ @Test
+ public void testSubscriptionModel() {
+ Tenant tenant = new Tenant(1, "domain.org");
+ Subscription subscription = new Subscription("subscription1", new HashSet<String>(), new HashSet<String>());
+ tenant.addSubscription(subscription);
+ Assert.assertTrue("Subscription not added", tenant.isSubscribed("subscription1"));
+ tenant.removeSubscription("subscription1");
+ Assert.assertTrue("Subscription not removed", !tenant.isSubscribed("subscription1"));
+ }
+}