You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2013/12/11 11:04:48 UTC
git commit: adding a CompleteTopologyEventProcessor
Updated Branches:
refs/heads/master 4c2962f53 -> 221f3dbfe
adding a CompleteTopologyEventProcessor
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/221f3dbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/221f3dbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/221f3dbf
Branch: refs/heads/master
Commit: 221f3dbfea74b63344f3b6d3357bdec77139d3e1
Parents: 4c2962f
Author: Isuru <is...@wso2.com>
Authored: Wed Dec 11 15:34:11 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Wed Dec 11 15:34:11 2013 +0530
----------------------------------------------------------------------
.../CompleteTopologyEventProcessor.java | 122 +++++++++++++++++++
.../processor/InstanceStatusProcessor.java | 2 +
.../processor/TopologyEventProcessorChain.java | 15 ++-
3 files changed, 133 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/221f3dbf/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java
new file mode 100644
index 0000000..c37355f
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
+import org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import java.util.Collection;
+
+public class CompleteTopologyEventProcessor extends TopologyEventProcessor {
+
+ private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class);
+
+ @Override
+ public void process(Message message) {
+
+ doProcessing(message);
+ //go to next processor in the chain
+ if(nextTopologyEventProcessor != null) {
+ nextTopologyEventProcessor.process(message);
+ }
+ }
+
+ private void doProcessing (Message message) {
+
+ String messageType = null;
+
+ try {
+ messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ } catch (JMSException e) {
+ log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
+ return;
+ }
+
+ log.info("Received Complete Topology message: " + messageType);
+
+ if (CompleteTopologyEvent.class.getName().equals(messageType)) {
+ log.info("Received message: " + messageType);
+
+ CompleteTopologyEvent event = getCompleteTopologyEvent(message);
+ //get all services
+ Collection<Service> serviceCollection = event.getTopology().getServices();
+ //iterate through the services
+ for (Service service : serviceCollection) {
+ //get all clusters
+ Collection<Cluster> clusterCollection = service.getClusters();
+ //iterate through the clusters
+ for (Cluster cluster : clusterCollection) {
+ CartridgeSubscriptionInfo cartridgeSubscriptionInfo =
+ getCartridgeSubscriptionInfo(cluster.getClusterId());
+
+ if(cartridgeSubscriptionInfo != null) {
+ //add the information to Topology Cluster Info. model
+ TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+ cartridgeSubscriptionInfo.getCartridge(),
+ cartridgeSubscriptionInfo.getAlias(), cluster);
+ }
+ }
+ }
+ }
+ }
+
+ private CompleteTopologyEvent getCompleteTopologyEvent (Message message) {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(json, CompleteTopologyEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Services: " + event.getTopology().getServices().toString() + " ]");
+ }
+
+ return event;
+ }
+
+ private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
+
+ try {
+ return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+ } catch (Exception e) {
+ log.error("Error getting subscription information for cluster " + clusterDomain, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/221f3dbf/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
index d034962..1c13266 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
@@ -72,6 +72,8 @@ public class InstanceStatusProcessor extends TopologyEventProcessor {
return;
}
+ log.info("Received Topology message: " + messageType);
+
if (MemberStartedEvent.class.getName().equals(messageType)) {
log.info("Received message: " + messageType);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/221f3dbf/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
index 5c25c59..1760449 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
@@ -23,11 +23,13 @@ import javax.jms.Message;
public class TopologyEventProcessorChain {
- private TopologyEventProcessor firstTopologyEventProcessor = null;
+ private TopologyEventProcessor completeTopologyEventProcessor = null;
+ private TopologyEventProcessor instanceStatusEventProcessor = null;
private static TopologyEventProcessorChain topologyEventProcessorChain;
private TopologyEventProcessorChain () {
- firstTopologyEventProcessor = new InstanceStatusProcessor();
+ completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
+ instanceStatusEventProcessor = new InstanceStatusProcessor();
}
public static TopologyEventProcessorChain getInstance () {
@@ -46,13 +48,14 @@ public class TopologyEventProcessorChain {
public void initProcessorChain () {
//if any other topology event processors are added, link them as follows
- //firstTopologyEventProcessor.setNext(secondTopologyeventProcessor);
- //secondTopologyeventProcessor.setNext(null);
- firstTopologyEventProcessor.setNext(null);
+ //instanceStatusEventProcessor.setNext(nextTopologyeventProcessor);
+ //nextTopologyeventProcessor.setNext(null);
+ completeTopologyEventProcessor.setNext(instanceStatusEventProcessor);
+ instanceStatusEventProcessor.setNext(null);
}
public void startProcessing (Message message) {
- firstTopologyEventProcessor.process(message);
+ completeTopologyEventProcessor.process(message);
}