You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2013/12/11 13:25:44 UTC
[1/3] git commit: handling cluster removal at SM topology model
Updated Branches:
refs/heads/master 81186e979 -> e6714baa6
handling cluster removal at SM topology model
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7fbb68d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7fbb68d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7fbb68d9
Branch: refs/heads/master
Commit: 7fbb68d92ecd6b19733890870ce7868af13cbedf
Parents: 0bbf1f1
Author: Isuru <is...@wso2.com>
Authored: Wed Dec 11 17:52:38 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Wed Dec 11 17:52:38 2013 +0530
----------------------------------------------------------------------
.../processor/ClusterStatusEventProcessor.java | 117 ++++++
.../processor/InstanceStatusEventProcessor.java | 412 +++++++++++++++++++
.../processor/InstanceStatusProcessor.java | 412 -------------------
.../processor/TopologyEventProcessorChain.java | 7 +-
4 files changed, 534 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fbb68d9/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java
new file mode 100644
index 0000000..00b2b28
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
+import org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+public class ClusterStatusEventProcessor extends TopologyEventProcessor {
+
+ private static final Log log = LogFactory.getLog(ClusterStatusEventProcessor.class);
+
+ @Override
+ public void process(Message message) {
+
+ doProcessing(message);
+ //go to next processor in the chain
+ if(nextTopologyEventProcessor != null) {
+ nextTopologyEventProcessor.process(message);
+ }
+ }
+
+ private void doProcessing (Message message) {
+
+ String messageType = null;
+
+ try {
+ messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ } catch (JMSException e) {
+ log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
+ return;
+ }
+
+ log.info("Received Cluster Status message: " + messageType);
+
+ if (ClusterRemovedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ ClusterRemovedEvent event = getClusterRemovedEvent(message);
+
+ CartridgeSubscriptionInfo cartridgeSubscriptionInfo =
+ getCartridgeSubscriptionInfo(event.getClusterId());
+
+ if (cartridgeSubscriptionInfo != null) {
+ //add the information to Topology Cluster Info. model
+ Cluster cluster = TopologyManager.getTopology().
+ getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(event.getClusterId());
+
+ TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+ cartridgeSubscriptionInfo.getCartridge(),
+ cartridgeSubscriptionInfo.getAlias(), cluster);
+ }
+ }
+ }
+
+ private ClusterRemovedEvent getClusterRemovedEvent (Message message) {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(json, ClusterRemovedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nService name: " + event.getServiceName() + " ]");
+ }
+
+ return event;
+ }
+
+ private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
+
+ try {
+ return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+ } catch (Exception e) {
+ log.error("Error getting subscription information for cluster " + clusterDomain, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fbb68d9/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusEventProcessor.java
new file mode 100644
index 0000000..8eeaf64
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusEventProcessor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
+import org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+public class InstanceStatusEventProcessor extends TopologyEventProcessor {
+
+ private static final Log log = LogFactory.getLog(InstanceStatusEventProcessor.class);
+
+ private Map<String, Integer> clusterIdToActiveInstanceCountMap;
+
+ public InstanceStatusEventProcessor() {
+ clusterIdToActiveInstanceCountMap = new HashMap<String, Integer>();
+ }
+
+ @Override
+ public void process(Message message) {
+
+ //new InstanceStatusListenerThread(message).start();
+ doProcessing(message);
+ //go to next processor in the chain
+ if(nextTopologyEventProcessor != null) {
+ nextTopologyEventProcessor.process(message);
+ }
+ }
+
+ private void doProcessing (Message message) {
+
+ String messageType = null;
+
+ try {
+ messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ } catch (JMSException e) {
+ log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
+ return;
+ }
+
+ log.info("Received Topology message: " + messageType);
+
+ if (MemberStartedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberStartedEvent event = getMemberStartedEvent(message);
+ String clusterDomain = event.getClusterId();
+ CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
+
+ if(cartridgeSubscriptionInfo != null) {
+ Cluster cluster = TopologyManager.getTopology().
+ getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+ TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+ cartridgeSubscriptionInfo.getCartridge(),
+ cartridgeSubscriptionInfo.getAlias(), cluster);
+ }
+
+ }
+ else if (MemberActivatedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberActivatedEvent event = getMemberActivetedEvent(message);
+ String clusterDomain = event.getClusterId();
+ CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
+
+ if(cartridgeSubscriptionInfo != null) {
+ Cluster cluster = TopologyManager.getTopology().
+ getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+ TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+ cartridgeSubscriptionInfo.getCartridge(),
+ cartridgeSubscriptionInfo.getAlias(), cluster);
+ }
+
+
+ } else if (MemberSuspendedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberStartedEvent event = getMemberStartedEvent(message);
+ String clusterDomain = event.getClusterId();
+ CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
+
+ if(cartridgeSubscriptionInfo != null) {
+ Cluster cluster = TopologyManager.getTopology().
+ getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+ TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+ cartridgeSubscriptionInfo.getCartridge(),
+ cartridgeSubscriptionInfo.getAlias(), cluster);
+ }
+
+ } else if (MemberTerminatedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberStartedEvent event = getMemberStartedEvent(message);
+ String clusterDomain = event.getClusterId();
+ CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
+
+ if(cartridgeSubscriptionInfo != null) {
+ Cluster cluster = TopologyManager.getTopology().
+ getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+ TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+ cartridgeSubscriptionInfo.getCartridge(),
+ cartridgeSubscriptionInfo.getAlias(), cluster);
+ }
+
+ } else {
+ //not a member status event (e.g. a cluster event passing through the chain); nothing to do here
+ }
+ }
+
+ private MemberStartedEvent getMemberStartedEvent (Message message) {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() +
+ "\nStatus: " + event.getStatus().name() + " ]");
+ }
+
+ return event;
+ }
+
+ private MemberActivatedEvent getMemberActivetedEvent (Message message) {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() +
+ "\nIp: " + event.getMemberIp() + " ]");
+ }
+
+ return event;
+ }
+
+ private MemberSuspendedEvent getMemberSuspendedEvent (Message message) {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() + " ]");
+ }
+
+ return event;
+ }
+
+ private MemberTerminatedEvent getMemberTerminatedEvebt (Message message) {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() + " ]");
+ }
+
+ return event;
+ }
+
+ private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
+
+ try {
+ return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+ } catch (Exception e) {
+ log.error("Error getting subscription information for cluster " + clusterDomain, e);
+ return null;
+ }
+ }
+
+ /**
+ * Message Processing Thread class for InstanceStatusEventProcessor
+ */
+ /*private class InstanceStatusListenerThread extends Thread {
+
+ Message message;
+
+ public InstanceStatusListenerThread (Message message) {
+ this.message = message;
+ }
+
+ public void run () {
+
+ String messageType = null;
+
+ try {
+ messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ } catch (JMSException e) {
+ log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
+ return;
+ }
+
+ if (MemberStartedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberStartedEvent event = getMemberStartedEvent();
+ String clusterDomain = event.getClusterId();
+ CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
+ if(cartridgeSubscriptionInfo != null) {
+
+ }
+
+ }
+ else if (MemberActivatedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberActivatedEvent event = getMemberActivetedEvent();
+ String clusterDomain = event.getClusterId();
+
+
+ } else if (MemberSuspendedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberStartedEvent event = getMemberStartedEvent();
+ String clusterDomain = event.getClusterId();
+
+ } else if (MemberTerminatedEvent.class.getName().equals(messageType)) {
+
+ log.info("Received message: " + messageType);
+
+ MemberStartedEvent event = getMemberStartedEvent();
+ String clusterDomain = event.getClusterId();
+
+ } else {
+ //cannot happen
+ }
+ }
+
+ private MemberStartedEvent getMemberStartedEvent () {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() +
+ "\nStatus: " + event.getStatus().name() + " ]");
+ }
+
+ return event;
+ }
+
+ private MemberActivatedEvent getMemberActivetedEvent () {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() +
+ "\nIp: " + event.getMemberIp() + " ]");
+ }
+
+ return event;
+ }
+
+ private MemberSuspendedEvent getMemberSuspendedEvent () {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() + " ]");
+ }
+
+ return event;
+ }
+
+ private MemberTerminatedEvent getMemberTerminatedEvebt () {
+
+ String json = null;
+ try {
+ json = ((TextMessage)message).getText();
+
+ } catch (JMSException e) {
+ log.error("Error in getting Json message type from received Message ", e);
+ return null;
+ }
+ MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received message details: [ " +
+ "Cluster Id: " + event.getClusterId() +
+ "\nMember Id: " + event.getMemberId() +
+ "\nService name: " + event.getServiceName() + " ]");
+ }
+
+ return event;
+ }
+
+ private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
+
+ try {
+ return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+ } catch (Exception e) {
+ log.error("Error getting subscription information for cluster " + clusterDomain, e);
+ return null;
+ }
+ }
+ }*/
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fbb68d9/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
deleted file mode 100644
index 1c13266..0000000
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.adc.mgt.topology.event.processor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
-import org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel;
-import org.apache.stratos.adc.mgt.utils.PersistenceManager;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
-import org.apache.stratos.messaging.util.Util;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-import java.util.HashMap;
-import java.util.Map;
-
-public class InstanceStatusProcessor extends TopologyEventProcessor {
-
- private static final Log log = LogFactory.getLog(InstanceStatusProcessor.class);
-
- private Map<String, Integer> clusterIdToActiveInstanceCountMap;
-
- public InstanceStatusProcessor () {
- clusterIdToActiveInstanceCountMap = new HashMap<String, Integer>();
- }
-
- @Override
- public void process(Message message) {
-
- //new InstanceStatusListenerThread(message).start();
- doProcessing(message);
- //go to next processor in the chain
- if(nextTopologyEventProcessor != null) {
- nextTopologyEventProcessor.process(message);
- }
- }
-
- private void doProcessing (Message message) {
-
- String messageType = null;
-
- try {
- messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
-
- } catch (JMSException e) {
- log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
- return;
- }
-
- log.info("Received Topology message: " + messageType);
-
- if (MemberStartedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberStartedEvent event = getMemberStartedEvent(message);
- String clusterDomain = event.getClusterId();
- CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
-
- if(cartridgeSubscriptionInfo != null) {
- Cluster cluster = TopologyManager.getTopology().
- getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
- cartridgeSubscriptionInfo.getCartridge(),
- cartridgeSubscriptionInfo.getAlias(), cluster);
- }
-
- }
- else if (MemberActivatedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberActivatedEvent event = getMemberActivetedEvent(message);
- String clusterDomain = event.getClusterId();
- CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
-
- if(cartridgeSubscriptionInfo != null) {
- Cluster cluster = TopologyManager.getTopology().
- getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
- cartridgeSubscriptionInfo.getCartridge(),
- cartridgeSubscriptionInfo.getAlias(), cluster);
- }
-
-
- } else if (MemberSuspendedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberStartedEvent event = getMemberStartedEvent(message);
- String clusterDomain = event.getClusterId();
- CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
-
- if(cartridgeSubscriptionInfo != null) {
- Cluster cluster = TopologyManager.getTopology().
- getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
- cartridgeSubscriptionInfo.getCartridge(),
- cartridgeSubscriptionInfo.getAlias(), cluster);
- }
-
- } else if (MemberTerminatedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberStartedEvent event = getMemberStartedEvent(message);
- String clusterDomain = event.getClusterId();
- CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
-
- if(cartridgeSubscriptionInfo != null) {
- Cluster cluster = TopologyManager.getTopology().
- getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
- TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
- cartridgeSubscriptionInfo.getCartridge(),
- cartridgeSubscriptionInfo.getAlias(), cluster);
- }
-
- } else {
- //cannot happen
- }
- }
-
- private MemberStartedEvent getMemberStartedEvent (Message message) {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() +
- "\nStatus: " + event.getStatus().name() + " ]");
- }
-
- return event;
- }
-
- private MemberActivatedEvent getMemberActivetedEvent (Message message) {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() +
- "\nIp: " + event.getMemberIp() + " ]");
- }
-
- return event;
- }
-
- private MemberSuspendedEvent getMemberSuspendedEvent (Message message) {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() + " ]");
- }
-
- return event;
- }
-
- private MemberTerminatedEvent getMemberTerminatedEvebt (Message message) {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() + " ]");
- }
-
- return event;
- }
-
- private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
-
- try {
- return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
-
- } catch (Exception e) {
- log.error("Error getting subscription information for cluster " + clusterDomain, e);
- return null;
- }
- }
-
- /**
- * Message Processing Thread class for InstanceStatusProcessor
- */
- /*private class InstanceStatusListenerThread extends Thread {
-
- Message message;
-
- public InstanceStatusListenerThread (Message message) {
- this.message = message;
- }
-
- public void run () {
-
- String messageType = null;
-
- try {
- messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
-
- } catch (JMSException e) {
- log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
- return;
- }
-
- if (MemberStartedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberStartedEvent event = getMemberStartedEvent();
- String clusterDomain = event.getClusterId();
- CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
- if(cartridgeSubscriptionInfo != null) {
-
- }
-
- }
- else if (MemberActivatedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberActivatedEvent event = getMemberActivetedEvent();
- String clusterDomain = event.getClusterId();
-
-
- } else if (MemberSuspendedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberStartedEvent event = getMemberStartedEvent();
- String clusterDomain = event.getClusterId();
-
- } else if (MemberTerminatedEvent.class.getName().equals(messageType)) {
-
- log.info("Received message: " + messageType);
-
- MemberStartedEvent event = getMemberStartedEvent();
- String clusterDomain = event.getClusterId();
-
- } else {
- //cannot happen
- }
- }
-
- private MemberStartedEvent getMemberStartedEvent () {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() +
- "\nStatus: " + event.getStatus().name() + " ]");
- }
-
- return event;
- }
-
- private MemberActivatedEvent getMemberActivetedEvent () {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() +
- "\nIp: " + event.getMemberIp() + " ]");
- }
-
- return event;
- }
-
- private MemberSuspendedEvent getMemberSuspendedEvent () {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() + " ]");
- }
-
- return event;
- }
-
- private MemberTerminatedEvent getMemberTerminatedEvebt () {
-
- String json = null;
- try {
- json = ((TextMessage)message).getText();
-
- } catch (JMSException e) {
- log.error("Error in getting Json message type from received Message ", e);
- return null;
- }
- MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
-
- if(log.isDebugEnabled()) {
- log.debug("Received message details: [ " +
- "Cluster Id: " + event.getClusterId() +
- "\nMember Id: " + event.getMemberId() +
- "\nService name: " + event.getServiceName() + " ]");
- }
-
- return event;
- }
-
- private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
-
- try {
- return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
-
- } catch (Exception e) {
- log.error("Error getting subscription information for cluster " + clusterDomain, e);
- return null;
- }
- }
- }*/
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fbb68d9/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
index 1760449..4b1b8d5 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
@@ -25,11 +25,13 @@ public class TopologyEventProcessorChain {
private TopologyEventProcessor completeTopologyEventProcessor = null;
private TopologyEventProcessor instanceStatusEventProcessor = null;
+ private TopologyEventProcessor clusterStatusEventProcessor = null;
private static TopologyEventProcessorChain topologyEventProcessorChain;
private TopologyEventProcessorChain () {
completeTopologyEventProcessor = new CompleteTopologyEventProcessor();
- instanceStatusEventProcessor = new InstanceStatusProcessor();
+ instanceStatusEventProcessor = new InstanceStatusEventProcessor();
+ clusterStatusEventProcessor = new ClusterStatusEventProcessor();
}
public static TopologyEventProcessorChain getInstance () {
@@ -51,7 +53,8 @@ public class TopologyEventProcessorChain {
//instanceStatusEventProcessor.setNext(nextTopologyeventProcessor);
//nextTopologyeventProcessor.setNext(null);
completeTopologyEventProcessor.setNext(instanceStatusEventProcessor);
- instanceStatusEventProcessor.setNext(null);
+ instanceStatusEventProcessor.setNext(clusterStatusEventProcessor);
+ clusterStatusEventProcessor.setNext(null);
}
public void startProcessing (Message message) {
[3/3] git commit: fixing a bug
Posted by is...@apache.org.
fixing a bug
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e6714baa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e6714baa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e6714baa
Branch: refs/heads/master
Commit: e6714baa63f23750626bf4efa2f6a13edf91e091
Parents: 5bb695b
Author: Isuru <is...@wso2.com>
Authored: Wed Dec 11 17:55:11 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Wed Dec 11 17:55:11 2013 +0530
----------------------------------------------------------------------
.../event/processor/ClusterStatusEventProcessor.java | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e6714baa/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java
index 00b2b28..ed73500 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/ClusterStatusEventProcessor.java
@@ -24,9 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
import org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel;
import org.apache.stratos.adc.mgt.utils.PersistenceManager;
-import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Constants;
import org.apache.stratos.messaging.util.Util;
@@ -72,13 +70,11 @@ public class ClusterStatusEventProcessor extends TopologyEventProcessor {
getCartridgeSubscriptionInfo(event.getClusterId());
if (cartridgeSubscriptionInfo != null) {
- //add the information to Topology Cluster Info. model
- Cluster cluster = TopologyManager.getTopology().
- getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(event.getClusterId());
+ //remove the information from Topology Cluster Info. model
- TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+ TopologyClusterInformationModel.getInstance().removeCluster(cartridgeSubscriptionInfo.getTenantId(),
cartridgeSubscriptionInfo.getCartridge(),
- cartridgeSubscriptionInfo.getAlias(), cluster);
+ cartridgeSubscriptionInfo.getAlias());
}
}
}
[2/3] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr
Posted by is...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/5bb695b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/5bb695b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/5bb695b6
Branch: refs/heads/master
Commit: 5bb695b669164b50542f0d13afee8e3e1ce68391
Parents: 7fbb68d 81186e9
Author: Isuru <is...@wso2.com>
Authored: Wed Dec 11 17:52:52 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Wed Dec 11 17:52:52 2013 +0530
----------------------------------------------------------------------
tools/stratos-installer/conf/setup.conf | 131 ++---
.../config/sc/repository/conf/carbon.xml | 586 -------------------
.../repository/conf/cartridge-config.properties | 51 --
.../conf/datasources/master-datasources.xml | 129 ----
.../conf/datasources/stratos-datasources.xml | 51 --
.../resources/user-data/ssl-cert-snakeoil.key | 16 -
.../resources/user-data/ssl-cert-snakeoil.pem | 14 -
.../config/sm/repository/conf/carbon.xml | 586 +++++++++++++++++++
.../repository/conf/cartridge-config.properties | 51 ++
.../conf/datasources/master-datasources.xml | 129 ++++
.../conf/datasources/stratos-datasources.xml | 51 ++
.../config/sm/repository/conf/jndi.properties | 4 +
tools/stratos-installer/setup.sh | 504 ++++++++--------
13 files changed, 1107 insertions(+), 1196 deletions(-)
----------------------------------------------------------------------