You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:30:28 UTC
[26/90] [abbrv] [partial] AIRAVATA-1124
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
deleted file mode 100644
index 4737ef9..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.lang.reflect.Constructor;
-import java.net.URI;
-
-import org.apache.airavata.client.AiravataAPIFactory;
-import org.apache.airavata.client.api.AiravataAPI;
-import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
-import org.apache.airavata.client.tools.PeriodicExecutorThread;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ServiceUtils;
-import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.commons.storage.WsmgInMemoryStorage;
-import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
-import org.apache.airavata.wsmg.commons.util.Axis2Utils;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.Deliverable;
-import org.apache.airavata.wsmg.messenger.DeliveryProcessor;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
-import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
-import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
-import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
-import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
-import org.apache.airavata.wsmg.messenger.strategy.impl.SerialSender;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.engine.ServiceLifeCycle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Axis2 life-cycle hook of the message broker service. On start-up it initializes the broker
- * configuration, queue and delivery processor and starts a background thread that publishes the
- * eventing service URL to the registry; on shut-down it stops those components again.
- */
-public class BrokerServiceLifeCycle implements ServiceLifeCycle {
-
-    private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
-
-    public static final int GFAC_URL_UPDATE_INTERVAL = 1000 * 60 * 60 * 3;
-
-    /** Time, in milliseconds, the registration thread sleeps before it contacts the registry API. */
-    public static final int JCR_AVAIALABILITY_WAIT_INTERVAL = 1000 * 10;
-    public static final String JCR_CLASS = "jcr.class";
-    public static final String JCR_USER = "jcr.user";
-    public static final String JCR_PASS = "jcr.pass";
-    public static final String ORG_APACHE_JACKRABBIT_REPOSITORY_URI = "org.apache.jackrabbit.repository.uri";
-    private static final String MESSAGE_BROKER_SERVICE_NAME = "EventingService";
-    private static final String SERVICE_URL = "message_broker_service_url";
-    private static final String JCR_REGISTRY = "registry";
-
-    private static final long DEFAULT_SOCKET_TIME_OUT = 20000L;
-
-    /**
-     * Monitor guarding {@link #initialized}. A dedicated object is used because synchronizing on
-     * the Boolean flag itself locks a different, JVM-wide shared instance each time the flag flips
-     * and therefore provides no mutual exclusion.
-     */
-    private static final Object INIT_LOCK = new Object();
-
-    /** True from the moment a registration thread is started until shutDown() resets it. */
-    private static boolean initialized = false;
-
-    /** Heart-beat thread; written by the registration thread and read by shutDown(), hence volatile. */
-    private volatile Thread thread;
-
-    private DeliveryProcessor proc;
-    private ConsumerUrlManager urlManager;
-
-    /**
-     * Stops the delivery processor and the consumer URL manager. If a registration was started and
-     * both the registry API and the heart-beat thread exist, it also calls unsetEventingURI() on
-     * the registry, interrupts the heart-beat thread and waits for it to finish.
-     *
-     * @param configurationcontext
-     *            context from which the registry API stored during start-up is read
-     * @param service
-     *            the service being shut down (not used)
-     */
-    public void shutDown(ConfigurationContext configurationcontext, AxisService service) {
-        log.info("broker shutting down");
-        if (proc != null) {
-            proc.stop();
-            proc = null;
-        }
-        if (urlManager != null) {
-            urlManager.stop();
-            urlManager = null;
-        }
-
-        synchronized (INIT_LOCK) {
-            if (initialized) {
-                initialized = false;
-                AiravataAPI registry = (AiravataAPI) configurationcontext.getProperty(JCR_REGISTRY);
-                Thread heartBeat = thread;
-                if (registry != null && heartBeat != null) {
-                    try {
-                        registry.getAiravataManager().unsetEventingURI();
-                    } catch (AiravataAPIInvocationException e) {
-                        log.error("Unable to remove the eventing service URL from the registry", e);
-                    }
-                    heartBeat.interrupt();
-                    try {
-                        heartBeat.join();
-                    } catch (InterruptedException e) {
-                        log.info("Message box url update thread is interrupted");
-                        // Keep the interrupt visible to whoever called shutDown().
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
-        }
-        log.info("broker shut down");
-    }
-
-    /**
-     * Initializes the broker unless the {@code BROKER_INITED} property of the configuration
-     * context is already true, and starts a registration thread unless one was started since the
-     * last shutDown().
-     *
-     * @param configContext
-     *            Axis2 configuration context that receives the broker state
-     * @param axisService
-     *            the service being started
-     */
-    public void startUp(ConfigurationContext configContext, AxisService axisService) {
-        AiravataUtils.setExecutionAsServer();
-        Boolean inited = (Boolean) configContext.getProperty(WsmgCommonConstants.BROKER_INITED);
-
-        if (!Boolean.TRUE.equals(inited)) {
-            log.info("Starting Message Broker...");
-            Axis2Utils.overrideAddressingPhaseHander(configContext, new PublishedMessageHandler());
-            WsmgConfigurationContext brokerConext = initConfigurations(configContext, axisService);
-            initQueue(brokerConext);
-            initDeliveryMethod(brokerConext.getConfigurationManager());
-
-            configContext.setProperty(WsmgCommonConstants.BROKER_INITED, Boolean.TRUE);
-        } else {
-            log.debug("init was already done by another webservice");
-        }
-
-        final ConfigurationContext context = configContext;
-        synchronized (INIT_LOCK) {
-            if (!initialized) {
-                initialized = true;
-                new Thread() {
-                    @Override
-                    public void run() {
-                        registerServiceUrl(context);
-                    }
-                }.start();
-            }
-        }
-    }
-
-    /**
-     * Body of the registration thread: sleeps for {@link #JCR_AVAIALABILITY_WAIT_INTERVAL}, stores
-     * the service URL and the registry API in the context and starts the heart-beat thread. Any
-     * failure is logged; nothing is propagated to the caller.
-     */
-    private void registerServiceUrl(ConfigurationContext context) {
-        try {
-            Thread.sleep(JCR_AVAIALABILITY_WAIT_INTERVAL);
-        } catch (InterruptedException e) {
-            // An interrupt is a request to stop: restore the flag and give up instead of registering.
-            log.warn("Eventing service URL registration was interrupted", e);
-            Thread.currentThread().interrupt();
-            return;
-        }
-        try {
-            String userName = ServerSettings.getSystemUser();
-            String gateway = ServerSettings.getSystemUserGateway();
-
-            AiravataAPI airavataAPI = AiravataAPIFactory.getAPI(gateway, userName);
-            String localAddress = ServiceUtils.generateServiceURLFromConfigurationContext(context,
-                    MESSAGE_BROKER_SERVICE_NAME);
-            log.debug("MESSAGE BOX SERVICE_ADDRESS:" + localAddress);
-            context.setProperty(SERVICE_URL, new URI(localAddress));
-            context.setProperty(JCR_REGISTRY, airavataAPI);
-
-            // Heart beat message to registry
-            Thread heartBeat = new MsgBrokerURLRegisterThread(airavataAPI, context);
-            thread = heartBeat;
-            heartBeat.start();
-        } catch (Exception e) {
-            log.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Creates the broker configuration context, stores it under the {@code BROKER_WSMGCONFIG}
-     * property and sets its storage, queue, subscription manager and notification processor.
-     * In-memory storage is used only when the storage type is configured as in-memory; any other
-     * value selects the JDBC-backed storage.
-     */
-    private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) {
-
-        WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
-        configContext.setProperty(WsmgCommonConstants.BROKER_WSMGCONFIG, wsmgConfig);
-
-        ConfigurationManager configMan = new ConfigurationManager();
-        wsmgConfig.setConfigurationManager(configMan);
-
-        String type = configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE,
-                WsmgCommonConstants.STORAGE_TYPE_PERSISTANT);
-
-        /*
-         * Determine Storage
-         */
-        if (WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY.equalsIgnoreCase(type)) {
-            WsmgInMemoryStorage inmem = new WsmgInMemoryStorage();
-
-            wsmgConfig.setStorage(inmem);
-            wsmgConfig.setQueue(inmem);
-            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, inmem));
-
-        } else {
-            String jdbcUrl = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL);
-            String jdbcDriver = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER);
-            WsmgPersistantStorage persis = new WsmgPersistantStorage(jdbcUrl, jdbcDriver);
-
-            wsmgConfig.setStorage(persis);
-            wsmgConfig.setQueue(persis);
-            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, persis));
-        }
-
-        NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig);
-        wsmgConfig.setNotificationProcessor(notificatonProcessor);
-
-        return wsmgConfig;
-    }
-
-    /**
-     * Publishes the configured queue as {@code WSMGParameter.OUT_GOING_QUEUE}, cleans it when
-     * {@code WSMGParameter.cleanQueueonStartUp} is set and records the start-up time.
-     */
-    private void initQueue(WsmgConfigurationContext context) {
-
-        log.debug("setting up queue");
-
-        WSMGParameter.OUT_GOING_QUEUE = context.getQueue();
-
-        if (WSMGParameter.cleanQueueonStartUp) {
-            log.debug("cleaning up persistant queue");
-            WSMGParameter.OUT_GOING_QUEUE.cleanup();
-            log.debug("cleaned up persistant queue");
-        }
-
-        RunTimeStatistics.setStartUpTime();
-    }
-
-    /**
-     * Starts message delivery when the start-delivery-threads setting parses as true: creates the
-     * delivery protocol by reflection, selects the sending strategy (serial unless parallel or
-     * thread-crew is configured) and starts the delivery processor. Returns without starting
-     * anything when delivery is disabled or the protocol cannot be instantiated.
-     */
-    private void initDeliveryMethod(ConfigurationManager configMan) {
-
-        String shouldStart = configMan.getConfig(WsmgCommonConstants.CONFIG_START_DELIVERY_THREADS);
-
-        if (!Boolean.parseBoolean(shouldStart)) {
-            String storageType = configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE,
-                    WsmgCommonConstants.STORAGE_TYPE_PERSISTANT);
-            if (WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY.equalsIgnoreCase(storageType)) {
-                /*
-                 * user has asked to use in memory queue but without starting the delivery thread. this will
-                 * accumulate message in memory.
-                 */
-                log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth.");
-            }
-            return;
-        }
-
-        /*
-         * Create Protocol
-         */
-        DeliveryProtocol protocol;
-        String protocolClass = configMan
-                .getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL, Axis2Protocol.class.getName());
-        try {
-            // asSubclass() verifies the configured class is a DeliveryProtocol without raw types.
-            Constructor<? extends DeliveryProtocol> co = Class.forName(protocolClass)
-                    .asSubclass(DeliveryProtocol.class).getConstructor();
-            protocol = co.newInstance();
-        } catch (Exception e) {
-            log.error("Cannot initial protocol sender", e);
-            return;
-        }
-        protocol.setTimeout(configMan.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, DEFAULT_SOCKET_TIME_OUT));
-
-        /*
-         * Create delivery method
-         */
-        SendingStrategy method;
-        String initedmethod;
-        String deliveryMethod = configMan.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD,
-                WsmgCommonConstants.DELIVERY_METHOD_SERIAL);
-        if (WsmgCommonConstants.DELIVERY_METHOD_PARALLEL.equalsIgnoreCase(deliveryMethod)) {
-            method = new ParallelSender();
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_PARALLEL;
-
-        } else if (WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW.equalsIgnoreCase(deliveryMethod)) {
-            int poolsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
-                    WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
-            int batchsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
-                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
-            method = new FixedParallelSender(poolsize, batchsize);
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW;
-
-        } else {
-            method = new SerialSender();
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_SERIAL;
-        }
-
-        /*
-         * Create Deliverable
-         */
-        urlManager = new ConsumerUrlManager(configMan);
-        Deliverable senderUtils = new SenderUtils(urlManager);
-        senderUtils.setProtocol(protocol);
-
-        proc = new DeliveryProcessor(senderUtils, method);
-        proc.start();
-        log.debug(initedmethod + " sending method inited");
-    }
-
-    /**
-     * PeriodicExecutorThread whose updateRegistry() writes the service URL stored in the
-     * configuration context to the registry.
-     */
-    class MsgBrokerURLRegisterThread extends PeriodicExecutorThread {
-
-        private ConfigurationContext context = null;
-
-        public MsgBrokerURLRegisterThread(AiravataAPI registry, ConfigurationContext context) {
-            super(registry);
-            this.context = context;
-        }
-
-        protected void updateRegistry(AiravataAPI registry) {
-            try {
-                URI localAddress = (URI) this.context.getProperty(SERVICE_URL);
-                registry.getAiravataManager().setEventingURI(localAddress);
-                // Logged only after the update went through.
-                log.debug("Updated Eventing service URL in to Repository");
-            } catch (AiravataAPIInvocationException e) {
-                log.error("Unable to update the eventing service URL in the registry", e);
-            }
-        }
-
-    }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
deleted file mode 100644
index d087a87..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.io.Serializable;
-
-/**
- * Serializable value holder describing one subscribed consumer: its endpoint reference, the
- * protocol type and three boolean flags.
- */
-public class ConsumerInfo implements Serializable {
-    static final long serialVersionUID = 2274650724788817903L;
-
-    /** Consumer endpoint reference kept as a string. TODO : change this to OM, EndpointReference. */
-    String consumerEprStr;
-
-    /** Protocol type: either "wsnt" or "wse". */
-    String type;
-
-    /** Use-notify flag of the subscription. */
-    boolean useNotify;
-
-    /** Paused flag of the subscription. */
-    boolean paused;
-
-    /** Flag behind isWsrmEnabled()/setWsrmEnabled(); not set by the constructor, so it starts as false. */
-    boolean wsrmEnabled;
-
-    /**
-     * Creates a consumer description from the given values.
-     *
-     * @param consumerEprStr
-     *            endpoint reference of the consumer as a string
-     * @param type
-     *            protocol type, either "wsnt" or "wse"
-     * @param useNotify
-     *            initial value of the use-notify flag
-     * @param paused
-     *            initial value of the paused flag
-     */
-    public ConsumerInfo(String consumerEprStr, String type, boolean useNotify, boolean paused) {
-        this.paused = paused;
-        this.useNotify = useNotify;
-        this.type = type;
-        this.consumerEprStr = consumerEprStr;
-    }
-
-    /**
-     * @return the consumer endpoint reference string
-     */
-    public String getConsumerEprStr() {
-        return this.consumerEprStr;
-    }
-
-    /**
-     * @param consumerEprStr
-     *            the consumer endpoint reference string to store
-     */
-    public void setConsumerEprStr(String consumerEprStr) {
-        this.consumerEprStr = consumerEprStr;
-    }
-
-    /**
-     * @return the current paused flag
-     */
-    public boolean isPaused() {
-        return this.paused;
-    }
-
-    /**
-     * @param paused
-     *            the new paused flag
-     */
-    public void setPaused(boolean paused) {
-        this.paused = paused;
-    }
-
-    /**
-     * @return the protocol type
-     */
-    public String getType() {
-        return this.type;
-    }
-
-    /**
-     * @param type
-     *            the new protocol type
-     */
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    /**
-     * @return the current use-notify flag
-     */
-    public boolean isUseNotify() {
-        return this.useNotify;
-    }
-
-    /**
-     * @param useNotify
-     *            the new use-notify flag
-     */
-    public void setUseNotify(boolean useNotify) {
-        this.useNotify = useNotify;
-    }
-
-    /**
-     * @return the current WS-RM flag
-     */
-    public boolean isWsrmEnabled() {
-        return this.wsrmEnabled;
-    }
-
-    /**
-     * @param wsrmEnabled
-     *            the new WS-RM flag
-     */
-    public void setWsrmEnabled(boolean wsrmEnabled) {
-        this.wsrmEnabled = wsrmEnabled;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
deleted file mode 100644
index 38d8a3a..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Consumers registered under one token, keyed by subscription id, with a lazily rebuilt list of
- * the consumers and a separately tracked element count.
- */
-public class ConsumerList {
-    /** Registered consumers keyed by subscription id. */
-    Map<String, ConsumerInfo> subId2ConsumerInfo = new HashMap<String, ConsumerInfo>();
-
-    /** Cached list of the map values; rebuilt by getConsumerList() when {@code changed} is set. */
-    ArrayList<ConsumerInfo> consumerInfoList = new ArrayList<ConsumerInfo>();
-
-    /** True when the map was modified since the cached list was last rebuilt. */
-    boolean changed = true;
-
-    /** Number of entries in the map, tracked separately so size() never has to rebuild the list. */
-    int size = 0;
-
-    /**
-     * Registers the consumer of a subscription, replacing any consumer already stored for the same
-     * id. Only a previously unknown id increases the size; counting a replacement again would let
-     * the size drift away from the number of stored consumers.
-     *
-     * @param subId
-     *            subscription id used as key
-     * @param consumerInfo
-     *            consumer to store for that id
-     */
-    public void addConsumer(String subId, ConsumerInfo consumerInfo) {
-        if (!subId2ConsumerInfo.containsKey(subId)) {
-            size++;
-        }
-        subId2ConsumerInfo.put(subId, consumerInfo);
-        changed = true;
-    }
-
-    /**
-     * Removes the consumer stored for a subscription id.
-     *
-     * @param subId
-     *            subscription id to remove
-     * @return 1 if an entry was removed, 0 if the id was not registered
-     */
-    public int removeConsumer(String subId) {
-        // containsKey() rather than the value returned by remove(): an entry whose value is null
-        // is still an entry and must be counted down as well.
-        if (!subId2ConsumerInfo.containsKey(subId)) {
-            return 0;
-        }
-        subId2ConsumerInfo.remove(subId);
-        changed = true;
-        size--;
-        return 1;
-    }
-
-    /**
-     * Returns the cached consumer list, rebuilding it from the map first when the map has changed.
-     * The returned list is the internal instance, not a copy.
-     */
-    public ArrayList<ConsumerInfo> getConsumerList() {
-        if (changed) {
-            refreshConsumerInfoList();
-        }
-        return consumerInfoList;
-    }
-
-    /** Returns the number of registered subscriptions. */
-    public int size() {
-        return size;
-    }
-
-    /** Rebuilds the cached list from the current map values and clears the changed flag. */
-    private void refreshConsumerInfoList() {
-        consumerInfoList.clear();
-        consumerInfoList.addAll(subId2ConsumerInfo.values());
-        changed = false;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
deleted file mode 100644
index b5ca1bb..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Keeps one {@link ConsumerList} per token and remembers under which token each subscription id
- * was registered.
- */
-public class ConsumerListManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ConsumerListManager.class);
-
-    /** Consumer lists keyed by token; a token can be a topic or an XPath String. */
-    protected Map<String, ConsumerList> token2ConsumerListMap = new HashMap<String, ConsumerList>();
-
-    /** Token recorded for each subscription id. */
-    protected Map<String, String> subId2Token = new HashMap<String, String>();
-
-    /**
-     * Adds the consumer of {@code subscribeRequest} to the list kept for {@code token}, creating
-     * that list on first use, and records the token for the subscription id.
-     */
-    public void addToConsumerList(String token, SubscriptionState subscribeRequest, String subscriptionId) {
-        ConsumerList consumers = token2ConsumerListMap.get(token);
-        if (consumers == null) {
-            // first subscription for this token
-            consumers = new ConsumerList();
-            token2ConsumerListMap.put(token, consumers);
-        }
-        consumers.addConsumer(subscriptionId, subscribeRequest.getConsumerInfo());
-        subId2Token.put(subscriptionId, token);
-    }
-
-    /** Returns the token recorded for the subscription id, or {@code null} if there is none. */
-    public String getTokenBySubscriptionId(String subscriptionId) {
-        return subId2Token.get(subscriptionId);
-    }
-
-    /**
-     * Removes a subscription from the consumer list of its token. When {@code token} is
-     * {@code null}, the token recorded for the subscription id is used instead.
-     *
-     * @return the result of {@link ConsumerList#removeConsumer(String)}, or 0 (after logging an
-     *         error) when no consumer list exists for the token
-     */
-    public int removeFromConsumerList(String subscriptionId, String token) {
-        String lookupToken = (token != null) ? token : subId2Token.get(subscriptionId);
-
-        ConsumerList consumers = token2ConsumerListMap.get(lookupToken);
-        if (consumers != null) {
-            int removed = consumers.removeConsumer(subscriptionId);
-            subId2Token.remove(subscriptionId);
-            return removed;
-        }
-        logger.error("*****ERROR:Cannot find the token to delete: " + lookupToken);
-        return 0;
-    }
-
-    /** Returns the consumer list kept for the token, or {@code null} if there is none. */
-    public ConsumerList getConsumerListByToken(String token) {
-        return token2ConsumerListMap.get(token);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
deleted file mode 100644
index 10ae03b..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.*;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor;
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.om.xpath.AXIOMXPath;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Turns published WS-Notification and WS-Eventing messages into outgoing messages: extracts the
- * topic and the payload, asks the configured matchers for the matching consumers and stores the
- * result in the outgoing queue.
- */
-public class NotificationProcessor {
-
-    private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);
-
-    /** Topic used for WSE notifications that name no topic in the header or in the URL. */
-    private static final String DEFAULT_WSE_TOPIC = "wseTopic";
-
-    private WsmgConfigurationContext wsmgConfigContext;
-
-    protected long messageCounter = 0;
-    protected long messageId = 0;
-
-    OMFactory factory = OMAbstractFactory.getOMFactory();
-
-    private OutGoingQueue outgoingQueue;
-
-    private AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor();
-
-    /**
-     * Creates a processor that uses the outgoing queue and message matchers of the given
-     * configuration, and initializes the AMQP notification processor.
-     *
-     * @param config
-     *            broker configuration context
-     */
-    public NotificationProcessor(WsmgConfigurationContext config) {
-        init(config);
-        amqpNotificationProcessor.init();
-    }
-
-    private void init(WsmgConfigurationContext config) {
-        this.wsmgConfigContext = config;
-        outgoingQueue = config.getOutgoingQueue();
-    }
-
-    /** Returns the next value of the track-id counter. */
-    private synchronized long getNextTrackId() {
-        messageCounter++;
-        return messageCounter;
-    }
-
-    /** Returns the next value of the message-id counter. */
-    private synchronized long getNextMsgId() {
-        messageId++;
-        return messageId;
-    }
-
-    /**
-     * Processes one published message: assigns a track id, hands the message to the AMQP
-     * processor, handles it as WS-Notification when {@code protocolNs} equals the WSNT namespace
-     * and as WS-Eventing otherwise, and finally sets the response message on the context.
-     *
-     * @param ctx
-     *            processing context of the incoming message
-     * @param protocolNs
-     *            namespace of the protocol the message was published with
-     * @throws OMException
-     *             if the message cannot be read
-     * @throws AxisFault
-     *             if the message carries no payload or the payload cannot be serialized
-     */
-    public void processMsg(ProcessingContext ctx, OMNamespace protocolNs) throws OMException, AxisFault {
-
-        String trackId = "trackId_A_" + getNextTrackId();
-        if (WSMGParameter.showTrackId) {
-            logger.debug(trackId + ": received.");
-        }
-
-        AdditionalMessageContent additionalMessageContent = new AdditionalMessageContent(ctx.getMessageContext()
-                .getSoapAction(), ctx.getMessageContext().getMessageID());
-        additionalMessageContent.setTrackId(trackId);
-
-        handleExtendedNotifications(ctx, protocolNs);
-
-        if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
-            onWSNTMsg(ctx, additionalMessageContent);
-        } else { // WSE Notifications No specific namespace
-            onWSEMsg(ctx, trackId, additionalMessageContent);
-        }
-        setResponseMsg(ctx, trackId, protocolNs);
-    }
-
-    /**
-     * Handles a WS-Eventing notification. The topic is taken from the wsnt:Topic header if there
-     * is one, otherwise from the URL context parameter, otherwise the default topic is used. The
-     * first element of the SOAP body is the message that gets matched and stored.
-     *
-     * @param ctx
-     *            processing context of the incoming message
-     * @param trackId
-     *            track id assigned to the message (not used here)
-     * @param additionalMessageContent
-     *            message metadata; receives the topic element
-     * @throws OMException
-     *             if the message cannot be read
-     * @throws AxisFault
-     *             if the body is empty or the message cannot be serialized
-     */
-    private void onWSEMsg(ProcessingContext ctx, String trackId, AdditionalMessageContent additionalMessageContent)
-            throws OMException, AxisFault {
-
-        String topicElString = null;
-        String topicLocalString;
-
-        QName qName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Topic");
-
-        // A SOAP envelope need not have a header; a missing header simply means "no topic header".
-        OMElement header = ctx.getMessageContext().getEnvelope().getHeader();
-        OMElement topicEl = (header == null) ? null : header.getFirstChildWithName(qName);
-
-        if (topicEl == null) {
-            topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
-            if (topicLocalString == null) {
-                topicLocalString = DEFAULT_WSE_TOPIC;
-            }
-            topicElString = createSimpleTopicElement(topicLocalString);
-        } else {
-            topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
-            try {
-                topicElString = topicEl.toStringWithConsume();
-            } catch (XMLStreamException e) {
-                logger.error("exceptions occured at WSE eventing notification creating", e);
-            }
-        }
-        additionalMessageContent.setTopicElement(topicElString);
-
-        OMElement messageEl = ctx.getSoapBody().getFirstElement();
-        if (messageEl == null) {
-            throw new AxisFault("no message found");
-        }
-
-        String message = null;
-        try {
-            message = messageEl.toStringWithConsume();
-        } catch (XMLStreamException e) {
-            logger.error("unable to serialize the message", e);
-            throw new AxisFault("unable to serialize the message", e);
-        }
-
-        matchAndSave(message, topicLocalString, additionalMessageContent);
-    }
-
-    /**
-     * Builds the wsnt:Topic element (simple dialect, "ns2" prefix) for a topic that did not arrive
-     * as an element.
-     */
-    private static String createSimpleTopicElement(String topicLocalString) {
-        // NOTE(review): the topic may come from the request URL and is inserted without XML
-        // escaping - confirm how consumers of this string parse it.
-        return "<wsnt:Topic "
-                + "Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" "
-                + "xmlns:ns2=\"http://tutorial.globus.org/auction\" "
-                + "xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">" + "ns2:"
-                + topicLocalString + "</wsnt:Topic>";
-    }
-
-    /**
-     * Sets the response message: an element named after the first body element of the request with
-     * the suffix "Response", in the given namespace, carrying the track id as attribute.
-     *
-     * @param ctx
-     *            processing context that receives the response
-     * @param trackId
-     *            track id assigned to the request
-     * @param responseNS
-     *            namespace of the response element
-     * @throws OMException
-     *             if the response cannot be built
-     */
-    private void setResponseMsg(ProcessingContext ctx, String trackId, OMNamespace responseNS) throws OMException {
-        // set response message
-
-        ctx.addResponseMsgNameSpaces(responseNS);
-
-        OMAttribute trackIdAttribute = factory.createOMAttribute("trackId", null, trackId);
-        OMElement messageElement = ctx.getMessageContext().getEnvelope().getBody().getFirstElement();
-        OMElement responseMsgElement = factory.createOMElement(messageElement.getLocalName() + "Response", responseNS);
-        responseMsgElement.addAttribute(trackIdAttribute);
-        ctx.setRespMessage(responseMsgElement);
-
-    }
-
-    /**
-     * Handles a WS-Notification message: every NotificationMessage child of the first body element
-     * is matched and stored separately, with its Topic and ProducerReference (when present) copied
-     * into the message metadata.
-     *
-     * @param ctx
-     *            processing context of the incoming message
-     * @param additionalMessageContent
-     *            message metadata; receives topic element and producer reference
-     * @throws OMException
-     *             if the message cannot be read
-     * @throws AxisFault
-     *             if the body is empty, contains no NotificationMessage, a NotificationMessage has
-     *             no Message payload, or the payload cannot be serialized
-     */
-    private void onWSNTMsg(ProcessingContext ctx, AdditionalMessageContent additionalMessageContent)
-            throws OMException, AxisFault {
-
-        // NOTE(review): these two are declared outside the loop, so a value read from one
-        // NotificationMessage is still set while later ones are processed - confirm this is intended.
-        String producerReferenceElString = null;
-        String topicElString = null;
-
-        boolean noElements = true;
-
-        // TODO: set nicely with a processing context
-        OMElement notifyEl = ctx.getSoapBody().getFirstElement();
-        if (notifyEl == null) {
-            // An empty body used to fail with a NullPointerException below.
-            throw new AxisFault("no message found");
-        }
-
-        String wsntNsUri = NameSpaceConstants.WSNT_NS.getNamespaceURI();
-        for (Iterator<OMElement> iter = notifyEl.getChildrenWithLocalName("NotificationMessage"); iter.hasNext();) {
-            noElements = false;
-            OMElement wrappedMessageEl = iter.next();
-
-            String topicLocalString = null;
-
-            OMElement topicEl = wrappedMessageEl.getFirstChildWithName(new QName(wsntNsUri, "Topic"));
-            if (topicEl != null) {
-                // get what ever inside this element
-                topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
-
-                try {
-                    topicElString = topicEl.toStringWithConsume();
-                } catch (XMLStreamException e) {
-                    logger.error("exception occured while creating NotificationConsumer", e);
-                }
-                additionalMessageContent.setTopicElement(topicElString);
-            }
-
-            OMElement producerReferenceEl = wrappedMessageEl.getFirstChildWithName(new QName(wsntNsUri,
-                    "ProducerReference"));
-            if (producerReferenceEl != null) {
-                try {
-                    producerReferenceElString = producerReferenceEl.toStringWithConsume();
-                } catch (XMLStreamException e) {
-                    logger.error("exception occured while creating notification consumer", e);
-                }
-                additionalMessageContent.setProducerReference(producerReferenceElString);
-            }
-
-            // A missing or empty wsnt:Message is a malformed request, not a programming error.
-            OMElement messageWrapperEl = wrappedMessageEl.getFirstChildWithName(new QName(wsntNsUri, "Message"));
-            OMElement notificationMessageEl = (messageWrapperEl == null) ? null : messageWrapperEl.getFirstElement();
-            if (notificationMessageEl == null) {
-                throw new AxisFault("no message found");
-            }
-
-            String message = null;
-            try {
-                message = notificationMessageEl.toStringWithConsume();
-            } catch (XMLStreamException e) {
-                logger.error("exception occured while creating notification consumer", e);
-                throw new AxisFault("unable to serialize the message", e);
-            }
-
-            matchAndSave(message, topicLocalString, additionalMessageContent);
-
-        }
-        if (noElements) {
-            throw new AxisFault("at least one element is required");
-        }
-    }
-
-    /**
-     * Collects the consumers every configured matcher reports for the message and stores the
-     * message for them. A RuntimeException raised while matching or storing is logged and not
-     * propagated.
-     */
-    private void matchAndSave(String notificationMessage, String topicLocalString,
-            AdditionalMessageContent additionalMessageContent) {
-
-        List<ConsumerInfo> matchedConsumers = new LinkedList<ConsumerInfo>();
-
-        // not use incoming queue
-        // This is a fix for the bug seen in yfilter.
-        try {
-
-            for (AbstractMessageMatcher matcher : wsmgConfigContext.getMessageMatchers()) {
-                matcher.populateMatches(null, additionalMessageContent, notificationMessage, topicLocalString,
-                        matchedConsumers);
-            }
-
-            save(matchedConsumers, notificationMessage, additionalMessageContent);
-
-        } catch (RuntimeException e) {
-            logger.error("Caught RuntimeException", e);
-        }
-
-    }
-
-    /**
-     * Stores the message in the outgoing queue for the given consumers. Does nothing when the
-     * consumer list is empty.
-     *
-     * @param consumerInfoList
-     *            consumers the message is to be delivered to
-     * @param message
-     *            serialized notification message
-     * @param additionalMessageContent
-     *            metadata stored together with the message
-     */
-    public void save(List<ConsumerInfo> consumerInfoList, String message,
-            AdditionalMessageContent additionalMessageContent) {
-
-        if (consumerInfoList.isEmpty()) { // No subscription
-            return;
-        }
-
-        RunTimeStatistics.addNewNotificationMessageSize(message.length());
-        OutGoingMessage outGoingMessage = new OutGoingMessage();
-        outGoingMessage.setTextMessage(message);
-        outGoingMessage.setConsumerInfoList(consumerInfoList);
-        outGoingMessage.setAdditionalMessageContent(additionalMessageContent);
-
-        outgoingQueue.storeNotification(outGoingMessage, getNextMsgId());
-
-        if (WSMGParameter.showTrackId) {
-            logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue.");
-        }
-    }
-
-    /** Passes the message to the protocol extensions; currently only AMQP. */
-    private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
-        // AMQP
-        amqpNotificationProcessor.notify(ctx, protocolNs);
-    }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
deleted file mode 100644
index 39970ec..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.amqp;
-
-import org.apache.airavata.common.utils.ApplicationSettings;
-import org.apache.airavata.wsmg.client.amqp.*;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMNamespace;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Element;
-
-import javax.xml.namespace.QName;
-
-/**
- * AMQPNotificationProcessor handles AMQP-specific notification processing.
- */
-public class AMQPNotificationProcessor {
-
- private static final Logger logger = LoggerFactory.getLogger(AMQPNotificationProcessor.class);
-
- private boolean amqpEnabled = false;
- private AMQPSender amqpSender = null;
- private AMQPTopicSender amqpTopicSender = null;
- private AMQPBroadcastSender amqpBroadcastSender = null;
-
- public void init() {
- String amqpEnabledAppSetting = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_ENABLE, "");
- if (!amqpEnabledAppSetting.isEmpty() && (1 == Integer.parseInt(amqpEnabledAppSetting))) {
- try {
- String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
- String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
- String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
- String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
-
- Properties properties = new Properties();
- properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
- properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
- properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
- properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
-
- String className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_SENDER, "");
- Class clazz = Class.forName(className);
- amqpSender = (AMQPSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
- className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_TOPIC_SENDER, "");
- clazz = Class.forName(className);
- amqpTopicSender = (AMQPTopicSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
- className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_BROADCAST_SENDER, "");
- clazz = Class.forName(className);
- amqpBroadcastSender = (AMQPBroadcastSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
- Element routingKeys = AMQPUtil.loadRoutingKeys();
- if (routingKeys != null) {
- ((AMQPRoutingAwareClient)amqpSender).init(routingKeys);
- ((AMQPRoutingAwareClient)amqpTopicSender).init(routingKeys);
- ((AMQPRoutingAwareClient)amqpBroadcastSender).init(routingKeys);
- }
-
- amqpEnabled = true;
- } catch (Exception ex) {
- logger.error(ex.getMessage());
- }
- }
- }
-
- public void notify(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
- if (amqpEnabled) {
- // Extract messages
- List<OMElement> messages = new ArrayList<OMElement>();
- if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
- // WSNT
- OMElement messageElements = ctx.getSoapBody().getFirstElement();
- for (Iterator<OMElement> ite = messageElements.getChildrenWithLocalName("NotificationMessage"); ite.hasNext(); ) {
- try {
- OMElement messageElement = ite.next();
- OMElement message = messageElement.getFirstChildWithName(
- new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();
- messages.add(message);
- } catch (NullPointerException e) {
- throw new OMException(e);
- }
- }
- } else {
- // WSE
- OMElement message = ctx.getSoapBody().getFirstElement();
- if (message != null) {
- messages.add(message);
- }
- }
-
- // Dispatch messages
- try {
- for (OMElement message : messages) {
- amqpBroadcastSender.Send(message);
- amqpTopicSender.Send(message);
- amqpSender.Send(message);
- }
- } catch (AMQPException e) {
- logger.warn("Failed to send AMQP notification.[Reason=" + e.getMessage() + "]");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
deleted file mode 100644
index 8c097d4..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-
- /**
-  * Describes one context parameter by a name and the class of its value, and can cast an
-  * arbitrary object to that class.
-  *
-  * @param <T> type of the parameter value
-  */
- public class ContextParameterInfo<T> {
-
-     private final String parameterName;
-     private final Class<T> parameterType;
-
-     /**
-      * @param type class of the values stored under this parameter
-      * @param name name of the parameter
-      */
-     public ContextParameterInfo(Class<T> type, String name) {
-         this.parameterName = name;
-         this.parameterType = type;
-     }
-
-     /** @return the class given at construction */
-     public Class<T> getParameterType() {
-         return this.parameterType;
-     }
-
-     /** @return the name given at construction */
-     public String getParameterName() {
-         return this.parameterName;
-     }
-
-     /**
-      * Casts {@code obj} to the parameter type via {@link Class#cast(Object)}.
-      *
-      * @param obj value to cast; may be {@code null}
-      * @return {@code obj} typed as {@code T}
-      * @throws ClassCastException if {@code obj} is non-null and not an instance of the parameter type
-      */
-     public T cast(Object obj) {
-         Class<T> target = this.parameterType;
-         return target.cast(obj);
-     }
-
-     /** Manual smoke run: builds one parameter descriptor and one OM element, then exits. */
-     public static void main(String[] a) {
-         new ContextParameterInfo<OMElement>(OMElement.class, "test");
-         OMAbstractFactory.getOMFactory().createOMElement("testtest", null);
-     }
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
deleted file mode 100644
index 61624d8..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.addressing.EndpointReference;
-
- /**
-  * Well-known {@link ContextParameterInfo} keys used to store and look up values in a
-  * {@code ProcessingContext}.
-  *
-  * <p>Every key is a {@code public static final} singleton. {@code ContextParameterInfo} does not
-  * override {@code equals}/{@code hashCode}, so a value can only be found again through the very
-  * same key object it was stored under; a per-instance or reassignable key would silently miss.
-  */
- public class ContextParameters {
-
-     /** Creates a key for values of class {@code c} under the given name. */
-     private static <V> ContextParameterInfo<V> createParam(Class<V> c, String name) {
-         return new ContextParameterInfo<V>(c, name);
-     }
-
-     // Was non-final; made final so the shared key cannot be swapped out at run time.
-     public static final ContextParameterInfo<String> RESOURCE_ID = createParam(String.class, "resourceID");
-
-     public static final ContextParameterInfo<String> SUB_ID = createParam(String.class, "subID");
-
-     public static final ContextParameterInfo<String> TOPIC_FROM_URL = createParam(String.class, "topicFromUrl");
-
-     public static final ContextParameterInfo<String> SOAP_ACTION = createParam(String.class, "soapAction");
-
-     public static final ContextParameterInfo<SubscriptionState> SUBSCRIPTION = createParam(SubscriptionState.class,
-             "subscription");
-
-     public static final ContextParameterInfo<String> SUBSCRIBER_EXPIRES = createParam(String.class, "subscriberExpires");
-
-     // Was an instance field, which produced a different key object for every ContextParameters
-     // instance. Access through an instance reference still compiles.
-     public static final ContextParameterInfo<String> USE_NOTIFY_TEXT = createParam(String.class, "useNotifyText");
-
-     public static final ContextParameterInfo<OMElement> USE_NOTIFY_ELEMENT = createParam(OMElement.class, "useNotifyEl");
-
-     public static final ContextParameterInfo<OMElement> NOTIFY_TO_ELEMENT = createParam(OMElement.class, "NotifyTo");
-
-     public static final ContextParameterInfo<EndpointReference> NOTIFY_TO_EPR = createParam(EndpointReference.class,
-             "NotifyToEPR");
-
-     public static final ContextParameterInfo<OMElement> SUB_POLICY = createParam(OMElement.class, "subPolicy");
-
-     public static final ContextParameterInfo<OMElement> FILTER_ELEMENT = createParam(OMElement.class, "filterElement");
-
-     public static final ContextParameterInfo<OMElement> TOPIC_EXPRESSION_ELEMENT = createParam(OMElement.class,
-             "topicExpressionEl");
-
-     public static final ContextParameterInfo<OMElement> XPATH_ELEMENT = createParam(OMElement.class, "xpathEl");
-
-     public static final ContextParameterInfo<OMElement> SUBSCRIBE_ELEMENT = createParam(OMElement.class, "subscribeElement");
-
-     // NOTE(review): shares the name "subscribeElement" with SUBSCRIBE_ELEMENT; the two keys are
-     // still distinct objects. Confirm whether the duplicate name is intended.
-     public static final ContextParameterInfo<EndpointReference> SUBSCRIBE_ELEMENT_EPR = createParam(EndpointReference.class,
-             "subscribeElement");
-
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
deleted file mode 100644
index 06a50ca..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.soap.SOAPBody;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.context.MessageContext;
-
- /**
-  * Mutable holder for the state gathered while one broker request is processed: the SOAP
-  * envelope, the Axis2 message context, the response element, the subscription, the response
-  * namespaces and a map of typed context parameters.
-  */
- public class ProcessingContext {
-
-     /** Typed parameters, keyed by their {@link ContextParameterInfo} descriptor. */
-     private Map<ContextParameterInfo<?>, Object> contextInfo = new HashMap<ContextParameterInfo<?>, Object>();
-
-     /** Created on first use by {@link #addResponseMsgNameSpaces(OMNamespace)}; null until then. */
-     private List<OMNamespace> responseMsgNameSpaces;
-
-     private MessageContext messageContext = null;
-
-     // Used for WSe notification messages. Topics are in the header.
-     private SOAPEnvelope envelope;
-
-     private OMElement respMessage;
-
-     private SubscriptionState subscription = null;
-
-     public SOAPEnvelope getEnvelope() {
-         return this.envelope;
-     }
-
-     public void setEnvelope(SOAPEnvelope envelope) {
-         this.envelope = envelope;
-     }
-
-     /** @return the body of the current envelope; the envelope must have been set beforehand */
-     public SOAPBody getSoapBody() {
-         SOAPEnvelope current = this.envelope;
-         return current.getBody();
-     }
-
-     public OMElement getRespMessage() {
-         return this.respMessage;
-     }
-
-     public void setRespMessage(OMElement respMessage) {
-         this.respMessage = respMessage;
-     }
-
-     public SubscriptionState getSubscription() {
-         return this.subscription;
-     }
-
-     public void setSubscription(SubscriptionState subscription) {
-         this.subscription = subscription;
-     }
-
-     public void setMessageConext(MessageContext msgContext) {
-         this.messageContext = msgContext;
-     }
-
-     public MessageContext getMessageContext() {
-         return this.messageContext;
-     }
-
-     /** Records a namespace for the response message; a namespace already recorded is not added again. */
-     public void addResponseMsgNameSpaces(OMNamespace ns) {
-         if (this.responseMsgNameSpaces == null) {
-             this.responseMsgNameSpaces = new ArrayList<OMNamespace>();
-         }
-         if (this.responseMsgNameSpaces.contains(ns)) {
-             return;
-         }
-         this.responseMsgNameSpaces.add(ns);
-     }
-
-     /** @return the recorded response namespaces, or {@code null} when none has been added */
-     public List<OMNamespace> getResponseMsgNamespaces() {
-         return this.responseMsgNameSpaces;
-     }
-
-     /** Stores {@code value} under the given parameter descriptor, replacing any previous value. */
-     public void setContextParameter(ContextParameterInfo<?> name, Object value) {
-         this.contextInfo.put(name, value);
-     }
-
-     /**
-      * Looks up the value stored under the given descriptor and casts it to the descriptor's type.
-      *
-      * @return the stored value, or {@code null} when nothing is stored under {@code name}
-      */
-     public <T> T getContextParameter(ContextParameterInfo<T> name) {
-         return name.cast(this.contextInfo.get(name));
-     }
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
deleted file mode 100644
index 9d4d4ea..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.axiom.om.OMElement;
-
- /**
-  * Base class for builders that turn an XML element into a {@link ProcessingContext}.
-  */
- public abstract class ProcessingContextBuilder {
-
-     /**
-      * Builds a processing context from the given element.
-      *
-      * @param elem element to build the context from
-      * @return the context produced by the concrete builder
-      */
-     public abstract ProcessingContext build(OMElement elem);
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
deleted file mode 100644
index ca23506..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.handler;
-
-import java.util.List;
-
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingFaultsHelper;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
-import org.apache.axis2.engine.Phase;
-import org.apache.axis2.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
- /**
-  * Addressing-based dispatcher that additionally routes messages addressed to the broker's
-  * {@code EventingService} to its publish operation when normal dispatch finds no operation.
-  */
- public class PublishedMessageHandler extends AddressingBasedDispatcher {
-
-     private static final Logger logger = LoggerFactory.getLogger(PublishedMessageHandler.class);
-
-     private static final String ADDRESSING_VALIDATE_ACTION = "addressing.validateAction";
-
-     /** Publish operation, looked up on first use and then reused. */
-     private AxisOperation publishOperation = null;
-
-     /** "Addressing" in-flow phase, looked up on first use and then reused. */
-     private Phase addressingPhase = null;
-
-     /**
-      * Dispatches the message when its service or operation is still unresolved.
-      *
-      * <p>Action validation is switched off on the message context while the inherited dispatch
-      * runs, so that this handler can substitute the publish operation for the eventing service
-      * afterwards. If validation was enabled on entry, the action check is then performed here and
-      * the original property value is restored.
-      *
-      * @param msgContext message being dispatched
-      * @return the response of the inherited dispatch, or {@code CONTINUE} when nothing had to be done
-      * @throws AxisFault from the inherited dispatch or from the action check
-      */
-     @Override
-     public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
-
-         InvocationResponse response = InvocationResponse.CONTINUE;
-
-         if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) {
-             boolean validateAction = JavaUtils.isTrue(msgContext.getProperty(ADDRESSING_VALIDATE_ACTION), true);
-             msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(false));
-             response = super.invoke(msgContext);
-             if (isForBrokerEventingService(msgContext)) {
-                 validateBrokerWSEventingOperation(msgContext);
-             }
-             if (validateAction) {
-                 checkAction(msgContext);
-             }
-             msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(validateAction));
-         }
-
-         return response;
-     }
-
-     /** Falls back to the service's publish operation when dispatch resolved no operation. */
-     private void validateBrokerWSEventingOperation(MessageContext msgContext) {
-         if (msgContext.getAxisOperation() == null) {
-             AxisService service = msgContext.getAxisService();
-             AxisOperation pubOperation = getPublishOperation(service);
-             msgContext.setAxisOperation(pubOperation);
-         }
-     }
-
-     /** True when the message has a resolved service named "EventingService". */
-     private boolean isForBrokerEventingService(MessageContext msgContext) {
-         return msgContext.getAxisService() != null && msgContext.getAxisService().getName().equals("EventingService");
-     }
-
-     /** Returns the operation registered for the publish SOAP action, caching it after the first lookup. */
-     private AxisOperation getPublishOperation(AxisService publisherService) {
-         if (publishOperation == null) {
-             publishOperation = publisherService.getOperationBySOAPAction(WsmgCommonConstants.WSMG_PUBLISH_SOAP_ACTION);
-         }
-         return publishOperation;
-     }
-
-     /** Returns the in-flow phase named "Addressing" (case-insensitive), or null when there is none. */
-     private Phase getAddressingPhase(MessageContext context) {
-
-         if (addressingPhase == null) {
-
-             List<Phase> inFlowPhases = context.getConfigurationContext().getAxisConfiguration().getPhasesInfo()
-                     .getINPhases();
-
-             for (Phase p : inFlowPhases) {
-                 if (p.getName().equalsIgnoreCase("Addressing")) {
-                     addressingPhase = p;
-                 }
-             }
-         }
-
-         return addressingPhase;
-     }
-
-     /**
-      * Raises an "action not supported" fault when this handler is the last one of the addressing
-      * phase and the service or operation is still unresolved.
-      */
-     private void checkAction(MessageContext msgContext) throws AxisFault {
-
-         Phase addPhase = getAddressingPhase(msgContext);
-
-         if (addPhase == null) {
-             // Without the phase the handler position cannot be determined. Stop here
-             // instead of dereferencing null below.
-             logger.error("unable to locate addressing phase object");
-             return;
-         }
-
-         if (msgContext.getCurrentPhaseIndex() + 1 == addPhase.getHandlerCount()) {
-             if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) {
-                 AddressingFaultsHelper.triggerActionNotSupportedFault(msgContext, msgContext.getWSAAction());
-             }
-         }
-     }
-
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
deleted file mode 100644
index bcd4ec1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.airavata.wsmg.commons.CommonRoutines;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
- /**
-  * Background task that periodically walks all subscriptions and removes those that have
-  * expired or whose consumer has repeatedly been unreachable.
-  *
-  * <p>The loop runs until the executing thread is interrupted.
-  */
- class CleanUpThread implements Runnable {
-
-     private static final Logger logger = LoggerFactory.getLogger(CleanUpThread.class);
-
-     /** A consumer is probed again once its last successful check is older than this (10 minutes). */
-     private static final long SKIP_CHECK_INTERVAL = 1000 * 60 * 10;
-
-     /** Pause between two sweeps (5 minutes). */
-     private static final long CHECKUP_INTERVAL = 1000 * 60 * 5;
-
-     /** A subscription is dropped once its unavailable counter exceeds this value. */
-     private static final int MAX_TRY = 3;
-
-     private SubscriptionManager subMan;
-
-     public CleanUpThread(SubscriptionManager manager) {
-         this.subMan = manager;
-     }
-
-     /**
-      * Sweeps the subscriptions every {@code CHECKUP_INTERVAL} milliseconds. Subscriptions marked
-      * as never expiring are skipped. When subscription renewal is required, subscriptions created
-      * more than {@code WSMGParameter.expirationTime} ago are removed. Otherwise a consumer that is
-      * due for a check is probed; success resets its failure counter, and more than
-      * {@code MAX_TRY} failures remove the subscription.
-      */
-     public void run() {
-         logger.debug("CleanUpThread started");
-         final long expirationTime = WSMGParameter.expirationTime;
-         logger.info("Starting Subscription Cleaning up Thread.");
-
-         while (true) {
-             long currentTime = System.currentTimeMillis();
-             long expiredStartTime = 0;
-             if (WSMGParameter.requireSubscriptionRenew) {
-                 expiredStartTime = currentTime - expirationTime;
-             }
-             long availabilityCheckTime = currentTime - SKIP_CHECK_INTERVAL;
-
-             // NOTE(review): the subscriptions are fetched again for every key below; holding one
-             // snapshot would be cheaper, but its declared type is not visible from this class.
-             Set<String> keySet = subMan.getShallowSubscriptionsCopy().keySet();
-             for (Iterator<String> iterator = keySet.iterator(); iterator.hasNext();) {
-                 String key = iterator.next();
-                 SubscriptionState subscription = subMan.getShallowSubscriptionsCopy().get(key);
-                 if (subscription == null) {
-                     // Removed by someone else after the key set was taken.
-                     continue;
-                 }
-                 if (subscription.isNeverExpire()) {
-                     continue;
-                 }
-
-                 if (WSMGParameter.requireSubscriptionRenew && subscription.getCreationTime() < expiredStartTime) {
-                     removeSubscription(key);
-                     logger.info("*****Deleted (expiration)" + key + "-->" + subscription.getConsumerIPAddressStr()
-                             + "##" + subscription.getLocalTopic());
-                     continue;
-                 }
-
-                 if (subscription.getLastAvailableTime() >= availabilityCheckTime) {
-                     // Checked recently enough; nothing to do in this sweep.
-                     continue;
-                 }
-
-                 if (CommonRoutines.isAvailable(subscription.getConsumerAddressURI())) {
-                     // Remember when the consumer was last seen and forgive earlier failures.
-                     subscription.setLastAvailableTime(currentTime);
-                     if (subscription.getUnAvailableCounter() > 0) {
-                         subscription.resetUnAvailableCounter();
-                     }
-                 } else {
-                     int counter = subscription.addUnAvailableCounter();
-                     if (counter > MAX_TRY) {
-                         removeSubscription(key);
-                         // Drop the key from the set being iterated as well, through the iterator.
-                         iterator.remove();
-                         logger.info("*****Deleted (unavailable)" + key + "-->"
-                                 + subscription.getConsumerIPAddressStr() + "##" + subscription.getLocalTopic());
-                     }
-                 }
-             }
-
-             try {
-                 Thread.sleep(CHECKUP_INTERVAL);
-             } catch (InterruptedException e) {
-                 // Interruption is the only way to stop this loop: restore the flag and exit.
-                 logger.info("Subscription cleanup thread interrupted; stopping.");
-                 Thread.currentThread().interrupt();
-                 return;
-             }
-         }
-     }
-
-     /** Removes one subscription through the manager; a failure is logged and the sweep goes on. */
-     private void removeSubscription(String key) {
-         try {
-             subMan.removeSubscription(key);
-         } catch (AxisFault e) {
-             logger.error(e.getMessage(), e);
-         }
-     }
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
deleted file mode 100644
index a9339fd..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
- /**
-  * Plain value holder pairing a subscription id with the XML of its subscribe request.
-  * Both properties are {@code null} until set.
-  */
- public class SubscriptionEntry {
-
-     private String subscribeXml;
-
-     private String subscriptionId;
-
-     /** Creates an entry with both properties unset. */
-     public SubscriptionEntry() {
-         super();
-     }
-
-     /** @return the subscription id, or {@code null} if none was set */
-     public String getSubscriptionId() {
-         return this.subscriptionId;
-     }
-
-     /** @param subscriptionId id to store */
-     public void setSubscriptionId(String subscriptionId) {
-         this.subscriptionId = subscriptionId;
-     }
-
-     /** @return the subscribe XML, or {@code null} if none was set */
-     public String getSubscribeXml() {
-         return this.subscribeXml;
-     }
-
-     /** @param subscribeXml subscribe request XML to store */
-     public void setSubscribeXml(String subscribeXml) {
-         this.subscribeXml = subscribeXml;
-     }
-
- }
http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
deleted file mode 100644
index e42e4c1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-import java.io.StringReader;
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
-import org.apache.airavata.wsmg.broker.wseventing.WSEProcessingContextBuilder;
-import org.apache.airavata.wsmg.broker.wseventing.WSEProtocolSupport;
-import org.apache.airavata.wsmg.broker.wsnotification.WSNTProtocolSupport;
-import org.apache.airavata.wsmg.broker.wsnotification.WSNotificationProcessingContextBuilder;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.impl.builder.StAXOMBuilder;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages subscribers.
- *
- * Keeps the in-memory map of live subscriptions (guarded by a read/write
- * lock), mirrors new and removed subscriptions into the {@link WsmgStorage},
- * and notifies the configured message matchers whenever a subscription is
- * added or removed.
- */
-public class SubscriptionManager {
-
-    private static final Logger log = LoggerFactory.getLogger(SubscriptionManager.class);
-
-    /** Live subscriptions keyed by subscription id; guarded by {@link #subscriptionLock}. */
-    private final HashMap<String, SubscriptionState> subscriptions = new HashMap<String, SubscriptionState>();
-
-    private final ReentrantReadWriteLock subscriptionLock = new ReentrantReadWriteLock();
-
-    private final WSEProtocolSupport wseProtocolSupport = new WSEProtocolSupport();
-
-    private final WSNTProtocolSupport wsntProtocolSupport = new WSNTProtocolSupport();
-
-    private WsmgStorage subscriptionDB;
-
-    private WsmgConfigurationContext wsmgConfig;
-
-    private OutGoingQueue outGoingQueue;
-
-    /** Sequence number for generated ids; only touched while holding the write lock. */
-    private int counter = 1;
-
-    /**
-     * Creates the manager and restores the subscriptions found in the given storage.
-     *
-     * @param paramters
-     *            broker configuration (outgoing queue, message matchers)
-     * @param storage
-     *            persistent store that subscriptions are read from and written to
-     */
-    public SubscriptionManager(WsmgConfigurationContext paramters, WsmgStorage storage) {
-        init(paramters, storage);
-    }
-
-    /**
-     * Wires the collaborators, optionally starts the clean-up thread and
-     * reloads the stored subscriptions. A failure while reloading is logged
-     * and does not prevent construction.
-     */
-    private void init(WsmgConfigurationContext parameters, WsmgStorage storage) {
-
-        this.wsmgConfig = parameters;
-
-        subscriptionDB = storage;
-        outGoingQueue = parameters.getOutgoingQueue();
-        if (WSMGParameter.enableAutoCleanSubscriptions) {
-            CleanUpThread cleanUpThread = new CleanUpThread(this);
-            Thread t = new Thread(cleanUpThread);
-            t.start();
-        }
-
-        try {
-            checkSubscriptionDB(storage);
-        } catch (AxisFault e) {
-            log.error("Subscription database has malformed" + " subscriptions. Ignoring them.", e);
-        }
-
-    }
-
-    /**
-     * Takes a snapshot of the subscription map under the read lock. The map is
-     * a copy; the {@link SubscriptionState} values are shared with the manager.
-     *
-     * @return Returns the subscriptions.
-     */
-    public AbstractMap<String, SubscriptionState> getShallowSubscriptionsCopy() {
-
-        readLockUnlockSubscriptions(true);
-        try {
-            return new HashMap<String, SubscriptionState>(subscriptions);
-        } finally {
-            readLockUnlockSubscriptions(false);
-        }
-
-    }
-
-    /**
-     * Registers the subscription described by the context and lets the matching
-     * protocol support (WS-Eventing or WS-Notification) build the response.
-     *
-     * @param ctx
-     *            processing context of the subscribe request
-     * @throws AxisFault
-     *             if the subscription cannot be created or persisted
-     */
-    public void subscribe(ProcessingContext ctx) throws AxisFault {
-
-        String subId = createSubscription(null, ctx);
-        if (subId == null) {
-            log.error("ERROR: No subscription created");
-            return;
-        }
-
-        if (isWseRequest(ctx)) {
-            wseProtocolSupport.createSubscribeResponse(ctx, subId);
-        } else { // WSNT
-            wsntProtocolSupport.createSubscribeResponse(ctx, subId);
-        }
-    }
-
-    /**
-     * Tells whether the subscribe element carried by the context is in the
-     * WS-Eventing namespace; anything else is handled as WS-Notification.
-     */
-    private boolean isWseRequest(ProcessingContext ctx) {
-        return NameSpaceConstants.WSE_NS.equals(ctx.getContextParameter(ContextParameters.SUBSCRIBE_ELEMENT)
-                .getNamespace());
-    }
-
-    /**
-     * Creates (or, at start-up, re-creates) a subscription.
-     *
-     * @param subscriptionId
-     *            this is the ID that is in the SOAP header; {@code null} for a
-     *            brand-new subscription, non-null when restoring from storage
-     * @param ctx
-     *            contexts constructed with the body elements
-     * @return subscription id
-     * @throws AxisFault
-     *             if a new subscription cannot be written to storage
-     */
-    private String createSubscription(String subscriptionId, ProcessingContext ctx) throws AxisFault {
-
-        boolean isNew = (subscriptionId == null);
-
-        // The namespace of the subscribe element selects the protocol.
-        SubscriptionState state;
-        if (isWseRequest(ctx)) {
-            state = wseProtocolSupport.createSubscriptionState(ctx, outGoingQueue);
-        } else { // Handle WSNT
-            state = wsntProtocolSupport.createSubscriptionState(ctx, outGoingQueue);
-        }
-
-        String key;
-        if (isNew) {
-            key = checkSubscriptionExist(state);
-            if (key != null) { // just renew previous subscriptions
-                return key;
-            }
-
-            state.setCreationTime(System.currentTimeMillis());
-            key = generateSubscriptionId(state.getXpathString() != null && state.getXpathString().length() > 0);
-        } else { // Startup from previous subscription database
-            key = subscriptionId;
-        }
-
-        // NOTE(review): matchers are registered before the storage insert, so a
-        // failed insert leaves them holding a subscription that is not in the
-        // map. The order is kept because toStringWithConsume() below consumes
-        // the element - confirm before reordering.
-        for (AbstractMessageMatcher m : wsmgConfig.getMessageMatchers()) {
-            m.handleSubscribe(state, key);
-        }
-
-        if (isNew) {
-
-            RunTimeStatistics.totalSubscriptions++;
-            try {
-                String subscribeXml = ctx.getContextParameter(ContextParameters.SUBSCRIBE_ELEMENT)
-                        .toStringWithConsume();
-
-                state.setId(key);
-                state.setSubscribeXml(subscribeXml);
-                subscriptionDB.insert(state);
-
-            } catch (Exception ex) {
-                log.error("unable to insert subscription to database", ex);
-                throw new AxisFault("unable to insert subscription to database ", ex);
-            }
-        }
-
-        addToSubscriptionMap(key, state);
-        return key;
-    }
-
-    /** Stores the subscription in the in-memory map under the write lock. */
-    private void addToSubscriptionMap(String key, SubscriptionState state) {
-
-        writeLockUnlockSubscription(true);
-        try {
-            subscriptions.put(key, state);
-        } finally {
-            writeLockUnlockSubscription(false);
-        }
-
-    }
-
-    /**
-     * Builds a fresh subscription id of the form
-     * {@code <T|X>sub<n>@<WsmgCommonConstants.PREFIX>}.
-     *
-     * The counter is advanced under the write lock so that concurrent
-     * subscribers never receive the same number, and numbers whose id is
-     * already present in the map (for example one restored from storage) are
-     * skipped instead of silently replacing that subscription.
-     *
-     * @param xPath
-     *            {@code true} if the subscription has an XPath expression,
-     *            {@code false} if it is topic only
-     * @return the generated id
-     */
-    private String generateSubscriptionId(boolean xPath) {
-        // Prefix indicates whether a subscription has an XPath expression or a
-        // topic only.
-        String subIdPrefix = xPath ? "X" : "T";
-
-        writeLockUnlockSubscription(true);
-        try {
-            String key;
-            do {
-                key = subIdPrefix + "sub" + (counter++) + "@" + WsmgCommonConstants.PREFIX;
-            } while (subscriptions.containsKey(key));
-            return key;
-        } finally {
-            writeLockUnlockSubscription(false);
-        }
-    }
-
-    /**
-     * Looks for an existing subscription equal to the given state. When one is
-     * found its creation time is refreshed (i.e. it is renewed).
-     *
-     * @param state
-     *            the candidate subscription
-     * @return the id of the existing subscription, or {@code null} if there is none
-     */
-    public String checkSubscriptionExist(SubscriptionState state) {
-
-        readLockUnlockSubscriptions(true);
-        try {
-            for (java.util.Map.Entry<String, SubscriptionState> entry : subscriptions.entrySet()) {
-                SubscriptionState existing = entry.getValue();
-                if (existing.equals(state)) {
-                    existing.setCreationTime(System.currentTimeMillis());
-                    log.info("Subscription Already exists." + " Using the current subscriptionId");
-                    return entry.getKey();
-                }
-            }
-            return null;
-        } finally {
-            readLockUnlockSubscriptions(false);
-        }
-    }
-
-    /**
-     * Re-creates the subscriptions recorded in the given storage.
-     *
-     * Every record is restored independently: an incomplete record, an XML
-     * error or a fault while rebuilding one subscription is logged and the
-     * remaining records are still processed. Only successfully restored
-     * subscriptions are added to the start-up statistics.
-     *
-     * @param storage
-     *            store to read the subscription records from
-     * @throws AxisFault
-     *             kept in the signature for existing callers
-     */
-    public void checkSubscriptionDB(WsmgStorage storage) throws AxisFault {
-        // Read subscription Info from Subscription DB
-        List<SubscriptionEntry> subscriptionEntry = storage.getAllSubscription();
-        if (subscriptionEntry == null) {
-            return;
-        }
-
-        WSNotificationProcessingContextBuilder wsntBuilder = new WSNotificationProcessingContextBuilder();
-        WSEProcessingContextBuilder wseBuilder = new WSEProcessingContextBuilder();
-
-        // One factory serves all records. The stored XML originates from
-        // subscribers, so DTDs and external entities are switched off.
-        XMLInputFactory inputFactory = XMLInputFactory.newInstance();
-        inputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
-        inputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
-
-        int restored = 0;
-
-        // Create subscription for these entries from DB
-        for (int i = 0; i < subscriptionEntry.size(); i++) {
-
-            SubscriptionEntry entry = subscriptionEntry.get(i);
-            if (entry == null || entry.getSubscriptionId() == null || entry.getSubscribeXml() == null) {
-                // A null id would be treated as a brand-new subscription and
-                // inserted into storage again, so such records are skipped.
-                log.error("Subscription No. " + i + " is incomplete. Ignoring it.");
-                continue;
-            }
-
-            String subscriptionId = entry.getSubscriptionId();
-            log.info("Subscription No. " + i + " is " + subscriptionId);
-
-            try {
-                // The reader is intentionally left open: the OM tree may be
-                // built lazily from it while the subscription is created.
-                XMLStreamReader inflow = inputFactory.createXMLStreamReader(new StringReader(entry
-                        .getSubscribeXml()));
-
-                // get the root element (in this case the envelope)
-                OMElement subscribeXmlElement = new StAXOMBuilder(inflow).getDocumentElement();
-
-                // An element without a namespace falls through to WS-Eventing,
-                // like every other non-WSNT element.
-                ProcessingContextBuilder processingCtxBuilder;
-                if (subscribeXmlElement.getNamespace() != null
-                        && NameSpaceConstants.WSNT_NS.getNamespaceURI().equals(
-                                subscribeXmlElement.getNamespace().getNamespaceURI())) {
-                    processingCtxBuilder = wsntBuilder;
-                } else {
-                    processingCtxBuilder = wseBuilder;
-                }
-
-                ProcessingContext context = processingCtxBuilder.build(subscribeXmlElement);
-                createSubscription(subscriptionId, context);
-                restored++;
-
-            } catch (XMLStreamException e) {
-                log.error("error occured while checking subscription db", e);
-            } catch (AxisFault e) {
-                log.error("unable to restore subscription " + subscriptionId + ". Ignoring it.", e);
-            }
-        }
-        RunTimeStatistics.totalSubscriptionsAtStartUp += restored;
-    }
-
-    /**
-     * Logs every subscription; used for debugging. Works on a snapshot so that
-     * concurrent subscribe/unsubscribe calls cannot break the iteration.
-     */
-    public void showAllSubscription() {
-        AbstractMap<String, SubscriptionState> snapshot = getShallowSubscriptionsCopy();
-        log.info("List of all subscriptions:");
-        for (java.util.Map.Entry<String, SubscriptionState> entry : snapshot.entrySet()) {
-            SubscriptionState value = entry.getValue();
-            log.info("******" + entry.getKey() + "-->" + value.getConsumerIPAddressStr() + "##"
-                    + value.getLocalTopic());
-        }
-    }
-
-    /**
-     * Removes the subscription whose id is carried by the context.
-     *
-     * @param ctx
-     *            processing context of the unsubscribe request
-     * @return always 0
-     * @throws AxisFault
-     *             if no id is provided or the subscription is unknown
-     */
-    public int unsubscribe(ProcessingContext ctx) throws AxisFault {
-
-        String subscriptionId = ctx.getContextParameter(ContextParameters.SUB_ID);
-        if (subscriptionId == null || subscriptionId.trim().length() == 0) {
-            throw new AxisFault("subscription identifier is not provided");
-        }
-
-        removeSubscription(subscriptionId);
-        RunTimeStatistics.totalUnSubscriptions++;
-        return 0;
-    }
-
-    /**
-     * Removes a subscription from the map, from storage and from all message
-     * matchers.
-     *
-     * @param subId
-     *            id of the subscription to remove
-     * @return always 0
-     * @throws AxisFault
-     *             if there is no subscription with that id
-     */
-    int removeSubscription(String subId) throws AxisFault {
-
-        SubscriptionState subscription;
-
-        writeLockUnlockSubscription(true);
-        try {
-            subscription = subscriptions.remove(subId);
-        } finally {
-            writeLockUnlockSubscription(false);
-        }
-
-        if (subscription == null) {
-            throw AxisFault.makeFault(new RuntimeException("unknown subscription: " + subId));
-        }
-
-        subscriptionDB.delete(subId);
-
-        for (AbstractMessageMatcher mm : wsmgConfig.getMessageMatchers()) {
-            mm.handleUnsubscribe(subId);
-        }
-
-        return 0;
-    }
-
-    /**
-     * Resumes the subscription whose id is carried by the context.
-     *
-     * @throws AxisFault
-     *             if the id is missing or no such subscription exists
-     */
-    public void resumeSubscription(ProcessingContext ctx) throws AxisFault {
-
-        String subscriptionId = requireSubscriptionId(ctx);
-
-        writeLockUnlockSubscription(true);
-        try {
-            getExistingSubscription(subscriptionId).resume();
-        } finally {
-            // released even when the lookup or resume() throws
-            writeLockUnlockSubscription(false);
-        }
-    }
-
-    /**
-     * Pauses the subscription whose id is carried by the context.
-     *
-     * @throws AxisFault
-     *             if the id is missing or no such subscription exists
-     */
-    public void pauseSubscription(ProcessingContext ctx) throws AxisFault {
-
-        String subscriptionId = requireSubscriptionId(ctx);
-
-        // The map itself is not modified here; the write lock is kept as in
-        // resumeSubscription so that state changes stay mutually exclusive.
-        writeLockUnlockSubscription(true);
-        try {
-            getExistingSubscription(subscriptionId).pause();
-        } finally {
-            // released even when the lookup or pause() throws
-            writeLockUnlockSubscription(false);
-        }
-    }
-
-    /** Reads the subscription id from the context, faulting when it is absent. */
-    private String requireSubscriptionId(ProcessingContext ctx) throws AxisFault {
-        String subscriptionId = ctx.getContextParameter(ContextParameters.SUB_ID);
-        if (subscriptionId == null) {
-            throw AxisFault.makeFault(new RuntimeException("missing subscription id"));
-        }
-        return subscriptionId;
-    }
-
-    /** Looks up a subscription, faulting when it is unknown. The caller must hold the lock. */
-    private SubscriptionState getExistingSubscription(String subscriptionId) throws AxisFault {
-        SubscriptionState subscription = subscriptions.get(subscriptionId);
-        if (subscription == null) {
-            throw AxisFault.makeFault(new RuntimeException("no subscription found for id: " + subscriptionId));
-        }
-        return subscription;
-    }
-
-    /**
-     * Acquires or releases the read lock on the subscription map.
-     *
-     * @param lock
-     *            {@code true} to acquire, {@code false} to release
-     */
-    public void readLockUnlockSubscriptions(boolean lock) {
-        ReadLock readlock = subscriptionLock.readLock();
-        lockUnlock(readlock, lock);
-    }
-
-    /**
-     * Acquires or releases the write lock on the subscription map.
-     *
-     * @param lock
-     *            {@code true} to acquire, {@code false} to release
-     */
-    public void writeLockUnlockSubscription(boolean lock) {
-        WriteLock writeLock = subscriptionLock.writeLock();
-        lockUnlock(writeLock, lock);
-    }
-
-    /** Locks the given lock when {@code lock} is true, unlocks it otherwise. */
-    private void lockUnlock(Lock l, boolean lock) {
-
-        if (lock) {
-            l.lock();
-        } else {
-            l.unlock();
-        }
-
-    }
-}