You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/01/04 09:27:31 UTC
[05/12] git commit: registry based persistence - referring the new implementation
registry based persistence - referring the new implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/c36bd253
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/c36bd253
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/c36bd253
Branch: refs/heads/master
Commit: c36bd253cffcf2cf44329d7bf8d31b171cdfc901
Parents: 2c0dc27
Author: Isuru <is...@wso2.com>
Authored: Sun Dec 29 08:51:15 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Sun Dec 29 08:51:15 2013 +0530
----------------------------------------------------------------------
.../adc/mgt/custom/domain/RegistryManager.java | 218 +++++++--------
.../InvalidCartridgeAliasException.java | 10 +
.../internal/ADCManagementServerComponent.java | 4 +
.../mgt/listener/InstanceStatusListener.java | 15 +-
.../adc/mgt/lookup/ClusterIdToSubscription.java | 13 +-
.../adc/mgt/lookup/LookupDataHolder.java | 120 ++++----
.../adc/mgt/lookup/SubscriptionContext.java | 18 +-
.../manager/CartridgeSubscriptionManager.java | 199 +++++++------
.../stratos/adc/mgt/payload/PayloadData.java | 2 +-
.../adc/mgt/persistence/PersistenceManager.java | 21 +-
.../RegistryBasedPersistenceManager.java | 170 ++++++++---
.../mgt/publisher/TenantSynzhronizerTask.java | 16 +-
.../adc/mgt/registry/RegistryManager.java | 36 ++-
.../DataInsertionAndRetrievalManager.java | 280 +++++++++++++------
.../service/ApplicationManagementService.java | 19 +-
.../mgt/subscription/CartridgeSubscription.java | 27 +-
.../cache/CartridgeInstanceCache.java | 2 +-
.../SubscriptionMultiTenantBehaviour.java | 16 +-
.../utils/CartridgeSubscriptionUtils.java | 39 +++
.../model/TopologyClusterInformationModel.java | 6 +-
.../StratosManagerTopologyReceiver.java | 89 +++---
.../stratos/adc/mgt/utils/Serializer.java | 8 +-
.../rest/endpoint/services/ServiceUtils.java | 4 +-
23 files changed, 810 insertions(+), 522 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/custom/domain/RegistryManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/custom/domain/RegistryManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/custom/domain/RegistryManager.java
index 5e3ac19..6eb28f3 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/custom/domain/RegistryManager.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/custom/domain/RegistryManager.java
@@ -1,109 +1,109 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.adc.mgt.custom.domain;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.adc.mgt.exception.ADCException;
-import org.apache.stratos.adc.mgt.exception.DomainMappingExistsException;
-import org.apache.stratos.adc.mgt.internal.DataHolder;
-import org.apache.stratos.adc.mgt.utils.CartridgeConstants;
-import org.wso2.carbon.registry.core.Collection;
-import org.wso2.carbon.registry.core.Registry;
-import org.wso2.carbon.registry.core.Resource;
-import org.wso2.carbon.registry.core.exceptions.RegistryException;
-
-public class RegistryManager {
- private static Log log = LogFactory.getLog(RegistryManager.class);
- private static Registry registry = DataHolder.getRegistryService();
-
- public RegistryManager() {
- try {
- if (!registry.resourceExists(CartridgeConstants.DomainMappingInfo.HOSTINFO)) {
- registry.put(CartridgeConstants.DomainMappingInfo.HOSTINFO,
- registry.newCollection());
- }
- } catch (RegistryException e) {
- String msg =
- "Error while accessing registry or initializing domain mapping registry path\n";
- log.error(msg + e.getMessage());
- }
- }
-
- /**
- *
- */
- public void addDomainMappingToRegistry(String hostName, String actualHost)
- throws ADCException, RegistryException, DomainMappingExistsException {
- try {
- registry.beginTransaction();
- Resource hostResource = registry.newResource();
- hostResource.addProperty(CartridgeConstants.DomainMappingInfo.ACTUAL_HOST, actualHost);
- if (!registry.resourceExists(CartridgeConstants.DomainMappingInfo.HOSTINFO +
- hostName)) {
- registry.put(CartridgeConstants.DomainMappingInfo.HOSTINFO + hostName,
- hostResource);
- } else {
- registry.rollbackTransaction();
- String msg = "Requested domain is already taken!";
- log.error(msg);
- throw new DomainMappingExistsException(msg, hostName);
- }
- registry.commitTransaction();
- } catch (RegistryException e) {
- registry.rollbackTransaction();
- throw e;
- }
- }
-
-
- /**
- *
- */
- public void removeDomainMappingFromRegistry(String actualHost) throws Exception {
- try {
- registry.beginTransaction();
- String hostResourcePath = CartridgeConstants.DomainMappingInfo.HOSTINFO;
- if (registry.resourceExists(hostResourcePath)) {
- Resource hostResource = registry.get(hostResourcePath);
- Collection hostInfoCollection;
- if(hostResource instanceof Collection){
- hostInfoCollection = (Collection) hostResource;
- } else {
- throw new Exception("Resource is not a collection " + hostResourcePath );
- }
- String[] paths = hostInfoCollection.getChildren();
- for (String path: paths){
- Resource domainMapping = registry.get(path);
- String actualHostInRegistry = domainMapping.getProperty(CartridgeConstants.DomainMappingInfo.ACTUAL_HOST);
- if(actualHostInRegistry != null && actualHost.equalsIgnoreCase(actualHostInRegistry)){
- registry.delete(path);
- }
- }
- }
- registry.commitTransaction();
- } catch (RegistryException e) {
- registry.rollbackTransaction();
- log.error("Unable to remove the mapping", e);
- throw e;
- }
- }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//
+//package org.apache.stratos.adc.mgt.custom.domain;
+//
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+//import org.apache.stratos.adc.mgt.exception.ADCException;
+//import org.apache.stratos.adc.mgt.exception.DomainMappingExistsException;
+//import org.apache.stratos.adc.mgt.internal.DataHolder;
+//import org.apache.stratos.adc.mgt.utils.CartridgeConstants;
+//import org.wso2.carbon.registry.core.Collection;
+//import org.wso2.carbon.registry.core.Registry;
+//import org.wso2.carbon.registry.core.Resource;
+//import org.wso2.carbon.registry.core.exceptions.RegistryException;
+//
+//public class RegistryManager {
+// private static Log log = LogFactory.getLog(RegistryManager.class);
+// private static Registry registry = DataHolder.getRegistryService();
+//
+// public RegistryManager() {
+// try {
+// if (!registry.resourceExists(CartridgeConstants.DomainMappingInfo.HOSTINFO)) {
+// registry.put(CartridgeConstants.DomainMappingInfo.HOSTINFO,
+// registry.newCollection());
+// }
+// } catch (RegistryException e) {
+// String msg =
+// "Error while accessing registry or initializing domain mapping registry path\n";
+// log.error(msg + e.getMessage());
+// }
+// }
+//
+// /**
+// *
+// */
+// public void addDomainMappingToRegistry(String hostName, String actualHost)
+// throws ADCException, RegistryException, DomainMappingExistsException {
+// try {
+// registry.beginTransaction();
+// Resource hostResource = registry.newResource();
+// hostResource.addProperty(CartridgeConstants.DomainMappingInfo.ACTUAL_HOST, actualHost);
+// if (!registry.resourceExists(CartridgeConstants.DomainMappingInfo.HOSTINFO +
+// hostName)) {
+// registry.put(CartridgeConstants.DomainMappingInfo.HOSTINFO + hostName,
+// hostResource);
+// } else {
+// registry.rollbackTransaction();
+// String msg = "Requested domain is already taken!";
+// log.error(msg);
+// throw new DomainMappingExistsException(msg, hostName);
+// }
+// registry.commitTransaction();
+// } catch (RegistryException e) {
+// registry.rollbackTransaction();
+// throw e;
+// }
+// }
+//
+//
+// /**
+// *
+// */
+// public void removeDomainMappingFromRegistry(String actualHost) throws Exception {
+// try {
+// registry.beginTransaction();
+// String hostResourcePath = CartridgeConstants.DomainMappingInfo.HOSTINFO;
+// if (registry.resourceExists(hostResourcePath)) {
+// Resource hostResource = registry.get(hostResourcePath);
+// Collection hostInfoCollection;
+// if(hostResource instanceof Collection){
+// hostInfoCollection = (Collection) hostResource;
+// } else {
+// throw new Exception("Resource is not a collection " + hostResourcePath );
+// }
+// String[] paths = hostInfoCollection.getChildren();
+// for (String path: paths){
+// Resource domainMapping = registry.get(path);
+// String actualHostInRegistry = domainMapping.getProperty(CartridgeConstants.DomainMappingInfo.ACTUAL_HOST);
+// if(actualHostInRegistry != null && actualHost.equalsIgnoreCase(actualHostInRegistry)){
+// registry.delete(path);
+// }
+// }
+// }
+// registry.commitTransaction();
+// } catch (RegistryException e) {
+// registry.rollbackTransaction();
+// log.error("Unable to remove the mapping", e);
+// throw e;
+// }
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/exception/InvalidCartridgeAliasException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/exception/InvalidCartridgeAliasException.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/exception/InvalidCartridgeAliasException.java
index 971d432..deec790 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/exception/InvalidCartridgeAliasException.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/exception/InvalidCartridgeAliasException.java
@@ -28,6 +28,8 @@ public class InvalidCartridgeAliasException extends Exception {
private final String cartridgeAlias;
+ private int tenantId;
+
public InvalidCartridgeAliasException(String message, String cartridgeType, String cartridgeAlias, Throwable cause) {
super(message, cause);
this.message = message;
@@ -42,6 +44,14 @@ public class InvalidCartridgeAliasException extends Exception {
this.cartridgeAlias = cartridgeAlias;
}
+ public InvalidCartridgeAliasException(String message, int tenantId, String cartridgeType, String cartridgeAlias) {
+ super(message);
+ this.message = message;
+ this.tenantId = tenantId;
+ this.cartridgeType = cartridgeType;
+ this.cartridgeAlias = cartridgeAlias;
+ }
+
public String getMessage() {
return message;
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
index fa262cf..14545b6 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.adc.mgt.listener.InstanceStatusListener;
import org.apache.stratos.adc.mgt.publisher.TenantEventPublisher;
import org.apache.stratos.adc.mgt.publisher.TenantSynchronizerTaskScheduler;
+import org.apache.stratos.adc.mgt.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.adc.mgt.topology.receiver.StratosManagerTopologyReceiver;
import org.apache.stratos.adc.mgt.utils.CartridgeConfigFileReader;
import org.apache.stratos.adc.mgt.utils.StratosDBUtils;
@@ -115,6 +116,9 @@ public class ADCManagementServerComponent {
topologyReceiverThread.start();
log.info("Topology receiver thread started");
+ // retrieve persisted CartridgeSubscriptions
+ new DataInsertionAndRetrievalManager().cachePersistedSubscriptions();
+
//Component activated successfully
log.info("ADC management server component is activated");
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/InstanceStatusListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/InstanceStatusListener.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/InstanceStatusListener.java
index 37d08c1..8adf021 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/InstanceStatusListener.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/InstanceStatusListener.java
@@ -20,9 +20,9 @@ package org.apache.stratos.adc.mgt.listener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
import org.apache.stratos.adc.mgt.publisher.ArtifactUpdatePublisher;
-import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.adc.mgt.retriever.DataInsertionAndRetrievalManager;
+import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
import org.apache.stratos.messaging.util.Constants;
import org.apache.stratos.messaging.util.Util;
@@ -57,8 +57,9 @@ public class InstanceStatusListener implements MessageListener {
if(log.isInfoEnabled()) {
log.info("Cluster id: " + clusterId);
}
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /*CartridgeSubscriptionInfo subscription = PersistenceManager.getSubscriptionFromClusterId(clusterId);
- CartridgeSubscriptionInfo subscription = PersistenceManager.getSubscriptionFromClusterId(clusterId);
if (subscription.getRepository() != null) {
ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(subscription.getRepository(), clusterId, String.valueOf(subscription.getTenantId()));
publisher.publish();
@@ -67,7 +68,15 @@ public class InstanceStatusListener implements MessageListener {
//TODO: make this log debug
log.info("No repository found for subscription with alias: " + subscription.getAlias() + ", type: " + subscription.getCartridge() +
". Not sending the Depsync event");
+ }*/
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ CartridgeSubscription cartridgeSubscription = new DataInsertionAndRetrievalManager().getCartridgeSubscription(clusterId);
+ if (cartridgeSubscription.getRepository() != null) {
+ ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(), clusterId,
+ String.valueOf(cartridgeSubscription.getSubscriber().getTenantId()));
+ publisher.publish();
}
+
}
} catch (Exception e) {
if(log.isErrorEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/ClusterIdToSubscription.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/ClusterIdToSubscription.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/ClusterIdToSubscription.java
index cdb5f69..95cd03f 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/ClusterIdToSubscription.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/ClusterIdToSubscription.java
@@ -26,7 +26,6 @@ import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ClusterIdToSubscription implements Serializable {
@@ -35,23 +34,13 @@ public class ClusterIdToSubscription implements Serializable {
// Map: Cluster Id (Domain) -> CartridgeSubscription
private Map<String, CartridgeSubscription> clusterIdToCartridgeSubscription;
- //locks
- private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
-
public ClusterIdToSubscription() {
clusterIdToCartridgeSubscription = new HashMap<String, CartridgeSubscription>();
}
public void addSubscription (CartridgeSubscription cartridgeSubscription) {
- writeLock.lock();
- try {
- add(cartridgeSubscription);
-
- } finally {
- writeLock.unlock();
- }
+ add(cartridgeSubscription);
}
private void add (CartridgeSubscription cartridgeSubscription) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/LookupDataHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/LookupDataHolder.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/LookupDataHolder.java
index 2db367f..86cbd48 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/LookupDataHolder.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/LookupDataHolder.java
@@ -57,100 +57,90 @@ public class LookupDataHolder implements Serializable {
return lookupDataHolder;
}
- public void put (CartridgeSubscription cartridgeSubscription) {
+ public void putSubscription (CartridgeSubscription cartridgeSubscription) {
- writeLock.lock();
-
- try {
- if (clusterIdToSubscription.getSubscription(cartridgeSubscription.getClusterDomain()) != null) {
- if(log.isDebugEnabled()) {
- log.debug("Overwriting the existing CartridgeSubscription for tenant " + cartridgeSubscription.getSubscriber().getTenantId() +
- ", alias " + cartridgeSubscription.getAlias());
- }
- }
- // add or update
- clusterIdToSubscription.addSubscription(cartridgeSubscription);
-
- // check if an existing SubscriptionContext is available
- SubscriptionContext existingSubscriptionCtx = tenantIdToSubscriptionContext.getSubscriptionContext(cartridgeSubscription.getSubscriber().getTenantId());
- if(existingSubscriptionCtx != null) {
- existingSubscriptionCtx.addSubscription(cartridgeSubscription);
-
- } else {
- //create a new subscription context and add the subscription
- SubscriptionContext subscriptionContext = new SubscriptionContext();
- subscriptionContext.addSubscription(cartridgeSubscription);
- tenantIdToSubscriptionContext.addSubscriptionContext(cartridgeSubscription.getSubscriber().getTenantId(), subscriptionContext);
+ if (clusterIdToSubscription.getSubscription(cartridgeSubscription.getClusterDomain()) != null) {
+ if(log.isDebugEnabled()) {
+ log.debug("Overwriting the existing CartridgeSubscription for tenant " + cartridgeSubscription.getSubscriber().getTenantId() +
+ ", alias " + cartridgeSubscription.getAlias());
}
-
- } finally {
- writeLock.unlock();
}
+ // add or update
+ clusterIdToSubscription.addSubscription(cartridgeSubscription);
+
+ // check if an existing SubscriptionContext is available
+ SubscriptionContext existingSubscriptionCtx = tenantIdToSubscriptionContext.getSubscriptionContext(cartridgeSubscription.getSubscriber().getTenantId());
+ if(existingSubscriptionCtx != null) {
+ existingSubscriptionCtx.addSubscription(cartridgeSubscription);
+
+ } else {
+ //create a new subscription context and add the subscription
+ SubscriptionContext subscriptionContext = new SubscriptionContext();
+ subscriptionContext.addSubscription(cartridgeSubscription);
+ tenantIdToSubscriptionContext.addSubscriptionContext(cartridgeSubscription.getSubscriber().getTenantId(), subscriptionContext);
+ }
+
}
public Collection<CartridgeSubscription> getSubscriptions (int tenantId) {
- readLock.lock();
- try {
- SubscriptionContext subscriptionContext = tenantIdToSubscriptionContext.getSubscriptionContext(tenantId);
- if (subscriptionContext == null) {
- // no subscriptions
- return null;
- }
+ SubscriptionContext subscriptionContext = tenantIdToSubscriptionContext.getSubscriptionContext(tenantId);
+ if (subscriptionContext == null) {
+ // no subscriptions
+ return null;
+ }
- return subscriptionContext.getSubscriptions();
+ return subscriptionContext.getSubscriptions();
- } finally {
- readLock.unlock();
- }
}
public Collection<CartridgeSubscription> getSubscriptionForType (int tenantId, String cartridgeType) {
- readLock.lock();
+ SubscriptionContext subscriptionContext = tenantIdToSubscriptionContext.getSubscriptionContext(tenantId);
+ if (subscriptionContext == null) {
+ // no subscriptions
+ return null;
+ }
- try {
- SubscriptionContext subscriptionContext = tenantIdToSubscriptionContext.getSubscriptionContext(tenantId);
- if (subscriptionContext == null) {
- // no subscriptions
- return null;
- }
+ return subscriptionContext.getSubscriptionsOfType(cartridgeType);
+ }
- return subscriptionContext.getSubscriptionsOfType(cartridgeType);
+ public CartridgeSubscription getSubscriptionForAlias (int tenantId, String subscriptionAlias) {
- } finally {
- readLock.unlock();
+ SubscriptionContext subscriptionContext = tenantIdToSubscriptionContext.getSubscriptionContext(tenantId);
+ if (subscriptionContext == null) {
+ // no subscriptions
+ return null;
}
+
+ return subscriptionContext.getSubscriptionForAlias(subscriptionAlias);
+
}
- public CartridgeSubscription getSubscriptionForAlias (int tenantId, String subscriptionAlias) {
+ public CartridgeSubscription getSubscription (String clusterId) {
- readLock.lock();
+ return clusterIdToSubscription.getSubscription(clusterId);
- try {
- SubscriptionContext subscriptionContext = tenantIdToSubscriptionContext.getSubscriptionContext(tenantId);
- if (subscriptionContext == null) {
- // no subscriptions
- return null;
- }
+ }
- return subscriptionContext.getSubscriptionForAlias(subscriptionAlias);
+ public void acquireWriteLock () {
- } finally {
- readLock.unlock();
- }
+ writeLock.lock();
}
- public CartridgeSubscription getSubscription (String clusterId) {
+ public void releaseWriteLock () {
+
+ writeLock.unlock();
+ }
+
+ public void acquireReadLock () {
readLock.lock();
+ }
- try {
- return clusterIdToSubscription.getSubscription(clusterId);
+ public void releaseReadLock () {
- } finally {
- readLock.unlock();
- }
+ readLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/SubscriptionContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/SubscriptionContext.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/SubscriptionContext.java
index d76b009..0af7728 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/SubscriptionContext.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/lookup/SubscriptionContext.java
@@ -25,7 +25,6 @@ import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
import java.io.Serializable;
import java.util.*;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SubscriptionContext implements Serializable {
@@ -37,10 +36,6 @@ public class SubscriptionContext implements Serializable {
// Map: Subscription Alias -> CartridgeSubscription
private Map<String, CartridgeSubscription> aliasToSubscription;
- //locks
- private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
-
public SubscriptionContext () {
cartridgeTypeToSubscriptions = new HashMap<String, Set<CartridgeSubscription>>();
@@ -49,17 +44,6 @@ public class SubscriptionContext implements Serializable {
public void addSubscription (CartridgeSubscription cartridgeSubscription) {
- writeLock.lock();
- try {
- add(cartridgeSubscription);
-
- } finally {
- writeLock.unlock();
- }
- }
-
- private void add (CartridgeSubscription cartridgeSubscription) {
-
String cartridgeType = cartridgeSubscription.getType();
if (cartridgeTypeToSubscriptions.containsKey(cartridgeType)) {
Set<CartridgeSubscription> existingSubscriptions = cartridgeTypeToSubscriptions.get(cartridgeType);
@@ -79,7 +63,7 @@ public class SubscriptionContext implements Serializable {
cartridgeTypeToSubscriptions.put(cartridgeType, subscriptions);
}
- // put to aliasToSubscription map
+ // putSubscription to aliasToSubscription map
if (aliasToSubscription.put(cartridgeSubscription.getAlias(), cartridgeSubscription) != null) {
if(log.isDebugEnabled()) {
log.debug("Overwrote the existing Cartridge Subscription for alias " + cartridgeSubscription.getAlias());
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
index 92b5dd4..e530c12 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
@@ -23,8 +23,6 @@ import org.apache.axis2.AxisFault;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.adc.mgt.client.CloudControllerServiceClient;
-import org.apache.stratos.adc.mgt.connector.CartridgeSubscriptionConnector;
-import org.apache.stratos.adc.mgt.connector.CartridgeSubscriptionConnectorFactory;
import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
import org.apache.stratos.adc.mgt.dto.SubscriptionInfo;
import org.apache.stratos.adc.mgt.exception.*;
@@ -33,6 +31,7 @@ import org.apache.stratos.adc.mgt.payload.PayloadData;
import org.apache.stratos.adc.mgt.payload.PayloadFactory;
import org.apache.stratos.adc.mgt.publisher.ArtifactUpdatePublisher;
import org.apache.stratos.adc.mgt.repository.Repository;
+import org.apache.stratos.adc.mgt.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.adc.mgt.subscriber.Subscriber;
import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
import org.apache.stratos.adc.mgt.subscription.factory.CartridgeSubscriptionFactory;
@@ -42,14 +41,11 @@ import org.apache.stratos.adc.mgt.subscription.tenancy.SubscriptionTenancyBehavi
import org.apache.stratos.adc.mgt.subscription.utils.CartridgeSubscriptionUtils;
import org.apache.stratos.adc.mgt.utils.ApplicationManagementUtil;
import org.apache.stratos.adc.mgt.utils.CartridgeConstants;
-import org.apache.stratos.adc.mgt.utils.PersistenceManager;
import org.apache.stratos.cloud.controller.pojo.CartridgeInfo;
import org.apache.stratos.cloud.controller.pojo.Property;
import org.wso2.carbon.context.CarbonContext;
-import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
/**
@@ -58,12 +54,13 @@ import java.util.Set;
public class CartridgeSubscriptionManager {
private static Log log = LogFactory.getLog(CartridgeSubscriptionManager.class);
+ private DataInsertionAndRetrievalManager dataInsertionAndRetrievalManager = new DataInsertionAndRetrievalManager();
/**
* Subscribes to a cartridge
*
* @param cartridgeType Cartridge type
- * @param cartridgeAlias Cartridge alias
+ * @param subscriptionAlias Cartridge alias
* @param autoscalingPolicyName Autoscaling policy name
* @param deploymentPolicyName Deployment Policy name
* @param tenantDomain Subscribing tenant's domain
@@ -87,7 +84,7 @@ public class CartridgeSubscriptionManager {
* @throws AlreadySubscribedException
* @throws InvalidRepositoryException
*/
- public CartridgeSubscription subscribeToCartridge (String cartridgeType, String cartridgeAlias,
+ public CartridgeSubscription subscribeToCartridge (String cartridgeType, String subscriptionAlias,
String autoscalingPolicyName, String deploymentPolicyName,
String tenantDomain, int tenantId,
String tenantAdminUsername, String repositoryType,
@@ -98,7 +95,7 @@ public class CartridgeSubscriptionManager {
UnregisteredCartridgeException, RepositoryRequiredException, RepositoryCredentialsRequiredException,
RepositoryTransportException, AlreadySubscribedException, InvalidRepositoryException {
- return subscribeToCartridgeWithProperties(cartridgeType, cartridgeAlias, autoscalingPolicyName,
+ return subscribeToCartridgeWithProperties(cartridgeType, subscriptionAlias, autoscalingPolicyName,
deploymentPolicyName, tenantDomain, tenantId, tenantAdminUsername,
repositoryType, repositoryURL, isPrivateRepository, repositoryUsername,
repositoryPassword, null);
@@ -121,7 +118,7 @@ public class CartridgeSubscriptionManager {
InvalidRepositoryException {
// validate cartridge alias
- ApplicationManagementUtil.validateCartridgeAlias(cartridgeAlias, cartridgeType);
+ CartridgeSubscriptionUtils.validateCartridgeAlias(tenantId, cartridgeType, cartridgeAlias);
CartridgeInfo cartridgeInfo;
try {
@@ -217,9 +214,6 @@ public class CartridgeSubscriptionManager {
CartridgeSubscriptionUtils.publishTenantSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
cartridgeSubscription.getCartridgeInfo().getType());
- // publish artifact-deployment event for multi-tenant subscription
- //CartridgeSubscriptionInfo subscription = PersistenceManager.getSubscriptionFromClusterId(clusterId);
-
if(cartridgeInfo.getMultiTenant()) {
log.info(" Multitenant --> Publishing Artifact update event -- ");
ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(),
@@ -243,61 +237,61 @@ public class CartridgeSubscriptionManager {
* @throws NotSubscribedException
* @throws AxisFault
*/
- public void connectCartridges (String tenantDomain, CartridgeSubscription cartridgeSubscription,
- String connectingSubscriptionAlias)
- throws ADCException, NotSubscribedException, AxisFault {
-
- //TODO: retrieve from the cache and connect. For now, new objects are created
-
- CartridgeSubscription connectingCartridgeSubscription = getCartridgeSubscription(tenantDomain,
- connectingSubscriptionAlias);
-
- if(cartridgeSubscription == null) {
- String errorMsg = "No cartridge subscription found in cache for tenant " + tenantDomain + " connecting aborted";
- log.error(errorMsg);
- return;
- }
-
- if(connectingCartridgeSubscription == null) {
- String errorMsg = "No cartridge subscription found in cache for tenant " + tenantDomain + ", alias " +
- connectingSubscriptionAlias + ", connecting aborted";
- log.error(errorMsg);
- return;
- }
-
- CartridgeSubscriptionConnector cartridgeSubscriptionConnector = CartridgeSubscriptionConnectorFactory.
- getCartridgeInstanceConnector(connectingCartridgeSubscription.getType());
-
- cartridgeSubscription.connect(connectingSubscriptionAlias);
-
- //PayloadArg payloadArg = cartridgeSubscription.createPayloadParameters();
-
- //get additional payload params for connecting cartridges
- Properties payloadProperties = cartridgeSubscriptionConnector.createConnection(cartridgeSubscription,
- connectingCartridgeSubscription);
- StringBuilder connectionParamsBuilder = new StringBuilder();
- Set<Map.Entry<Object,Object>> payloadParamEntries = payloadProperties.entrySet();
-
- for (Map.Entry<Object, Object> payloadParamEntry : payloadParamEntries) {
- connectionParamsBuilder.append(",");
- connectionParamsBuilder.append(payloadParamEntry.getKey().toString());
- connectionParamsBuilder.append("=");
- connectionParamsBuilder.append(payloadParamEntry.getValue().toString());
- }
-
- //add connection relates parameters to the payload
- if(cartridgeSubscription.getPayloadData() != null) {
- //cartridgeSubscription.getPayloadData().populatePayload(connectionParamsBuilder.toString());
- } else {
- //no existing payload
- /*Payload payload = PayloadFactory.getPayloadDataInstance(cartridgeSubscription.getCartridgeInfo().getProvider(),
- cartridgeSubscription.getType(), "/tmp/" + tenantDomain + "-" + cartridgeSubscription.getAlias() +
- ".zip");
- payload.populatePayload(connectionParamsBuilder.toString());
- cartridgeSubscription.setPayloadData(payload);*/
- }
-
- }
+// public void connectCartridges (String tenantDomain, CartridgeSubscription cartridgeSubscription,
+// String connectingSubscriptionAlias)
+// throws ADCException, NotSubscribedException, AxisFault {
+//
+// //TODO: retrieve from the cache and connect. For now, new objects are created
+//
+// CartridgeSubscription connectingCartridgeSubscription = getCartridgeSubscription(tenantDomain,
+// connectingSubscriptionAlias);
+//
+// if(cartridgeSubscription == null) {
+// String errorMsg = "No cartridge subscription found in cache for tenant " + tenantDomain + " connecting aborted";
+// log.error(errorMsg);
+// return;
+// }
+//
+// if(connectingCartridgeSubscription == null) {
+// String errorMsg = "No cartridge subscription found in cache for tenant " + tenantDomain + ", alias " +
+// connectingSubscriptionAlias + ", connecting aborted";
+// log.error(errorMsg);
+// return;
+// }
+//
+// CartridgeSubscriptionConnector cartridgeSubscriptionConnector = CartridgeSubscriptionConnectorFactory.
+// getCartridgeInstanceConnector(connectingCartridgeSubscription.getType());
+//
+// cartridgeSubscription.connect(connectingSubscriptionAlias);
+//
+// //PayloadArg payloadArg = cartridgeSubscription.createPayloadParameters();
+//
+// //get additional payload params for connecting cartridges
+// Properties payloadProperties = cartridgeSubscriptionConnector.createConnection(cartridgeSubscription,
+// connectingCartridgeSubscription);
+// StringBuilder connectionParamsBuilder = new StringBuilder();
+// Set<Map.Entry<Object,Object>> payloadParamEntries = payloadProperties.entrySet();
+//
+// for (Map.Entry<Object, Object> payloadParamEntry : payloadParamEntries) {
+// connectionParamsBuilder.append(",");
+// connectionParamsBuilder.append(payloadParamEntry.getKey().toString());
+// connectionParamsBuilder.append("=");
+// connectionParamsBuilder.append(payloadParamEntry.getValue().toString());
+// }
+//
+// //add connection relates parameters to the payload
+// if(cartridgeSubscription.getPayloadData() != null) {
+// //cartridgeSubscription.getPayloadData().populatePayload(connectionParamsBuilder.toString());
+// } else {
+// //no existing payload
+// /*Payload payload = PayloadFactory.getPayloadDataInstance(cartridgeSubscription.getCartridgeInfo().getProvider(),
+// cartridgeSubscription.getType(), "/tmp/" + tenantDomain + "-" + cartridgeSubscription.getAlias() +
+// ".zip");
+// payload.populatePayload(connectionParamsBuilder.toString());
+// cartridgeSubscription.setPayloadData(payload);*/
+// }
+//
+// }
/**
* Registers the cartridge subscription for the given CartridgeSubscriptionInfo object
@@ -313,7 +307,11 @@ public class CartridgeSubscriptionManager {
CartridgeSubscriptionInfo cartridgeSubscriptionInfo = cartridgeSubscription.registerSubscription(null);
- int subscriptionId;
+ //set status as 'SUBSCRIBED'
+ cartridgeSubscription.setSubscriptionStatus(CartridgeConstants.SUBSCRIBED);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /*int subscriptionId;
try {
subscriptionId = PersistenceManager.persistSubscription(cartridgeSubscriptionInfo);
@@ -322,12 +320,24 @@ public class CartridgeSubscriptionManager {
cartridgeSubscription.getSubscriber().getTenantDomain() + ", alias " + cartridgeSubscription.getType();
log.error(errorMsg);
throw new ADCException(errorMsg, e);
+ }*/
+
+ //cartridgeSubscription.setSubscriptionId(subscriptionId);
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ try {
+ dataInsertionAndRetrievalManager.cacheAndPersistSubcription(cartridgeSubscription);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error saving subscription for tenant " +
+ cartridgeSubscription.getSubscriber().getTenantDomain() + ", alias " + cartridgeSubscription.getType();
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
}
- cartridgeSubscription.setSubscriptionId(subscriptionId);
ApplicationManagementUtil.addDNSEntry(cartridgeSubscriptionInfo.getAlias(), cartridgeSubscription.getType());
- log.info("Successful Subscription: "+cartridgeSubscription.toString());
+ log.info("Successful Subscription: " + cartridgeSubscription.toString());
return ApplicationManagementUtil.
createSubscriptionResponse(cartridgeSubscriptionInfo, cartridgeSubscription.getRepository());
}
@@ -343,9 +353,8 @@ public class CartridgeSubscriptionManager {
public void unsubscribeFromCartridge (String tenantDomain, String alias)
throws ADCException, NotSubscribedException {
- //TODO: retrieve from the cache and connect. For now, new objects are created
-
- CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(tenantDomain, alias);
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /*CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(tenantDomain, alias);
if(cartridgeSubscription != null) {
cartridgeSubscription.removeSubscription();
@@ -358,9 +367,40 @@ public class CartridgeSubscriptionManager {
if(log.isDebugEnabled()) {
log.debug("No cartridge subscription found with alias " + alias + " for tenant " + tenantDomain);
}
+ }*/
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ //fix properly
+ CartridgeSubscription cartridgeSubscription = dataInsertionAndRetrievalManager.getCartridgeSubscription(CarbonContext.getThreadLocalCarbonContext().getTenantId(), alias);
+ if(cartridgeSubscription != null) {
+ cartridgeSubscription.removeSubscription();
+
+ //set status as 'UNSUBSCRIBED'
+ cartridgeSubscription.setSubscriptionStatus(CartridgeConstants.UNSUBSCRIBED);
+
+ // persist changes
+ try {
+ dataInsertionAndRetrievalManager.cacheAndPersistSubcription(cartridgeSubscription);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error saving subscription for tenant " +
+ cartridgeSubscription.getSubscriber().getTenantDomain() + ", alias " + cartridgeSubscription.getAlias();
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+
+ // Publish tenant un-subscribed event to message broker
+ CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
+ cartridgeSubscription.getCartridgeInfo().getType());
+ }
+ else {
+ String errorMsg = "No cartridge subscription found with alias " + alias + " for tenant " + tenantDomain;
+ log.error(errorMsg);
+ throw new NotSubscribedException(errorMsg, alias);
}
}
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /*
/**
* Creates and returns a CartridgeSubscription object
*
@@ -371,7 +411,7 @@ public class CartridgeSubscriptionManager {
* @throws ADCException
* @throws NotSubscribedException
*/
- public CartridgeSubscription getCartridgeSubscription(String tenantDomain, String alias)
+ /*public CartridgeSubscription getCartridgeSubscription(String tenantDomain, String alias)
throws ADCException, NotSubscribedException {
CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(tenantDomain, alias);
@@ -385,7 +425,8 @@ public class CartridgeSubscriptionManager {
}
return populateCartridgeSubscriptionInformation(cartridgeInfo, cartridgeSubscriptionInfo);
- }
+ }*/
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TODO: remove
/*public CartridgeSubscription deployMultitenantService(String cartridgeType, String cartridgeAlias,
@@ -482,7 +523,7 @@ public class CartridgeSubscriptionManager {
}*/
- private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo(String tenantDomain, String alias)
+ /*private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String tenantDomain, String alias)
throws ADCException, NotSubscribedException {
CartridgeSubscriptionInfo subscription;
@@ -503,9 +544,9 @@ public class CartridgeSubscriptionManager {
return subscription;
- }
+ }*/
- private List<CartridgeSubscriptionInfo> getCartridgeSubscriptions (int tenantId) throws ADCException, NotSubscribedException {
+ /*private List<CartridgeSubscriptionInfo> getCartridgeSubscriptions (int tenantId) throws ADCException, NotSubscribedException {
List<CartridgeSubscriptionInfo> subscriptions;
try {
@@ -524,7 +565,7 @@ public class CartridgeSubscriptionManager {
}
return subscriptions;
- }
+ }*/
private CartridgeSubscription populateCartridgeSubscriptionInformation(CartridgeInfo cartridgeInfo,
CartridgeSubscriptionInfo cartridgeSubscriptionInfo)
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/payload/PayloadData.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/payload/PayloadData.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/payload/PayloadData.java
index af385ae..7482ab6 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/payload/PayloadData.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/payload/PayloadData.java
@@ -41,7 +41,7 @@ public abstract class PayloadData implements Serializable {
completePayloadDataBuilder.append(",");
}
- //payloadDataMap.put(payloadDataName, payloadDataValue);
+ //payloadDataMap.putSubscription(payloadDataName, payloadDataValue);
completePayloadDataBuilder.append(payloadDataName + "=" + payloadDataValue);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java
index e3afaf2..c4291ec 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java
@@ -22,7 +22,7 @@ package org.apache.stratos.adc.mgt.persistence;
import org.apache.stratos.adc.mgt.exception.PersistenceManagerException;
import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
-import java.util.List;
+import java.util.Collection;
public abstract class PersistenceManager {
@@ -32,20 +32,23 @@ public abstract class PersistenceManager {
public abstract void removeCartridgeSubscription (int tenantId, String alias)
throws PersistenceManagerException;
- public abstract CartridgeSubscription getCartridgeSubscription(int tenantId, String alias)
- throws PersistenceManagerException;
+ //public abstract CartridgeSubscription getCartridgeSubscription(int tenantId, String alias)
+ // throws PersistenceManagerException;
- public abstract List<CartridgeSubscription> getCartridgeSubscriptions()
+ public abstract Collection<CartridgeSubscription> getCartridgeSubscriptions()
throws PersistenceManagerException;
- public abstract List<CartridgeSubscription> getCartridgeSubscriptions(int tenantId)
+ public abstract Collection<CartridgeSubscription> getCartridgeSubscriptions(int tenantId)
throws PersistenceManagerException;
- public abstract CartridgeSubscription getCartridgeSubscription (String clusterDomain)
- throws PersistenceManagerException;
+ //public abstract Collection<CartridgeSubscription> getCartridgeSubscriptions(int tenantId)
+ // throws PersistenceManagerException;
- public abstract List<CartridgeSubscription> getCartridgeSubscriptions(int tenantId, String cartridgeType)
- throws PersistenceManagerException;
+ //public abstract CartridgeSubscription getCartridgeSubscription (String clusterDomain)
+ // throws PersistenceManagerException;
+
+ //public abstract Collection<CartridgeSubscription> getCartridgeSubscriptions(int tenantId, String cartridgeType)
+ // throws PersistenceManagerException;
/*public abstract Repository getRepository (int tenantId, String alias)
throws PersistenceManagerException;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java
index 0db3c19..49f55b1 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java
@@ -22,8 +22,6 @@ package org.apache.stratos.adc.mgt.persistence;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.adc.mgt.exception.PersistenceManagerException;
-import org.apache.stratos.adc.mgt.lookup.ClusterIdToSubscription;
-import org.apache.stratos.adc.mgt.lookup.SubscriptionContext;
import org.apache.stratos.adc.mgt.registry.RegistryManager;
import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
import org.apache.stratos.adc.mgt.utils.Deserializer;
@@ -32,27 +30,32 @@ import org.wso2.carbon.registry.core.exceptions.RegistryException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
public class RegistryBasedPersistenceManager extends PersistenceManager {
private static final Log log = LogFactory.getLog(RegistryBasedPersistenceManager.class);
// Registry paths
- private static final String STRATOS_MANAGER_REOSURCE = "/stratos.manager";
- private static final String CLUSTER_ID_TO_SUBSCRIPTION = "/clusterIdToSubscription";
- private static final String TENANT_ID_TO_SUBSCRIPTION_CONTEXT = "/tenantIdToSubscriptionContext";
+ private static final String STRATOS_MANAGER_REOSURCE = "/stratos_manager";
+ //private static final String CLUSTER_ID_TO_SUBSCRIPTION = "/clusterIdToSubscription";
+ //private static final String SUBSCRIPTION_CONTEXT = "/subscription_context";
+ private static final String SUBSCRIPTIONS = "/subscriptions";
@Override
public void persistCartridgeSubscription (CartridgeSubscription cartridgeSubscription) throws PersistenceManagerException {
- SubscriptionContext subscriptionContext = new SubscriptionContext();
- subscriptionContext.addSubscription(cartridgeSubscription);
+ //SubscriptionContext subscriptionContext = new SubscriptionContext();
+ //subscriptionContext.addSubscription(cartridgeSubscription);
- //TODO: need to synchronize?
- // persist in the path TENANT_ID_TO_SUBSCRIPTION_CONTEXT
+ // persist in the path SUBSCRIPTION_CONTEXT
try {
- RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" +
- Integer.toString(cartridgeSubscription.getSubscriber().getTenantId()), Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext));
+ //RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" +
+ // Integer.toString(cartridgeSubscription.getSubscriber().getTenantId()), Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext));
+ RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + SUBSCRIPTIONS + "/" +
+ Integer.toString(cartridgeSubscription.getSubscriber().getTenantId()) + "/" +
+ cartridgeSubscription.getType() + "/" +
+ cartridgeSubscription.getAlias(), Serializer.serializeSubscriptionSontextToByteArray(cartridgeSubscription), cartridgeSubscription.getClusterDomain());
} catch (RegistryException e) {
throw new PersistenceManagerException(e);
@@ -62,7 +65,7 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
}
// persist in the path CLUSTER_ID_TO_SUBSCRIPTION
- try {
+ /*try {
RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION + "/" +
cartridgeSubscription.getClusterDomain(), Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext));
@@ -71,7 +74,7 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
} catch (IOException e) {
throw new PersistenceManagerException(e);
- }
+ }*/
}
@Override
@@ -79,13 +82,13 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
//TODO
}
- @Override
+ /*@Override
public CartridgeSubscription getCartridgeSubscription (int tenantId, String alias) throws PersistenceManagerException {
Object byteObj;
try {
- byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" +
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" +
Integer.toString(tenantId));
} catch (RegistryException e) {
@@ -112,15 +115,15 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
}
return null;
- }
+ }*/
- @Override
- public List<CartridgeSubscription> getCartridgeSubscriptions () throws PersistenceManagerException {
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions () throws PersistenceManagerException {
Object resourceObj;
try {
- resourceObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT);
+ resourceObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT);
} catch (RegistryException e) {
throw new PersistenceManagerException(e);
@@ -130,10 +133,10 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
return null;
}
- // get the paths for all SubscriptionContext instnaces
+ // get the paths for all SubscriptionContext instances
String[] subscriptionCtxtResourcePaths = (String[]) resourceObj;
- List<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>();
+ Collection<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>();
//for each path, get the SubscriptionContext instance
for (String subscriptionCtxResourcePath : subscriptionCtxtResourcePaths) {
@@ -165,15 +168,74 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
}
return cartridgeSubscriptions;
- }
+ }*/
@Override
- public List<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException {
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions () throws PersistenceManagerException {
+
+ return traverseAndGetCartridgeSubscriptions(STRATOS_MANAGER_REOSURCE + SUBSCRIPTIONS);
+ }
+
+    /**
+     * Recursively collects every CartridgeSubscription stored at or below the given registry path.
+     *
+     * @param resourcePath registry path to read; either a collection (child paths) or a serialized leaf
+     * @return the subscriptions found (possibly empty for a collection), or null when nothing is stored
+     *         at the path or the leaf could not be de-serialized into a CartridgeSubscription
+     * @throws PersistenceManagerException if the registry lookup itself fails
+     */
+    private Collection<CartridgeSubscription> traverseAndGetCartridgeSubscriptions (String resourcePath) throws PersistenceManagerException {
+        Object resourceObj;
+        try {
+            resourceObj = RegistryManager.getInstance().retrieve(resourcePath);
+        } catch (RegistryException e) {
+            throw new PersistenceManagerException(e);
+        }
+
+        if (resourceObj == null) {
+            // there is no resource at the given path
+            return null;
+        }
+
+        if (resourceObj instanceof String[]) {
+            // a collection resource: each entry is the path of a child resource
+            Collection<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>();
+            for (String subscriptionResourcePath : (String[]) resourceObj) {
+                Collection<CartridgeSubscription> childSubscriptions =
+                        traverseAndGetCartridgeSubscriptions(subscriptionResourcePath);
+                // a child yields null when it is missing or unreadable; skip it, since addAll(null) throws
+                if (childSubscriptions != null) {
+                    cartridgeSubscriptions.addAll(childSubscriptions);
+                }
+            }
+            // return only after every child path has been visited
+            return cartridgeSubscriptions;
+        }
+
+        // a leaf resource: de-serialize
+        Object subscriptionObj;
+        try {
+            subscriptionObj = Deserializer.deserializeFromByteArray((byte[]) resourceObj);
+        } catch (Exception e) {
+            // issue might be de-serializing only this object, therefore log and continue without throwing
+            log.error("Error while de-serializing the object retrieved from " + resourcePath, e);
+            return null;
+        }
+
+        if (subscriptionObj instanceof CartridgeSubscription) {
+            // return a list out of the CartridgeSubscription instance
+            return Collections.singletonList((CartridgeSubscription) subscriptionObj);
+        }
+        return null;
+    }
+
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException {
Object byteObj;
try {
- byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
} catch (RegistryException e) {
throw new PersistenceManagerException(e);
@@ -188,16 +250,50 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
throw new PersistenceManagerException(e);
}
- List<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>();
if (subscriptionContextObj instanceof SubscriptionContext) {
//get all Subscriptions for this tenant
- cartridgeSubscriptions.addAll(((SubscriptionContext) subscriptionContextObj).getSubscriptions());
+ return ((SubscriptionContext) subscriptionContextObj).getSubscriptions();
}
- return cartridgeSubscriptions;
- }
+ return null;
+ }*/
@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException {
+
+ return traverseAndGetCartridgeSubscriptions(STRATOS_MANAGER_REOSURCE + SUBSCRIPTIONS + "/" + Integer.toString(tenantId));
+ }
+
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException {
+
+ Object byteObj;
+
+ try {
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ Object subscriptionContextObj;
+
+ try {
+ subscriptionContextObj = Deserializer.deserializeFromByteArray((byte[]) byteObj);
+
+ } catch (Exception e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if (subscriptionContextObj instanceof SubscriptionContext) {
+ //get all Subscriptions for this tenant
+ return ((SubscriptionContext) subscriptionContextObj).getSubscriptions();
+ }
+
+ return null;
+ }*/
+
+ /*@Override
public CartridgeSubscription getCartridgeSubscription (String clusterDomain) throws PersistenceManagerException {
Object byteObj;
@@ -223,15 +319,15 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
}
return null;
- }
+ }*/
- @Override
- public List<CartridgeSubscription> getCartridgeSubscriptions (int tenantId, String cartridgeType) throws PersistenceManagerException {
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId, String cartridgeType) throws PersistenceManagerException {
Object byteObj;
try {
- byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
} catch (RegistryException e) {
throw new PersistenceManagerException(e);
@@ -246,12 +342,12 @@ public class RegistryBasedPersistenceManager extends PersistenceManager {
throw new PersistenceManagerException(e);
}
- List<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>();
+
if (subscriptionContextObj instanceof SubscriptionContext) {
//get all Subscriptions for this tenant and the type
- cartridgeSubscriptions.addAll(((SubscriptionContext) subscriptionContextObj).getSubscriptionsOfType(cartridgeType));
+ return ((SubscriptionContext) subscriptionContextObj).getSubscriptionsOfType(cartridgeType);
}
- return cartridgeSubscriptions;
- }
+ return null;
+ }*/
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
index ab267f7..07bf3b3 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
@@ -21,9 +21,9 @@ package org.apache.stratos.adc.mgt.publisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
import org.apache.stratos.adc.mgt.internal.DataHolder;
-import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.adc.mgt.retriever.DataInsertionAndRetrievalManager;
+import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
@@ -32,6 +32,7 @@ import org.wso2.carbon.ntask.core.Task;
import org.wso2.carbon.user.core.tenant.TenantManager;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -64,13 +65,16 @@ public class TenantSynzhronizerTask implements Task {
}
tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
// Add subscriptions
- List<CartridgeSubscriptionInfo> subscriptions = PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId());
- for (CartridgeSubscriptionInfo subscription : subscriptions) {
+ /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ //List<CartridgeSubscriptionInfo> cartridgeSubscriptions = PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId());
+ /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ Collection<CartridgeSubscription> cartridgeSubscriptions = new DataInsertionAndRetrievalManager().getCartridgeSubscriptions(tenant.getTenantId());
+ for (CartridgeSubscription subscription : cartridgeSubscriptions) {
if(log.isDebugEnabled()) {
log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s",
- carbonTenant.getId(), carbonTenant.getDomain(), subscription.getCartridge()));
+ carbonTenant.getId(), carbonTenant.getDomain(), subscription.getType()));
}
- tenant.addServiceSubscription(subscription.getCartridge());
+ tenant.addServiceSubscription(subscription.getType());
}
tenants.add(tenant);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c36bd253/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java
index 821418a..9e18811 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java
@@ -31,8 +31,8 @@ import org.wso2.carbon.registry.core.session.UserRegistry;
public class RegistryManager {
private final static Log log = LogFactory.getLog(RegistryManager.class);
- ;
- private static final String STRATOS_MANAGER_REOSURCE = "/stratos.manager";
+
+ private static final String STRATOS_MANAGER_REOSURCE = "/stratos_manager";
private static RegistryService registryService;
private static volatile RegistryManager registryManager;
@@ -120,7 +120,7 @@ public class RegistryManager {
tenantGovRegistry.beginTransaction();
Resource nodeResource = tenantGovRegistry.newResource();
nodeResource.setContent(Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext));
- tenantGovRegistry.put(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT, nodeResource);
+ tenantGovRegistry.putSubscription(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT, nodeResource);
tenantGovRegistry.commitTransaction();
} catch (Exception e) {
@@ -193,7 +193,7 @@ public class RegistryManager {
govRegistry.beginTransaction();
Resource nodeResource = govRegistry.newResource();
nodeResource.setContent(Serializer.serializeClusterIdToSubscriptionToByteArray(clusterIdToSubscription));
- govRegistry.put(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION, nodeResource);
+ govRegistry.putSubscription(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION, nodeResource);
govRegistry.commitTransaction();
} catch (Exception e) {
@@ -228,7 +228,7 @@ public class RegistryManager {
}
}*/
- public void persist (String path, byte [] resourceBytes) throws RegistryException {
+ public void persist (String path, byte [] resourceBytes, String tag) throws RegistryException {
UserRegistry registry = initRegistry();
@@ -236,13 +236,27 @@ public class RegistryManager {
registry.beginTransaction();
Resource nodeResource = registry.newResource();
nodeResource.setContent(resourceBytes);
+ // store the resource in the registry
registry.put(path, nodeResource);
+ if (tag != null) {
+ // apply the tag
+ registry.applyTag(path, tag);
+ }
+ // commit
registry.commitTransaction();
} catch (RegistryException e) {
- registry.rollbackTransaction();
String errorMsg = "Failed to persist the given resource in registry path " + path;
log.error(errorMsg, e);
+ // rollback
+ try {
+ registry.rollbackTransaction();
+
+ } catch (RegistryException e1) {
+ errorMsg = "Failed to rollback the transaction in registry path " + path;
+ log.error(errorMsg, e1);
+ throw e1;
+ }
throw e;
}
}
@@ -257,8 +271,10 @@ public class RegistryManager {
resource = registry.get(resourcePath);
} catch (ResourceNotFoundException ignore) {
- String errorMsg = "Resource not found at path " + resourcePath;
- log.error(errorMsg);
+ // nothing has been persisted in the registry yet
+ if(log.isDebugEnabled()) {
+ log.debug("No resource found in the registry path " + resourcePath);
+ }
return null;
} catch (RegistryException e) {
@@ -267,10 +283,6 @@ public class RegistryManager {
throw e;
}
- if(resource == null) {
- return null;
- }
-
return resource.getContent();
}
}