You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/01/08 06:51:41 UTC
[30/46] renaming package adc.mgt to manager
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java
new file mode 100644
index 0000000..a389b93
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.persistence;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.deploy.service.Service;
+import org.apache.stratos.manager.exception.PersistenceManagerException;
+import org.apache.stratos.manager.registry.RegistryManager;
+import org.apache.stratos.manager.subscription.CartridgeSubscription;
+import org.apache.stratos.manager.utils.Deserializer;
+import org.apache.stratos.manager.utils.Serializer;
+import org.wso2.carbon.context.CarbonContext;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+public class RegistryBasedPersistenceManager extends PersistenceManager {
+
+ private static final Log log = LogFactory.getLog(RegistryBasedPersistenceManager.class);
+ // Registry paths
+ private static final String STRATOS_MANAGER_REOSURCE = "/stratos_manager";
+ private static final String ACTIVE_SUBSCRIPTIONS = "/subscriptions/active";
+ private static final String INACTIVE_SUBSCRIPTIONS = "/subscriptions/inactive";
+ private static final String SERVICES = "/services";
+
+ @Override
+ public void persistCartridgeSubscription (CartridgeSubscription cartridgeSubscription) throws PersistenceManagerException {
+
+ int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
+ if (tenantId != MultitenantConstants.SUPER_TENANT_ID) {
+ // TODO: This is only a workaround. Proper fix is to write to tenant registry
+ try {
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+ carbonContext.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
+ carbonContext.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+
+ persistSubscription(cartridgeSubscription);
+
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+
+ } else {
+ persistSubscription(cartridgeSubscription);
+ }
+ }
+
+ private void persistSubscription (CartridgeSubscription cartridgeSubscription) throws PersistenceManagerException {
+
+ // persist
+ try {
+ RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + ACTIVE_SUBSCRIPTIONS + "/" +
+ Integer.toString(cartridgeSubscription.getSubscriber().getTenantId()) + "/" +
+ cartridgeSubscription.getType() + "/" +
+ cartridgeSubscription.getAlias(), Serializer.serializeSubscriptionSontextToByteArray(cartridgeSubscription), cartridgeSubscription.getClusterDomain());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Persisted CartridgeSubscription successfully: [ " + cartridgeSubscription.getSubscriber().getTenantDomain()
+ + ", " + cartridgeSubscription.getType() + ", " + cartridgeSubscription.getAlias() + " ] ");
+ }
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+
+ } catch (IOException e) {
+ throw new PersistenceManagerException(e);
+ }
+ }
+
+ @Override
+ public void removeCartridgeSubscription (int tenantId, String type, String alias) throws PersistenceManagerException {
+
+ if (tenantId != MultitenantConstants.SUPER_TENANT_ID) {
+ // TODO: This is only a workaround. Proper fix is to write to tenant registry
+ try {
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+ carbonContext.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
+ carbonContext.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+
+ removeSubscription(tenantId, type, alias);
+
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+
+ } else {
+ removeSubscription(tenantId, type, alias);
+ }
+ }
+
+ private void removeSubscription (int tenantId, String type, String alias) throws PersistenceManagerException {
+
+ /*String resourcePath = STRATOS_MANAGER_REOSURCE + ACTIVE_SUBSCRIPTIONS + "/" + Integer.toString(tenantId) + "/" + type + "/" + alias;
+
+ try {
+ RegistryManager.getInstance().delete(resourcePath);
+ if (log.isDebugEnabled()) {
+ log.debug("Deleted CartridgeSubscription on path " + resourcePath + " successfully");
+ }
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }*/
+
+ // move the subscription from active set to inactive set
+ String sourcePath = STRATOS_MANAGER_REOSURCE + ACTIVE_SUBSCRIPTIONS + "/" + Integer.toString(tenantId) + "/" + type + "/" + alias;
+ String targetPath = STRATOS_MANAGER_REOSURCE + INACTIVE_SUBSCRIPTIONS + "/" + Integer.toString(tenantId) + "/" + type + "/" + alias;
+
+ try {
+ RegistryManager.getInstance().move(sourcePath, targetPath);
+ if (log.isDebugEnabled()) {
+ log.debug("Moved CartridgeSubscription on " + sourcePath + " to " + targetPath + " successfully");
+ }
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+ }
+
+ /*@Override
+ public CartridgeSubscription getCartridgeSubscription (int tenantId, String alias) throws PersistenceManagerException {
+
+ Object byteObj;
+
+ try {
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" +
+ Integer.toString(tenantId));
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if (byteObj == null) {
+ return null;
+ }
+
+ Object subscriptionContextObj;
+
+ try {
+ subscriptionContextObj = Deserializer.deserializeFromByteArray((byte[]) byteObj);
+
+ } catch (Exception e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ SubscriptionContext subscriptionContext;
+ if (subscriptionContextObj instanceof SubscriptionContext) {
+ subscriptionContext = (SubscriptionContext) subscriptionContextObj;
+ return subscriptionContext.getSubscriptionForAlias(alias);
+ }
+
+ return null;
+ }*/
+
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions () throws PersistenceManagerException {
+
+ Object resourceObj;
+
+ try {
+ resourceObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT);
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if ((resourceObj == null) || !(resourceObj instanceof String[])) {
+ return null;
+ }
+
+ // get the paths for all SubscriptionContext instances
+ String[] subscriptionCtxtResourcePaths = (String[]) resourceObj;
+
+ Collection<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>();
+ //for each path, get the SubscriptionContext instance
+ for (String subscriptionCtxResourcePath : subscriptionCtxtResourcePaths) {
+
+ Object serializedSubscriptionCtxObj = null;
+ try {
+ serializedSubscriptionCtxObj = RegistryManager.getInstance().retrieve(subscriptionCtxResourcePath);
+
+ } catch (RegistryException e) {
+ // issue might be at only this path, therefore log and continue
+ log.error("Error while retrieving Resource at " + subscriptionCtxResourcePath, e);
+ continue;
+ }
+
+ //De-serialize
+ Object subscriptionCtxObj = null;
+ try {
+ subscriptionCtxObj = Deserializer.deserializeFromByteArray((byte[]) serializedSubscriptionCtxObj);
+
+ } catch (Exception e) {
+ // issue might be de-serializing only this object, therefore log and continue
+ log.error("Error while de-serializing the object retrieved from " + subscriptionCtxResourcePath, e);
+ continue;
+ }
+
+ if (subscriptionCtxObj != null && subscriptionCtxObj instanceof SubscriptionContext) {
+ SubscriptionContext subscriptionContext = (SubscriptionContext) subscriptionCtxObj;
+ cartridgeSubscriptions.addAll(subscriptionContext.getSubscriptions());
+ }
+ }
+
+ return cartridgeSubscriptions;
+ }*/
+
+ @Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions () throws PersistenceManagerException {
+
+ return traverseAndGetCartridgeSubscriptions(STRATOS_MANAGER_REOSURCE + ACTIVE_SUBSCRIPTIONS);
+ }
+
+ private Collection<CartridgeSubscription> traverseAndGetCartridgeSubscriptions (String resourcePath) throws PersistenceManagerException {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Root resource path: " + resourcePath);
+ }
+
+ Object resourceObj;
+
+ try {
+ resourceObj = RegistryManager.getInstance().retrieve(resourcePath);
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ Collection<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>();
+
+ if (resourceObj == null) {
+ // there is no resource at the given path
+ return null;
+
+ } else if (resourceObj instanceof String[]) {
+
+ // get the paths for all SubscriptionContext instances
+ String[] subscriptionResourcePaths = (String[]) resourceObj;
+ if (log.isDebugEnabled()) {
+ for (String retrievedResourcePath : subscriptionResourcePaths) {
+ log.debug("Retrieved resource sub-path " + retrievedResourcePath);
+ }
+ }
+
+ // traverse the paths recursively
+ for (String subscriptionResourcePath : subscriptionResourcePaths) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Traversing resource path " + subscriptionResourcePath);
+ }
+
+ cartridgeSubscriptions.addAll(traverseAndGetCartridgeSubscriptions(subscriptionResourcePath));
+ }
+
+ } else {
+ // De-serialize
+ Object subscriptionObj;
+
+ try {
+ subscriptionObj = Deserializer.deserializeFromByteArray((byte[]) resourceObj);
+
+ } catch (Exception e) {
+ // issue might be de-serializing only this object, therefore log and continue without throwing
+ log.error("Error while de-serializing the object retrieved from " + resourcePath, e);
+ return null;
+ }
+
+ if (subscriptionObj != null && subscriptionObj instanceof CartridgeSubscription) {
+
+ CartridgeSubscription deserilizedCartridgeSubscription = (CartridgeSubscription) subscriptionObj;
+ if (log.isDebugEnabled()) {
+ log.debug("Successfully de-serialized CartridgeSubscription: " + deserilizedCartridgeSubscription.toString());
+ }
+
+ //return Collections.singletonList(deserilizedCartridgeSubscription);
+ cartridgeSubscriptions.add(deserilizedCartridgeSubscription);
+
+ }
+ }
+
+ // remove any nulls
+ cartridgeSubscriptions.removeAll(Collections.singleton(null));
+ return cartridgeSubscriptions;
+ }
+
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException {
+
+ Object byteObj;
+
+ try {
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ Object subscriptionContextObj;
+
+ try {
+ subscriptionContextObj = Deserializer.deserializeFromByteArray((byte[]) byteObj);
+
+ } catch (Exception e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if (subscriptionContextObj instanceof SubscriptionContext) {
+ //get all Subscriptions for this tenant
+ return ((SubscriptionContext) subscriptionContextObj).getSubscriptions();
+ }
+
+ return null;
+ }*/
+
+ @Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException {
+
+ return traverseAndGetCartridgeSubscriptions(STRATOS_MANAGER_REOSURCE + ACTIVE_SUBSCRIPTIONS + "/" + Integer.toString(tenantId));
+ }
+
+ @Override
+ public void persistService(Service service) throws PersistenceManagerException {
+
+ int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
+ if (tenantId != MultitenantConstants.SUPER_TENANT_ID) {
+ // TODO: This is only a workaround. Proper fix is to write to tenant registry
+ try {
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+ carbonContext.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
+ carbonContext.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+
+ persistDeployedService(service);
+
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+
+ } else {
+ persistDeployedService(service);
+ }
+ }
+
+ private void persistDeployedService (Service service) throws PersistenceManagerException {
+
+ // persist Service
+ try {
+ RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + SERVICES + "/" + service.getType(),
+ Serializer.serializeServiceToByteArray(service), null);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Persisted Service successfully: [ " + service.getType() + ", " + service.getTenantRange() + " ]");
+ }
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+
+ } catch (IOException e) {
+ throw new PersistenceManagerException(e);
+ }
+ }
+
+ @Override
+ public Service getService(String cartridgeType) throws PersistenceManagerException {
+
+ int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
+ if (tenantId != MultitenantConstants.SUPER_TENANT_ID) {
+ // TODO: This is only a workaround. Proper fix is to write to tenant registry
+ try {
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+ carbonContext.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
+ carbonContext.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+
+ return getDeployedService(cartridgeType);
+
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+
+ } else {
+ return getDeployedService(cartridgeType);
+ }
+ }
+
+ public Service getDeployedService (String cartridgeType) throws PersistenceManagerException {
+
+ Object byteObj;
+
+ try {
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SERVICES + "/" + cartridgeType);
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if (byteObj == null) {
+ return null;
+ }
+
+ Object serviceObj;
+
+ try {
+ serviceObj = Deserializer.deserializeFromByteArray((byte[]) byteObj);
+
+ } catch (Exception e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if (serviceObj instanceof Service) {
+ return (Service) serviceObj;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void removeService(String cartridgeType) throws PersistenceManagerException {
+
+ int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
+ if (tenantId != MultitenantConstants.SUPER_TENANT_ID) {
+ // TODO: This is only a workaround. Proper fix is to write to tenant registry
+ try {
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+ carbonContext.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
+ carbonContext.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+
+ removeDeployedService(cartridgeType);
+
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+
+ } else {
+ removeDeployedService(cartridgeType);
+ }
+ }
+
+ private void removeDeployedService (String cartridgeType) throws PersistenceManagerException {
+
+ String resourcePath = STRATOS_MANAGER_REOSURCE + SERVICES + "/" + cartridgeType;
+
+ try {
+ RegistryManager.getInstance().delete(resourcePath);
+ if (log.isDebugEnabled()) {
+ log.debug("Deleted Service on path " + resourcePath + " successfully");
+ }
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+ }
+
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException {
+
+ Object byteObj;
+
+ try {
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ Object subscriptionContextObj;
+
+ try {
+ subscriptionContextObj = Deserializer.deserializeFromByteArray((byte[]) byteObj);
+
+ } catch (Exception e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if (subscriptionContextObj instanceof SubscriptionContext) {
+ //get all Subscriptions for this tenant
+ return ((SubscriptionContext) subscriptionContextObj).getSubscriptions();
+ }
+
+ return null;
+ }*/
+
+ /*@Override
+ public CartridgeSubscription getCartridgeSubscription (String clusterDomain) throws PersistenceManagerException {
+
+ Object byteObj;
+
+ try {
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION + "/" + clusterDomain);
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ Object clusterIdToSubscriptionObj;
+
+ try {
+ clusterIdToSubscriptionObj = Deserializer.deserializeFromByteArray((byte[]) byteObj);
+
+ } catch (Exception e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ if (clusterIdToSubscriptionObj instanceof ClusterIdToSubscription) {
+ ((ClusterIdToSubscription) clusterIdToSubscriptionObj).getSubscription(clusterDomain);
+ }
+
+ return null;
+ }*/
+
+ /*@Override
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId, String cartridgeType) throws PersistenceManagerException {
+
+ Object byteObj;
+
+ try {
+ byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId));
+
+ } catch (RegistryException e) {
+ throw new PersistenceManagerException(e);
+ }
+
+ Object subscriptionContextObj;
+
+ try {
+ subscriptionContextObj = Deserializer.deserializeFromByteArray((byte[]) byteObj);
+
+ } catch (Exception e) {
+ throw new PersistenceManagerException(e);
+ }
+
+
+ if (subscriptionContextObj instanceof SubscriptionContext) {
+ //get all Subscriptions for this tenant and the type
+ return ((SubscriptionContext) subscriptionContextObj).getSubscriptionsOfType(cartridgeType);
+ }
+
+ return null;
+ }*/
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java
new file mode 100644
index 0000000..6cb2022
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.manager.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.internal.DataHolder;
+import org.apache.stratos.manager.repository.Repository;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
+
+public class ArtifactUpdatePublisher {
+
+ private static final Log log = LogFactory.getLog(ArtifactUpdatePublisher.class);
+
+ private Repository repository;
+ private String clusterId;
+ private String tenantId;
+
+ public ArtifactUpdatePublisher(Repository repository, String clusterId, String tenantId) {
+ this.repository = repository;
+ this.clusterId = clusterId;
+ this.tenantId = tenantId;
+
+ }
+
+ public void publish() {
+ EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher();
+ log.info("publishing ** ");
+ depsyncEventPublisher.publish(createArtifactUpdateEvent());
+ }
+
+ private ArtifactUpdatedEvent createArtifactUpdateEvent() {
+ ArtifactUpdatedEvent artifactUpdateEvent = new ArtifactUpdatedEvent();
+ artifactUpdateEvent.setClusterId(clusterId);
+ artifactUpdateEvent.setRepoUserName(repository.getUserName());
+ artifactUpdateEvent.setRepoPassword(repository.getPassword());
+ artifactUpdateEvent.setRepoURL(repository.getUrl());
+ artifactUpdateEvent.setTenantId(tenantId);
+
+ log.info("Creating artifact updated event ");
+ log.info("cluster Id : " + artifactUpdateEvent.getClusterId());
+ log.info("repo url : " + artifactUpdateEvent.getRepoURL());
+ log.info("repo username : " + artifactUpdateEvent.getRepoUserName());
+ log.info("repo pwd : " + artifactUpdateEvent.getRepoPassword());
+ log.info("tenant Id : " + artifactUpdateEvent.getTenantId());
+ return artifactUpdateEvent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
new file mode 100644
index 0000000..2df631a
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.beans.TenantInfoBean;
+import org.apache.stratos.common.exception.StratosException;
+import org.apache.stratos.common.listeners.TenantMgtListener;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantUpdatedEvent;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * Tenant event publisher to publish tenant events to the message broker by
+ * listening to the tenant manager.
+ */
+public class TenantEventPublisher implements TenantMgtListener {
+
+ private static final Log log = LogFactory.getLog(TenantEventPublisher.class);
+ private static final int EXEC_ORDER = 1;
+
+
+ @Override
+ public void onTenantCreate(TenantInfoBean tenantInfo) throws StratosException {
+ try {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Publishing tenant created event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain()));
+ }
+ Tenant tenant = new Tenant(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
+ TenantCreatedEvent event = new TenantCreatedEvent(tenant);
+ EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ eventPublisher.publish(event);
+ }
+ catch (Exception e) {
+ log.error("Could not publish tenant created event", e);
+ }
+ }
+
+ @Override
+ public void onTenantUpdate(TenantInfoBean tenantInfo) throws StratosException {
+ try {
+ if(log.isInfoEnabled()) {
+ log.info(String.format("Publishing tenant updated event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain()));
+ }
+ TenantUpdatedEvent event = new TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
+ EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ eventPublisher.publish(event);
+ }
+ catch (Exception e) {
+ log.error("Could not publish tenant updated event");
+ }
+ }
+
+ @Override
+ public void onTenantDelete(int tenantId) {
+ try {
+ if(log.isInfoEnabled()) {
+ log.info(String.format("Publishing tenant removed event: [tenant-id] %d", tenantId));
+ }
+ TenantRemovedEvent event = new TenantRemovedEvent(tenantId);
+ EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ eventPublisher.publish(event);
+ }
+ catch (Exception e) {
+ log.error("Could not publish tenant removed event");
+ }
+ }
+
+ @Override
+ public void onTenantRename(int tenantId, String oldDomainName, String newDomainName) throws StratosException {
+ }
+
+ @Override
+ public void onTenantInitialActivation(int tenantId) throws StratosException {
+ }
+
+ @Override
+ public void onTenantActivation(int tenantId) throws StratosException {
+ }
+
+ @Override
+ public void onTenantDeactivation(int tenantId) throws StratosException {
+ }
+
+ @Override
+ public void onSubscriptionPlanChange(int tenantId, String oldPlan, String newPlan) throws StratosException {
+ }
+
+ @Override
+ public int getListenerOrder() {
+ return EXEC_ORDER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynchronizerTaskScheduler.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynchronizerTaskScheduler.java
new file mode 100644
index 0000000..f97b897
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynchronizerTaskScheduler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.ntask.common.TaskException;
+import org.wso2.carbon.ntask.core.TaskInfo;
+import org.wso2.carbon.ntask.core.TaskManager;
+import org.wso2.carbon.ntask.core.service.TaskService;
+
+import java.util.HashMap;
+
+/**
+ * Tenant synchronizer task scheduler for scheduling the tenant synchronizer task
+ * using carbon task service.
+ */
+public class TenantSynchronizerTaskScheduler {
+
+ private static final Log log = LogFactory.getLog(TenantSynzhronizerTask.class);
+
+ private static final String TENANT_SYNC_TASK_TYPE = "TENANT_SYNC_TASK_TYPE";
+ private static final String TENANT_SYNC_TASK_NAME = "TENANT_SYNC_TASK";
+ private static final String DEFAULT_CRON = "1 * * * * ? *";
+
+ public static void schedule(TaskService taskService) {
+ TaskManager taskManager = null;
+ try {
+
+ if (!taskService.getRegisteredTaskTypes().contains(TENANT_SYNC_TASK_TYPE)) {
+ // Register task type
+ taskService.registerTaskType(TENANT_SYNC_TASK_TYPE);
+
+ // Register task
+ taskManager = taskService.getTaskManager(TENANT_SYNC_TASK_TYPE);
+ TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(DEFAULT_CRON);
+ TaskInfo taskInfo = new TaskInfo(TENANT_SYNC_TASK_NAME,
+ TenantSynzhronizerTask.class.getName(),
+ new HashMap<String, String>(), triggerInfo);
+ taskManager.registerTask(taskInfo);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Tenant synchronization task scheduled: %s", TENANT_SYNC_TASK_NAME));
+ }
+ }
+
+ } catch (Exception e) {
+ if (taskManager != null) {
+ try {
+ taskManager.deleteTask(TENANT_SYNC_TASK_NAME);
+ } catch (TaskException te) {
+ if (log.isErrorEnabled()) {
+ log.error(te);
+ }
+ }
+ }
+ throw new RuntimeException(String.format("Could not schedule tenant synchronization task: %s", TENANT_SYNC_TASK_NAME), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
new file mode 100644
index 0000000..a3e0444
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.internal.DataHolder;
+import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
+import org.apache.stratos.manager.subscription.CartridgeSubscription;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
+import org.apache.stratos.messaging.util.Constants;
+import org.wso2.carbon.ntask.core.Task;
+import org.wso2.carbon.user.core.tenant.TenantManager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tenant synchronizer task for publishing complete tenant event periodically
+ * to message broker.
+ */
+public class TenantSynzhronizerTask implements Task {
+
+ private static final Log log = LogFactory.getLog(TenantSynzhronizerTask.class);
+
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public void execute() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Publishing complete tenant event"));
+ }
+ Tenant tenant;
+ List<Tenant> tenants = new ArrayList<Tenant>();
+ TenantManager tenantManager = DataHolder.getRealmService().getTenantManager();
+ org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants();
+ for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
+ // Create tenant
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s", carbonTenant.getId(), carbonTenant.getDomain()));
+ }
+ tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
+ // Add subscriptions
+ /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ //List<CartridgeSubscriptionInfo> cartridgeSubscriptions = PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId());
+ /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ Collection<CartridgeSubscription> cartridgeSubscriptions = new DataInsertionAndRetrievalManager().getCartridgeSubscriptions(tenant.getTenantId());
+ for (CartridgeSubscription subscription : cartridgeSubscriptions) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s",
+ carbonTenant.getId(), carbonTenant.getDomain(), subscription.getType()));
+ }
+ tenant.addServiceSubscription(subscription.getType());
+ }
+ tenants.add(tenant);
+ }
+ CompleteTenantEvent event = new CompleteTenantEvent(tenants);
+ EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ eventPublisher.publish(event);
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not publish complete tenant event", e);
+ }
+ }
+ }
+
+ @Override
+ public void setProperties(Map<String, String> stringStringMap) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/registry/RegistryManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/registry/RegistryManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/registry/RegistryManager.java
new file mode 100644
index 0000000..5e92400
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/registry/RegistryManager.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.registry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.internal.DataHolder;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.registry.core.exceptions.ResourceNotFoundException;
+import org.wso2.carbon.registry.core.service.RegistryService;
+import org.wso2.carbon.registry.core.session.UserRegistry;
+
+public class RegistryManager {
+
+ private final static Log log = LogFactory.getLog(RegistryManager.class);
+
+ private static final String STRATOS_MANAGER_REOSURCE = "/stratos_manager";
+
+ private static RegistryService registryService;
+ private static volatile RegistryManager registryManager;
+
+ public static RegistryManager getInstance() {
+
+ if (registryManager == null) {
+ synchronized (RegistryManager.class) {
+ if (registryManager == null) {
+ registryManager = new RegistryManager();
+ }
+ }
+ }
+ return registryManager;
+ }
+
+ private RegistryManager() {
+ registryService = DataHolder.getRegistryService();
+ }
+
+ private UserRegistry initRegistry (int tenantId) throws RegistryException {
+
+ UserRegistry tenantGovRegistry = registryService.getGovernanceSystemRegistry(tenantId);
+ /*if (tenantGovRegistry == null) {
+ String errorMsg = "Tenant " + tenantId + "'s governance registry is not initialized";
+ log.error(errorMsg);
+ throw new ADCException(errorMsg);
+ }*/
+
+ // check if the resource is available, else create it
+ if (!tenantGovRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) {
+ synchronized (RegistryManager.class) {
+ try {
+ if (!tenantGovRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) {
+ tenantGovRegistry.put(STRATOS_MANAGER_REOSURCE, tenantGovRegistry.newCollection());
+ }
+ } catch (RegistryException e) {
+ String errorMsg = "Failed to create the registry resource " + STRATOS_MANAGER_REOSURCE;
+ log.error(errorMsg, e);
+ throw e;
+ }
+ }
+ }
+
+ return tenantGovRegistry;
+ }
+
+ private UserRegistry initRegistry () throws RegistryException {
+
+ UserRegistry govRegistry = registryService.getGovernanceSystemRegistry();
+ /*if (govRegistry == null) {
+ String errorMsg = "Governance registry is not initialized";
+ log.error(errorMsg);
+ throw new ADCException(errorMsg);
+ }*/
+
+ // check if the resource is available, else create it
+ if (!govRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) {
+ synchronized (RegistryManager.class) {
+ try {
+ if (!govRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) {
+ govRegistry.put(STRATOS_MANAGER_REOSURCE, govRegistry.newCollection());
+ }
+ } catch (RegistryException e) {
+ String errorMsg = "Failed to create the registry resource " + STRATOS_MANAGER_REOSURCE;
+ log.error(errorMsg, e);
+ throw e;
+ }
+ }
+ }
+
+ return govRegistry;
+ }
+
+ /*public void persistSubscriptionContext (int tenantId, SubscriptionContext subscriptionContext)
+ throws RegistryException, ADCException {
+
+ //TODO: uncomment
+ //UserRegistry tenantGovRegistry = initRegistry(tenantId);
+ //temporary
+ UserRegistry tenantGovRegistry = initRegistry();
+
+ try {
+ tenantGovRegistry.beginTransaction();
+ Resource nodeResource = tenantGovRegistry.newResource();
+ nodeResource.setContent(Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext));
+ tenantGovRegistry.putSubscription(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT, nodeResource);
+ tenantGovRegistry.commitTransaction();
+
+ } catch (Exception e) {
+ String errorMsg = "Failed to persist SubscriptionContext in registry.";
+ tenantGovRegistry.rollbackTransaction();
+ log.error(errorMsg, e);
+ throw new ADCException(errorMsg, e);
+ }
+ }
+
+ //TODO: retun the de-serialized object
+ public Object getSubscriptionContext(int tenantId) throws ADCException, RegistryException {
+
+ //TODO: uncomment
+ //UserRegistry tenantGovRegistry = registryService.getGovernanceSystemRegistry(tenantId);
+ //temporary
+ UserRegistry tenantGovRegistry = registryService.getGovernanceSystemRegistry();
+
+ if (tenantGovRegistry == null) {
+ String errorMsg = "Tenant " + tenantId + "'s governance registry is not initialized";
+ log.error(errorMsg);
+ throw new ADCException(errorMsg);
+ }
+
+ try {
+ Resource resource = tenantGovRegistry.get(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT);
+ return resource.getContent();
+
+ } catch (ResourceNotFoundException ignore) {
+ log.error("Sepcified resource not found at " + STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT);
+ return null;
+
+ } catch (RegistryException e) {
+ String errorMsg = "Failed to retrieve SubscriptionContext from registry.";
+ log.error(errorMsg, e);
+ throw new ADCException(errorMsg, e);
+ }
+ }*/
+
+ /*public Object getSubscriptionContexts() throws RegistryException {
+
+ UserRegistry registry = registryService.getGovernanceSystemRegistry();
+ return retrieve(registry, STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT);
+
+
+ if ((resourceObj == null) || !(resourceObj instanceof String[])) {
+ return null;
+ }
+
+ // get the paths for all SubscriptionContext instnaces
+ String[] subscriptionCtxtResourcePaths = (String[]) resourceObj;
+
+ Collection<SubscriptionContext> cartridgeSubscriptionCtxts;
+ //for each path, get the SubscriptionContext instance
+ for (String subscriptionCtxResourcePath : subscriptionCtxtResourcePaths) {
+ Object subscriptionCtxObj = retrieve(registry, subscriptionCtxResourcePath);
+ if (subscriptionCtxObj != null && subscriptionCtxObj instanceof SubscriptionContext) {
+
+ }
+ }
+
+ }*/
+
+ /*public void persistClusterIdToSubscription (ClusterIdToSubscription clusterIdToSubscription)
+ throws RegistryException, ADCException {
+
+ UserRegistry govRegistry = initRegistry();
+
+ try {
+ govRegistry.beginTransaction();
+ Resource nodeResource = govRegistry.newResource();
+ nodeResource.setContent(Serializer.serializeClusterIdToSubscriptionToByteArray(clusterIdToSubscription));
+ govRegistry.putSubscription(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION, nodeResource);
+ govRegistry.commitTransaction();
+
+ } catch (Exception e) {
+ String errorMsg = "Failed to persist ClusterIdToSubscription in registry.";
+ govRegistry.rollbackTransaction();
+ log.error(errorMsg, e);
+ throw new ADCException(errorMsg, e);
+ }
+ }
+
+ //TODO: retun the de-serialized object
+ public Object getClusterIdToSubscription () throws ADCException, RegistryException {
+
+ UserRegistry govRegistry = registryService.getGovernanceSystemRegistry();
+ if (govRegistry == null) {
+ String errorMsg = "Governance registry is not initialized";
+ log.error(errorMsg);
+ throw new ADCException(errorMsg);
+ }
+
+ try {
+ Resource resource = govRegistry.get(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION);
+ return resource.getContent();
+
+ } catch (ResourceNotFoundException ignore) {
+ return null;
+
+ } catch (RegistryException e) {
+ String errorMsg = "Failed to retrieve ClusterIdToSubscription from registry.";
+ log.error(errorMsg, e);
+ throw new ADCException(errorMsg, e);
+ }
+ }*/
+
+ public void persist (String path, byte [] resourceBytes, String tag) throws RegistryException {
+
+ UserRegistry registry = initRegistry();
+
+ try {
+ registry.beginTransaction();
+ Resource nodeResource = registry.newResource();
+ nodeResource.setContent(resourceBytes);
+ // store the resource in the registry
+ registry.put(path, nodeResource);
+ if (tag != null) {
+ // apply the tag
+ registry.applyTag(path, tag);
+ }
+ // commit
+ registry.commitTransaction();
+
+ } catch (RegistryException e) {
+ String errorMsg = "Failed to persist the given resource in registry path " + path;
+ log.error(errorMsg, e);
+ // rollback
+ try {
+ registry.rollbackTransaction();
+
+ } catch (RegistryException e1) {
+ errorMsg = "Failed to rollback the transaction in registry path " + path;
+ log.error(errorMsg, e1);
+ throw e1;
+ }
+ throw e;
+ }
+ }
+
+ public void move (String sourcePath, String targetPath) throws RegistryException {
+
+ UserRegistry registry = initRegistry();
+
+ try {
+ registry.beginTransaction();
+ registry.move(sourcePath, targetPath);
+ registry.commitTransaction();
+
+ } catch (RegistryException e) {
+ String errorMsg = "Could not move the resource at "+ sourcePath + " to " + targetPath;
+ log.error(errorMsg, e);
+ // rollback
+ try {
+ registry.rollbackTransaction();
+
+ } catch (RegistryException e1) {
+ errorMsg = "Failed to rollback moving the resource at " + sourcePath + " to " + targetPath;
+ log.error(errorMsg, e1);
+ throw e1;
+ }
+ throw e;
+ }
+ }
+
+ public void delete (String resourcePath) throws RegistryException {
+
+ UserRegistry registry = initRegistry();
+
+ try {
+ registry.beginTransaction();
+ registry.delete(resourcePath);
+ registry.commitTransaction();
+
+ } catch (RegistryException e) {
+ String errorMsg = "Could not delete resource at "+ resourcePath;
+ log.error(errorMsg, e);
+ // rollback
+ try {
+ registry.rollbackTransaction();
+
+ } catch (RegistryException e1) {
+ errorMsg = "Failed to rollback the transaction in registry path " + resourcePath;
+ log.error(errorMsg, e1);
+ throw e1;
+ }
+ throw e;
+ }
+ }
+
+ public Object retrieve (String resourcePath) throws RegistryException {
+
+ UserRegistry registry = initRegistry();
+
+ Resource resource;
+
+ try {
+ resource = registry.get(resourcePath);
+
+ } catch (ResourceNotFoundException ignore) {
+ // nothing has been persisted in the registry yet
+ if(log.isDebugEnabled()) {
+ log.debug("No resource found in the registry path " + resourcePath);
+ }
+ return null;
+
+ } catch (RegistryException e) {
+ String errorMsg = "Failed to retrieve the Resource at " + resourcePath + " from registry.";
+ log.error(errorMsg, e);
+ throw e;
+ }
+
+ return resource.getContent();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/repository/Repository.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/repository/Repository.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/repository/Repository.java
new file mode 100644
index 0000000..16cc2c4
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/repository/Repository.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.repository;
+
+import java.io.Serializable;
+
+public class Repository implements Serializable {
+
+ private static final long serialVersionUID = 8386330511938705573L;
+ private int id;
+ private String url;
+ private String userName;
+ private String password;
+ private boolean isPrivateRepository;
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public boolean isPrivateRepository() {
+ return isPrivateRepository;
+ }
+
+ public void setPrivateRepository(boolean privateRepository) {
+ isPrivateRepository = privateRepository;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return "Repository [id=" + id + ", url=" + url + ", userName=" + userName +
+ ", isPrivateRepository=" + isPrivateRepository + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d280533/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java
new file mode 100644
index 0000000..90aa5cc
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.retriever;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.deploy.service.Service;
+import org.apache.stratos.manager.exception.PersistenceManagerException;
+import org.apache.stratos.manager.lookup.LookupDataHolder;
+import org.apache.stratos.manager.persistence.PersistenceManager;
+import org.apache.stratos.manager.persistence.RegistryBasedPersistenceManager;
+import org.apache.stratos.manager.subscription.CartridgeSubscription;
+
+import java.util.Collection;
+
+public class DataInsertionAndRetrievalManager {
+
+ private static final Log log = LogFactory.getLog(DataInsertionAndRetrievalManager.class);
+
+ // TODO: use a global object
+ private static PersistenceManager persistenceManager = new RegistryBasedPersistenceManager();
+
+ public void cacheAndPersistSubcription (CartridgeSubscription cartridgeSubscription) throws PersistenceManagerException {
+
+ // get the write lock
+ LookupDataHolder.getInstance().acquireWriteLock();
+
+ try {
+ // store in LookupDataHolder
+ LookupDataHolder.getInstance().putSubscription(cartridgeSubscription);
+
+ try {
+ // store in Persistence Manager
+ persistenceManager.persistCartridgeSubscription(cartridgeSubscription);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in persisting CartridgeSubscription in Persistence Manager";
+ log.error(errorMsg, e);
+ // remove from the in memory model since persisting failed
+ LookupDataHolder.getInstance().removeSubscription(cartridgeSubscription.getSubscriber().getTenantId(), cartridgeSubscription.getType(),
+ cartridgeSubscription.getAlias(), cartridgeSubscription.getClusterDomain());
+
+ throw e;
+ }
+
+ } finally {
+ // release the write lock
+ LookupDataHolder.getInstance().releaseWriteLock();
+ }
+ }
+
+ public void removeSubscription (int tenantId, String subscriptionAlias) throws PersistenceManagerException {
+
+ CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+
+ String cartridgeType = cartridgeSubscription.getType();
+ String clusterId = cartridgeSubscription.getClusterDomain();
+
+ LookupDataHolder.getInstance().acquireWriteLock();
+
+ try {
+ // remove from persistence manager
+ try {
+ persistenceManager.removeCartridgeSubscription(tenantId, cartridgeType, subscriptionAlias);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in removing CartridgeSubscription from Persistence Manager";
+ log.error(errorMsg, e);
+ throw e;
+ }
+
+ // remove from cache
+ LookupDataHolder.getInstance().removeSubscription(tenantId, cartridgeType, subscriptionAlias, clusterId);
+
+ } finally {
+ LookupDataHolder.getInstance().releaseWriteLock();
+ }
+ }
+
+ public void cachePersistedSubscriptions () throws PersistenceManagerException {
+
+ Collection<CartridgeSubscription> cartridgeSubscriptions;
+
+ // get the write lock
+ LookupDataHolder.getInstance().acquireWriteLock();
+
+ try {
+ try {
+ cartridgeSubscriptions = persistenceManager.getCartridgeSubscriptions();
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in retrieving CartridgeSubscriptions from Persistence Manager";
+ log.error(errorMsg, e);
+ throw e;
+ }
+
+ if(cartridgeSubscriptions == null || cartridgeSubscriptions.isEmpty()) {
+ if(log.isDebugEnabled()) {
+ log.debug("No CartridgeSubscriptions found to add to the cache");
+ }
+ return;
+ }
+ cacheSubscriptions(cartridgeSubscriptions);
+
+ } finally {
+ // release the write lock
+ LookupDataHolder.getInstance().releaseWriteLock();
+ }
+ }
+
+ public void cachePersistedSubscriptions (int tenantId) throws PersistenceManagerException {
+
+ Collection<CartridgeSubscription> cartridgeSubscriptions;
+
+ // get the write lock
+ LookupDataHolder.getInstance().acquireWriteLock();
+
+ try {
+ try {
+ cartridgeSubscriptions = persistenceManager.getCartridgeSubscriptions(tenantId);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in retrieving CartridgeSubscriptions from Persistence Manager";
+ log.error(errorMsg, e);
+ throw e;
+ }
+
+ if(cartridgeSubscriptions == null || cartridgeSubscriptions.isEmpty()) {
+ if(log.isDebugEnabled()) {
+ log.debug("No CartridgeSubscriptions found to add to the cache");
+ }
+ return;
+ }
+ cacheSubscriptions(cartridgeSubscriptions);
+
+ } finally {
+ // release the write lock
+ LookupDataHolder.getInstance().releaseWriteLock();
+ }
+ }
+
+ private void cacheSubscriptions (Collection<CartridgeSubscription> cartridgeSubscriptions) {
+
+ // cache all
+ for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) {
+ LookupDataHolder.getInstance().putSubscription(cartridgeSubscription);
+ if (log.isDebugEnabled()) {
+ log.debug("Updated the in memory cache with the CartridgeSubscription: " + cartridgeSubscription.toString());
+ }
+ }
+ }
+
+ public void persistService (Service service) throws PersistenceManagerException {
+
+ persistenceManager.persistService(service);
+ }
+
+ public Service getService (String cartridgeType) throws PersistenceManagerException {
+
+ return persistenceManager.getService(cartridgeType);
+ }
+
+ public void removeService (String cartridgeType) throws PersistenceManagerException {
+
+ persistenceManager.removeService(cartridgeType);
+ }
+
+ /*public void persistAll (int tenantId) {
+
+ Collection<CartridgeSubscription> cartridgeSubscriptions = LookupDataHolder.getInstance().getSubscriptions(tenantId);
+
+ writeLock.lock();
+
+ try {
+ for(CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) {
+ try {
+ // store in Persistence Manager
+ persistenceManager.persistCartridgeSubscription(cartridgeSubscription);
+
+ } catch (PersistenceManagerException e) {
+ log.error("Error in persisting CartridgeSubscription in Persistence Manager", e);
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }*/
+
+ public CartridgeSubscription getCartridgeSubscription (int tenantId, String subscriptionAlias) {
+
+ // acquire read lock
+ LookupDataHolder.getInstance().acquireReadLock();
+
+ try {
+ CartridgeSubscription cartridgeSubscription = LookupDataHolder.getInstance().getSubscriptionForAlias(tenantId, subscriptionAlias);
+
+ /*if (cartridgeSubscription == null) {
+ // not available in the cache, look in the registry
+ if (log.isDebugEnabled()) {
+ log.debug("CartridgeSubscription for tenant " + tenantId + ", alias " + subscriptionAlias + " not available in memory");
+ }
+
+ try {
+ cartridgeSubscription = persistenceManager.getCartridgeSubscription(tenantId, subscriptionAlias);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in accessing Persistence Manager";
+ log.error(errorMsg, e);
+ return null;
+ }
+
+ // add to the LookupDataHolder
+ // LookupDataHolder.getInstance().putSubscription(cartridgeSubscription);
+ }*/
+
+ return cartridgeSubscription;
+
+ } finally {
+ // release read lock
+ LookupDataHolder.getInstance().releaseReadLock();
+ }
+ }
+
+ public CartridgeSubscription getCartridgeSubscription (String clusterId) {
+
+ // acquire read lock
+ LookupDataHolder.getInstance().acquireReadLock();
+
+ try {
+ CartridgeSubscription cartridgeSubscription = LookupDataHolder.getInstance().getSubscription(clusterId);
+ /*if (cartridgeSubscription == null) {
+ // not available in the cache, look in the registry
+ if (log.isDebugEnabled()) {
+ log.debug("CartridgeSubscription for cluster " + clusterId + " not available in memory");
+ }
+
+ try {
+ cartridgeSubscription = persistenceManager.getCartridgeSubscription(clusterId);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in accessing Persistence Manager";
+ log.error(errorMsg, e);
+ return null;
+ }
+
+ // add to the LookupDataHolder
+ // LookupDataHolder.getInstance().putSubscription(cartridgeSubscription);
+ }*/
+
+ return cartridgeSubscription;
+
+ } finally {
+ // release read lock
+ LookupDataHolder.getInstance().releaseReadLock();
+ }
+ }
+
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) {
+
+ // acquire read lock
+ LookupDataHolder.getInstance().acquireReadLock();
+
+ try {
+ Collection<CartridgeSubscription> cartridgeSubscriptions = LookupDataHolder.getInstance().getSubscriptions(tenantId);
+ /*if (cartridgeSubscriptions == null) {
+ // not available in the cache, look in the registry
+ if (log.isDebugEnabled()) {
+ log.debug("CartridgeSubscriptions for tenant " + tenantId + " not available in memory");
+ }
+
+ try {
+ cartridgeSubscriptions = persistenceManager.getCartridgeSubscriptions(tenantId);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in accessing Persistence Manager";
+ log.error(errorMsg, e);
+ return null;
+ }
+
+ // add to the LookupDataHolder
+ //Iterator<CartridgeSubscription> iterator = cartridgeSubscriptions.iterator();
+ //while (iterator.hasNext()) {
+ // LookupDataHolder.getInstance().putSubscription(iterator.next());
+ //}
+ }*/
+
+ return cartridgeSubscriptions;
+
+ } finally {
+ // release read lock
+ LookupDataHolder.getInstance().releaseReadLock();
+ }
+ }
+
+ public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId, String cartridgeType) {
+
+ // acquire read lock
+ LookupDataHolder.getInstance().acquireReadLock();
+
+ try {
+
+ Collection<CartridgeSubscription> cartridgeSubscriptions = LookupDataHolder.getInstance().getSubscriptionForType(tenantId, cartridgeType);
+ /*if (cartridgeSubscriptions == null) {
+ // not available in the cache, look in the registry
+ if (log.isDebugEnabled()) {
+ log.debug("CartridgeSubscriptions for tenant " + tenantId + ", type " + cartridgeType + " not available in memory");
+ }
+
+ try {
+ cartridgeSubscriptions = persistenceManager.getCartridgeSubscriptions(tenantId, cartridgeType);
+
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Error in accessing Persistence Manager";
+ log.error(errorMsg, e);
+ return null;
+ }
+
+ // add to the LookupDataHolder
+ // Iterator<CartridgeSubscription> iterator = cartridgeSubscriptions.iterator();
+ // while (iterator.hasNext()) {
+ // LookupDataHolder.getInstance().putSubscription(iterator.next());
+ //}
+ }*/
+
+ return cartridgeSubscriptions;
+
+ } finally {
+ // release read lock
+ LookupDataHolder.getInstance().releaseReadLock();
+ }
+ }
+}