You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:30:28 UTC

[26/90] [abbrv] [partial] AIRAVATA-1124

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
deleted file mode 100644
index 4737ef9..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.lang.reflect.Constructor;
-import java.net.URI;
-
-import org.apache.airavata.client.AiravataAPIFactory;
-import org.apache.airavata.client.api.AiravataAPI;
-import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
-import org.apache.airavata.client.tools.PeriodicExecutorThread;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ServiceUtils;
-import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.commons.storage.WsmgInMemoryStorage;
-import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
-import org.apache.airavata.wsmg.commons.util.Axis2Utils;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.Deliverable;
-import org.apache.airavata.wsmg.messenger.DeliveryProcessor;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
-import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
-import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
-import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
-import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
-import org.apache.airavata.wsmg.messenger.strategy.impl.SerialSender;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.engine.ServiceLifeCycle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BrokerServiceLifeCycle implements ServiceLifeCycle {
-
-    private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
-//    public static final String REPOSITORY_PROPERTIES = "airavata-server.properties";
-    public static final int GFAC_URL_UPDATE_INTERVAL = 1000 * 60 * 60 * 3;
-
-    public static final int JCR_AVAIALABILITY_WAIT_INTERVAL = 1000 * 10;
-    public static final String JCR_CLASS = "jcr.class";
-    public static final String JCR_USER = "jcr.user";
-    public static final String JCR_PASS = "jcr.pass";
-    public static final String ORG_APACHE_JACKRABBIT_REPOSITORY_URI = "org.apache.jackrabbit.repository.uri";
-    private static final String MESSAGE_BROKER_SERVICE_NAME = "EventingService";
-    private static final String SERVICE_URL = "message_broker_service_url";
-    private static final String JCR_REGISTRY = "registry";
-    private Thread thread;
-
-    private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
-
-    private DeliveryProcessor proc;
-    private ConsumerUrlManager urlManager;
-
-    private static Boolean initialized = false;
-
-    public void shutDown(ConfigurationContext configurationcontext, AxisService service) {
-        log.info("broker shutting down");
-        if (proc != null) {
-            proc.stop();
-            proc = null;
-        }
-        if (urlManager != null) {
-            urlManager.stop();
-            urlManager = null;
-        }
-
-        synchronized (initialized) {
-            if (initialized) {
-                initialized = false;
-                AiravataAPI registry = (AiravataAPI) configurationcontext.getProperty(JCR_REGISTRY);
-                if(registry != null && thread != null){
-                    try {
-                        registry.getAiravataManager().unsetEventingURI();
-                    } catch (AiravataAPIInvocationException e) {
-                        e.printStackTrace();
-                    }
-                    thread.interrupt();
-                try {
-                    thread.join();
-                } catch (InterruptedException e) {
-                    log.info("Message box url update thread is interrupted");
-                }
-                }
-            }
-        }
-        log.info("broker shut down");
-    }
-
-    public void startUp(ConfigurationContext configContext, AxisService axisService) {
-    	AiravataUtils.setExecutionAsServer();
-        Boolean inited = (Boolean) configContext.getProperty(WsmgCommonConstants.BROKER_INITED);
-
-        if (inited == null || inited == false) {
-            log.info("Starting Message Broker...");
-            Axis2Utils.overrideAddressingPhaseHander(configContext, new PublishedMessageHandler());
-            WsmgConfigurationContext brokerConext = initConfigurations(configContext, axisService);
-            initQueue(brokerConext);
-            initDeliveryMethod(brokerConext.getConfigurationManager());
-
-            inited = true;
-            configContext.setProperty(WsmgCommonConstants.BROKER_INITED, inited);
-        } else {
-            log.debug("init was already done by another webservice");
-        }
-
-        final ConfigurationContext context = configContext;
-        synchronized (initialized) {
-            if (!initialized) {
-                initialized = true;
-                new Thread() {
-                    @Override
-                    public void run() {
-//                        Properties properties = new Properties();
-                        try {
-//                            URL url = this.getClass().getClassLoader()
-//                                    .getResource(REPOSITORY_PROPERTIES);
-//                            properties.load(url.openStream());
-//                            Map<String, String> map = new HashMap<String, String>(
-//                                    (Map) properties);
-                            try {
-                                Thread.sleep(JCR_AVAIALABILITY_WAIT_INTERVAL);
-                            } catch (InterruptedException e1) {
-                                e1.printStackTrace();
-                            }
-
-                            String userName = ServerSettings.getSystemUser();
-                            String gateway = ServerSettings.getSystemUserGateway();
-
-                            AiravataAPI airavataAPI = AiravataAPIFactory.getAPI(gateway, userName);
-                            String localAddress = ServiceUtils
-                                    .generateServiceURLFromConfigurationContext(
-                                            context,
-                                            MESSAGE_BROKER_SERVICE_NAME);
-                            log.debug("MESSAGE BOX SERVICE_ADDRESS:"
-                                    + localAddress);
-                            context.setProperty(SERVICE_URL, new URI(
-                                    localAddress));
-                            context.setProperty(JCR_REGISTRY, airavataAPI);
-                            /*
-                                    * Heart beat message to registry
-                                    */
-                            thread = new MsgBrokerURLRegisterThread(airavataAPI, context);
-                            thread.start();
-                        } catch (Exception e) {
-                            log.error(e.getMessage(), e);
-                        }
-                    }
-                }.start();
-            }
-        }
-    }
-
-    private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) {
-
-        WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
-        configContext.setProperty(WsmgCommonConstants.BROKER_WSMGCONFIG, wsmgConfig);
-
-        ConfigurationManager configMan = new ConfigurationManager();
-
-        wsmgConfig.setConfigurationManager(configMan);
-
-        String type = configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE,
-                WsmgCommonConstants.STORAGE_TYPE_PERSISTANT);
-
-        /*
-         * Determine Storage
-         */
-        if (WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY.equalsIgnoreCase(type)) {
-            WsmgInMemoryStorage inmem = new WsmgInMemoryStorage();
-
-            wsmgConfig.setStorage(inmem);
-            wsmgConfig.setQueue(inmem);
-            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, inmem));
-
-        } else {
-            String jdbcUrl = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL);
-            String jdbcDriver = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER);
-            WsmgPersistantStorage persis = new WsmgPersistantStorage(jdbcUrl, jdbcDriver);
-
-            wsmgConfig.setStorage(persis);
-            wsmgConfig.setQueue(persis);
-            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, persis));
-        }
-
-        NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig);
-        wsmgConfig.setNotificationProcessor(notificatonProcessor);
-
-        return wsmgConfig;
-    }
-
-    private void initQueue(WsmgConfigurationContext context) {
-
-        log.debug("setting up queue");
-
-        WSMGParameter.OUT_GOING_QUEUE = context.getQueue();
-
-        if (WSMGParameter.cleanQueueonStartUp) {
-            log.debug("cleaning up persistant queue");
-            WSMGParameter.OUT_GOING_QUEUE.cleanup();
-            log.debug("cleaned up persistant queue");
-        }
-
-        RunTimeStatistics.setStartUpTime();
-
-    }
-
-    private void initDeliveryMethod(ConfigurationManager configMan) {
-
-        String shouldStart = configMan.getConfig(WsmgCommonConstants.CONFIG_START_DELIVERY_THREADS);
-
-        if (!Boolean.parseBoolean(shouldStart)) {
-
-            if (configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE,
-                    WsmgCommonConstants.STORAGE_TYPE_PERSISTANT).equalsIgnoreCase(
-                    WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY)) {
-
-                /*
-                 * user has asked to use in memory queue but without starting the delivery thread. this will accumulate
-                 * message in memory.
-                 */
-                log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth.");
-
-            }
-            return;
-        }
-
-        /*
-         * Create Protocol
-         */
-        DeliveryProtocol protocol;
-        String protocolClass = configMan
-                .getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL, Axis2Protocol.class.getName());
-        try {
-            Class cl = Class.forName(protocolClass);
-            Constructor<DeliveryProtocol> co = cl.getConstructor(null);
-            protocol = co.newInstance((Object[]) null);
-
-        } catch (Exception e) {
-            log.error("Cannot initial protocol sender", e);
-            return;
-        }
-        protocol.setTimeout(configMan.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, DEFAULT_SOCKET_TIME_OUT));
-
-        /*
-         * Create delivery method
-         */
-        SendingStrategy method = null;
-        String initedmethod = null;
-        String deliveryMethod = configMan.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD,
-                WsmgCommonConstants.DELIVERY_METHOD_SERIAL);
-        if (WsmgCommonConstants.DELIVERY_METHOD_PARALLEL.equalsIgnoreCase(deliveryMethod)) {
-            method = new ParallelSender();
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_PARALLEL;
-
-        } else if (WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW.equalsIgnoreCase(deliveryMethod)) {
-            int poolsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
-                    WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
-            int batchsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
-                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
-            method = new FixedParallelSender(poolsize, batchsize);
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW;
-
-        } else {
-            method = new SerialSender();
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_SERIAL;
-        }
-
-        /*
-         * Create Deliverable
-         */
-        urlManager = new ConsumerUrlManager(configMan);
-        Deliverable senderUtils = new SenderUtils(urlManager);
-        senderUtils.setProtocol(protocol);
-
-        proc = new DeliveryProcessor(senderUtils, method);
-        proc.start();
-        log.debug(initedmethod + " sending method inited");
-    }
-
-    class MsgBrokerURLRegisterThread extends PeriodicExecutorThread {
-
-        private ConfigurationContext context = null;
-
-        public MsgBrokerURLRegisterThread(AiravataAPI registry, ConfigurationContext context) {
-            super(registry);
-            this.context = context;
-        }
-
-
-        protected void updateRegistry(AiravataAPI registry) {
-            try {
-                URI localAddress = (URI) this.context.getProperty(SERVICE_URL);
-                registry.getAiravataManager().setEventingURI(localAddress);
-            } catch (AiravataAPIInvocationException e) {
-                e.printStackTrace();
-            }
-            log.debug("Updated Eventing service URL in to Repository");
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
deleted file mode 100644
index d087a87..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.io.Serializable;
-
-public class ConsumerInfo implements Serializable {
-    static final long serialVersionUID = 2274650724788817903L;
-
-    // TODO : change this to OM, EndpointReference.
-    String consumerEprStr;
-
-    String type; // Either "wsnt" or "wse"
-
-    boolean useNotify;
-
-    boolean paused = false;
-
-    boolean wsrmEnabled;
-
-    /**
-     * @param consumerEprStr
-     * @param type
-     * @param useNotify
-     * @param paused
-     */
-    public ConsumerInfo(String consumerEprStr, String type, boolean useNotify, boolean paused) {
-        super();
-        // TODO Auto-generated constructor stub
-        this.consumerEprStr = consumerEprStr;
-        this.type = type;
-        this.useNotify = useNotify;
-        this.paused = paused;
-    }
-
-    /**
-     * @return Returns the consumerEprStr.
-     */
-    public String getConsumerEprStr() {
-        return consumerEprStr;
-    }
-
-    /**
-     * @param consumerEprStr
-     *            The consumerEprStr to set.
-     */
-    public void setConsumerEprStr(String consumerEprStr) {
-        this.consumerEprStr = consumerEprStr;
-    }
-
-    /**
-     * @return Returns the paused.
-     */
-    public boolean isPaused() {
-        return paused;
-    }
-
-    /**
-     * @param paused
-     *            The paused to set.
-     */
-    public void setPaused(boolean paused) {
-        this.paused = paused;
-    }
-
-    /**
-     * @return Returns the type.
-     */
-    public String getType() {
-        return type;
-    }
-
-    /**
-     * @param type
-     *            The type to set.
-     */
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    /**
-     * @return Returns the useNotify.
-     */
-    public boolean isUseNotify() {
-        return useNotify;
-    }
-
-    /**
-     * @param useNotify
-     *            The useNotify to set.
-     */
-    public void setUseNotify(boolean useNotify) {
-        this.useNotify = useNotify;
-    }
-
-    public boolean isWsrmEnabled() {
-        return wsrmEnabled;
-    }
-
-    public void setWsrmEnabled(boolean wsrmEnabled) {
-        this.wsrmEnabled = wsrmEnabled;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
deleted file mode 100644
index 38d8a3a..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ConsumerList {
-    Map<String, ConsumerInfo> subId2ConsumerInfo = new HashMap<String, ConsumerInfo>();
-
-    ArrayList<ConsumerInfo> consumerInfoList = new ArrayList<ConsumerInfo>();
-
-    boolean changed = true;
-    int size = 0; // use size instead of consumerInfoList.size() to avoid
-
-    // unnecessary copy of subId2ConsumerInfo in
-    // refreshConsumerInfoList() if just need the size
-
-    public void addConsumer(String subId, ConsumerInfo consumerInfo) {
-        subId2ConsumerInfo.put(subId, consumerInfo);
-        changed = true;
-        size++;
-    }
-
-    public int removeConsumer(String subId) {
-        ConsumerInfo result = subId2ConsumerInfo.remove(subId);
-        if (result == null) {
-            return 0;
-        }
-        changed = true;
-        size--;
-        return 1;
-
-    }
-
-    public ArrayList<ConsumerInfo> getConsumerList() {
-        if (changed) {
-            refreshConsumerInfoList();
-        }
-        return consumerInfoList;
-    }
-
-    public int size() {
-        return size;
-    }
-
-    private void refreshConsumerInfoList() {
-        consumerInfoList.clear();
-        consumerInfoList.addAll(subId2ConsumerInfo.values());
-        changed = false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
deleted file mode 100644
index b5ca1bb..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumerListManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ConsumerListManager.class);
-
-    protected Map<String, ConsumerList> token2ConsumerListMap = new HashMap<String, ConsumerList>();
-
-    protected Map<String, String> subId2Token = new HashMap<String, String>();
-
-    // token can be a topic or an XPath String
-    public void addToConsumerList(String token, SubscriptionState subscribeRequest, String subscriptionId) {
-        ConsumerList consumerList = token2ConsumerListMap.get(token);
-        if (consumerList == null) { // new topic
-            consumerList = new ConsumerList();
-            token2ConsumerListMap.put(token, consumerList);
-        }
-        consumerList.addConsumer(subscriptionId, subscribeRequest.getConsumerInfo());
-        subId2Token.put(subscriptionId, token);
-
-    }
-
-    public String getTokenBySubscriptionId(String subscriptionId) {
-        String token = subId2Token.get(subscriptionId);
-        return token;
-    }
-
-    public int removeFromConsumerList(String subscriptionId, String token) {
-        String tokenString = null;
-        if (token == null) {
-            tokenString = subId2Token.get(subscriptionId);
-        } else {
-            tokenString = token;
-        }
-
-        ConsumerList consumerList = token2ConsumerListMap.get(tokenString);
-        if (consumerList == null) {
-            logger.error("*****ERROR:Cannot find the token to delete: " + tokenString);
-            return 0;
-        }
-        int result = consumerList.removeConsumer(subscriptionId);
-        subId2Token.remove(subscriptionId);
-        return result;
-    }
-
-    public ConsumerList getConsumerListByToken(String token) {
-        ConsumerList consumerList = token2ConsumerListMap.get(token);
-        return consumerList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
deleted file mode 100644
index 10ae03b..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.*;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor;
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.om.xpath.AXIOMXPath;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NotificationProcessor {
-
-    private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);
-
-    private WsmgConfigurationContext wsmgConfigContext;
-
-    protected long messageCounter = 0;
-    protected long messageId = 0;
-
-    OMFactory factory = OMAbstractFactory.getOMFactory();
-
-    private OutGoingQueue outgoingQueue;
-
-    private AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor();
-
-    public NotificationProcessor(WsmgConfigurationContext config) {
-        init(config);
-        amqpNotificationProcessor.init();
-    }
-
-    private void init(WsmgConfigurationContext config) {
-        this.wsmgConfigContext = config;
-        outgoingQueue = config.getOutgoingQueue();
-    }
-
-    private synchronized long getNextTrackId() {
-        messageCounter++;
-        return messageCounter;
-    }
-
-    private synchronized long getNextMsgId() {
-        messageId++;
-        return messageId;
-    }
-
-    public void processMsg(ProcessingContext ctx, OMNamespace protocolNs) throws OMException, AxisFault {
-
-        String trackId = "trackId_A_" + getNextTrackId();
-        if (WSMGParameter.showTrackId) {
-            logger.debug(trackId + ": received.");
-        }
-
-        AdditionalMessageContent additionalMessageContent = new AdditionalMessageContent(ctx.getMessageContext()
-                .getSoapAction(), ctx.getMessageContext().getMessageID());
-        additionalMessageContent.setTrackId(trackId);
-
-        handleExtendedNotifications(ctx, protocolNs);
-
-        if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
-
-            onWSNTMsg(ctx, additionalMessageContent);
-            setResponseMsg(ctx, trackId, protocolNs);
-        } else { // WSE Notifications No specific namespace
-
-            onWSEMsg(ctx, trackId, additionalMessageContent);
-           setResponseMsg(ctx, trackId, protocolNs);
-        }
-    }
-
-    /**
-     * @param ctx
-     * @param topicElString
-     * @param trackId
-     * @param additionalMessageContent
-     * @throws OMException
-     * @throws XMLStreamException
-     */
-    private void onWSEMsg(ProcessingContext ctx, String trackId, AdditionalMessageContent additionalMessageContent)
-            throws OMException, AxisFault {
-
-        String topicElString = null;
-        String topicLocalString = null;
-
-        QName qName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Topic");
-
-        OMElement topicEl = ctx.getMessageContext().getEnvelope().getHeader().getFirstChildWithName(qName);
-
-        if (topicEl == null) {
-
-            topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
-
-            if (topicLocalString != null) {
-
-                topicElString = "<wsnt:Topic "
-                        + "Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" "
-                        + "xmlns:ns2=\"http://tutorial.globus.org/auction\" "
-                        + "xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">" + "ns2:"
-                        + topicLocalString + "</wsnt:Topic>";
-                // / }
-                additionalMessageContent.setTopicElement(topicElString);
-            } else {
-
-                topicLocalString = "wseTopic";
-                topicElString = "<wsnt:Topic "
-                        + "Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" "
-                        + "xmlns:ns2=\"http://tutorial.globus.org/auction\" "
-                        + "xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">"
-                        + "ns2:wseTopic</wsnt:Topic>";
-                // / }
-                additionalMessageContent.setTopicElement(topicElString);
-            }
-        } else {
-
-            topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
-            try {
-                topicElString = topicEl.toStringWithConsume();
-            } catch (XMLStreamException e) {
-                logger.error("exceptions occured at WSE eventing notification creating", e);
-            }
-            additionalMessageContent.setTopicElement(topicElString);
-        }
-
-        OMElement messageEl = ctx.getSoapBody().getFirstElement();
-        if (messageEl == null) {
-            throw new AxisFault("no message found");
-        }
-
-        String message = null;
-        try {
-            message = messageEl.toStringWithConsume();
-        } catch (XMLStreamException e) {
-            logger.error("unable to serialize the message", e);
-            throw new AxisFault("unable to serialize the message", e);
-        }
-
-        matchAndSave(message, topicLocalString, additionalMessageContent);
-    }
-
-    /**
-     * @param ctx
-     * @param trackId
-     * @throws OMException
-     */
-    private void setResponseMsg(ProcessingContext ctx, String trackId, OMNamespace responseNS) throws OMException {
-        // set response message
-
-        ctx.addResponseMsgNameSpaces(responseNS);
-
-        OMAttribute trackIdAttribute = factory.createOMAttribute("trackId", null, trackId);
-        OMElement messageElement = ctx.getMessageContext().getEnvelope().getBody().getFirstElement();
-        OMElement responseMsgElement = factory.createOMElement(messageElement.getLocalName() + "Response", responseNS);
-        responseMsgElement.addAttribute(trackIdAttribute);
-        ctx.setRespMessage(responseMsgElement);
-
-    }
-
-    /**
-     * @param ctx
-     * @param topicLocalString
-     * @param topicElString
-     * @param producerReferenceElString
-     * @param additionalMessageContent
-     * @throws OMException
-     * @throws XMLStreamException
-     * @throws AxisFault
-     */
-    private void onWSNTMsg(ProcessingContext ctx, AdditionalMessageContent additionalMessageContent)
-            throws OMException, AxisFault {
-
-        String producerReferenceElString = null;
-        String topicElString = null;
-
-        boolean noElements = true;
-
-        // TODO: set nicely with a processing context
-        OMElement notifyEl = ctx.getSoapBody().getFirstElement();
-        for (Iterator<OMElement> iter = notifyEl.getChildrenWithLocalName("NotificationMessage"); iter.hasNext();) {
-            noElements = false;
-            OMElement wrappedMessageEl = iter.next();
-
-            String topicLocalString = null;
-
-            OMElement topicEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                    .getNamespaceURI(), "Topic"));
-            if (topicEl != null) {
-
-                topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText()); // get what ever inside this
-                                                                                      // element
-
-                try {
-                    topicElString = topicEl.toStringWithConsume();
-                } catch (XMLStreamException e) {
-                    logger.error("exception occured while creating NotificationConsumer", e);
-                }
-                additionalMessageContent.setTopicElement(topicElString);
-            }
-            OMElement producerReferenceEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                    .getNamespaceURI(), "ProducerReference"));
-
-            if (producerReferenceEl != null) {
-                try {
-                    producerReferenceElString = producerReferenceEl.toStringWithConsume();
-                } catch (XMLStreamException e) {
-                    logger.error("exception occured while creating notification consumer", e);
-
-                }
-                additionalMessageContent.setProducerReference(producerReferenceElString);
-            }
-
-            OMElement notificationMessageEl = wrappedMessageEl.getFirstChildWithName(
-                    new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();
-
-            String message = null;
-            try {
-                message = notificationMessageEl.toStringWithConsume();
-            } catch (XMLStreamException e) {
-                logger.error("exception occured while creating notification consumer", e);
-                throw new AxisFault("unable to serialize the message", e);
-            }
-
-            matchAndSave(message, topicLocalString, additionalMessageContent);
-
-        }
-        if (noElements) {
-            throw new AxisFault("at least one element is required");
-        }
-    }
-
-    private void matchAndSave(String notificationMessage, String topicLocalString,
-            AdditionalMessageContent additionalMessageContent) {
-
-        List<ConsumerInfo> matchedConsumers = new LinkedList<ConsumerInfo>();
-
-        // not use incoming queue
-        // This is a fix for the bug seen in yfilter.
-        try {
-
-            for (AbstractMessageMatcher matcher : wsmgConfigContext.getMessageMatchers()) {
-                matcher.populateMatches(null, additionalMessageContent, notificationMessage, topicLocalString,
-                        matchedConsumers);
-            }
-
-            save(matchedConsumers, notificationMessage, additionalMessageContent);
-
-        } catch (RuntimeException e) {
-            logger.error("Caught RuntimeException", e);
-        }
-
-    }
-
-    public void save(List<ConsumerInfo> consumerInfoList, String message,
-            AdditionalMessageContent additionalMessageContent) {
-
-        if (consumerInfoList.size() == 0) // No subscription
-            return;
-
-        RunTimeStatistics.addNewNotificationMessageSize(message.length());
-        OutGoingMessage outGoingMessage = new OutGoingMessage();
-        outGoingMessage.setTextMessage(message);
-        outGoingMessage.setConsumerInfoList(consumerInfoList);
-        outGoingMessage.setAdditionalMessageContent(additionalMessageContent);
-
-        outgoingQueue.storeNotification(outGoingMessage, getNextMsgId());
-
-        if (WSMGParameter.showTrackId)
-            logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue.");
-    }
-
-    private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
-        // AMQP
-        amqpNotificationProcessor.notify(ctx, protocolNs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
deleted file mode 100644
index 39970ec..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.amqp;
-
-import org.apache.airavata.common.utils.ApplicationSettings;
-import org.apache.airavata.wsmg.client.amqp.*;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMNamespace;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Element;
-
-import javax.xml.namespace.QName;
-
-/**
- * AMQPNotificationProcessor handles AMQP-specific notification processing.
- */
-public class AMQPNotificationProcessor {
-
-    private static final Logger logger = LoggerFactory.getLogger(AMQPNotificationProcessor.class);
-    
-    private boolean amqpEnabled = false;
-    private AMQPSender amqpSender = null;
-    private AMQPTopicSender amqpTopicSender = null;
-    private AMQPBroadcastSender amqpBroadcastSender = null;
-
-    public void init() {
-        String amqpEnabledAppSetting = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_ENABLE, "");
-        if (!amqpEnabledAppSetting.isEmpty() && (1 == Integer.parseInt(amqpEnabledAppSetting))) {
-            try {
-                String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
-                String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
-                String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
-                String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
-
-                Properties properties = new Properties();
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
-
-                String className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_SENDER, "");
-                Class clazz = Class.forName(className);
-                amqpSender = (AMQPSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
-                className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_TOPIC_SENDER, "");
-                clazz = Class.forName(className);
-                amqpTopicSender = (AMQPTopicSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
-                className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_BROADCAST_SENDER, "");
-                clazz = Class.forName(className);
-                amqpBroadcastSender = (AMQPBroadcastSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
-                Element routingKeys = AMQPUtil.loadRoutingKeys();
-                if (routingKeys != null) {
-                    ((AMQPRoutingAwareClient)amqpSender).init(routingKeys);
-                    ((AMQPRoutingAwareClient)amqpTopicSender).init(routingKeys);
-                    ((AMQPRoutingAwareClient)amqpBroadcastSender).init(routingKeys);
-                }
-
-                amqpEnabled = true;
-            } catch (Exception ex) {
-                logger.error(ex.getMessage());
-            }
-        }
-    }
-
-    public void notify(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
-        if (amqpEnabled) {
-            // Extract messages
-            List<OMElement> messages = new ArrayList<OMElement>();
-            if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
-                // WSNT
-                OMElement messageElements = ctx.getSoapBody().getFirstElement();
-                for (Iterator<OMElement> ite = messageElements.getChildrenWithLocalName("NotificationMessage"); ite.hasNext(); ) {
-                    try {
-                        OMElement messageElement = ite.next();
-                        OMElement message = messageElement.getFirstChildWithName(
-                                new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();
-                        messages.add(message);
-                    } catch (NullPointerException e) {
-                        throw new OMException(e);
-                    }
-                }
-            } else {
-                // WSE
-                OMElement message = ctx.getSoapBody().getFirstElement();
-                if (message != null) {
-                    messages.add(message);
-                }
-            }
-
-            // Dispatch messages
-            try {
-                for (OMElement message : messages) {
-                    amqpBroadcastSender.Send(message);
-                    amqpTopicSender.Send(message);
-                    amqpSender.Send(message);
-                }
-            } catch (AMQPException e) {
-                logger.warn("Failed to send AMQP notification.[Reason=" + e.getMessage() + "]");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
deleted file mode 100644
index 8c097d4..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-
-public class ContextParameterInfo<T> {
-
-    private Class<T> parameterType;
-    private String parameterName;
-
-    public ContextParameterInfo(Class<T> type, String name) {
-        parameterType = type;
-        parameterName = name;
-
-    }
-
-    public Class<T> getParameterType() {
-        return parameterType;
-    }
-
-    public String getParameterName() {
-        return parameterName;
-    }
-
-    public T cast(Object obj) {
-
-        return parameterType.cast(obj);
-    }
-
-    public static void main(String[] a) {
-
-        new ContextParameterInfo<OMElement>(OMElement.class, "test");
-        OMAbstractFactory.getOMFactory().createOMElement("testtest", null);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
deleted file mode 100644
index 61624d8..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.addressing.EndpointReference;
-
-public class ContextParameters {
-
-    private static <V> ContextParameterInfo<V> createParam(Class<V> c, String name) {
-        ContextParameterInfo<V> info = new ContextParameterInfo<V>(c, name);
-
-        return info;
-    }
-
-    public static ContextParameterInfo<String> RESOURCE_ID = createParam(String.class, "resourceID");
-
-    public static final ContextParameterInfo<String> SUB_ID = createParam(String.class, "subID");
-
-    public static final ContextParameterInfo<String> TOPIC_FROM_URL = createParam(String.class, "topicFromUrl");
-
-    public static final ContextParameterInfo<String> SOAP_ACTION = createParam(String.class, "soapAction");
-
-    public static final ContextParameterInfo<SubscriptionState> SUBSCRIPTION = createParam(SubscriptionState.class,
-            "subscription");
-
-    public static final ContextParameterInfo<String> SUBSCRIBER_EXPIRES = createParam(String.class, "subscriberExpires");
-
-    public ContextParameterInfo<String> USE_NOTIFY_TEXT = createParam(String.class, "useNotifyText");
-
-    public static final ContextParameterInfo<OMElement> USE_NOTIFY_ELEMENT = createParam(OMElement.class, "useNotifyEl");
-
-    public static final ContextParameterInfo<OMElement> NOTIFY_TO_ELEMENT = createParam(OMElement.class, "NotifyTo");
-
-    public static final ContextParameterInfo<EndpointReference> NOTIFY_TO_EPR = createParam(EndpointReference.class,
-            "NotifyToEPR");
-
-    public static final ContextParameterInfo<OMElement> SUB_POLICY = createParam(OMElement.class, "subPolicy");
-
-    public static final ContextParameterInfo<OMElement> FILTER_ELEMENT = createParam(OMElement.class, "filterElement");
-
-    public static final ContextParameterInfo<OMElement> TOPIC_EXPRESSION_ELEMENT = createParam(OMElement.class,
-            "topicExpressionEl");
-
-    public static final ContextParameterInfo<OMElement> XPATH_ELEMENT = createParam(OMElement.class, "xpathEl");
-
-    public static final ContextParameterInfo<OMElement> SUBSCRIBE_ELEMENT = createParam(OMElement.class, "subscribeElement");
-
-    public static final ContextParameterInfo<EndpointReference> SUBSCRIBE_ELEMENT_EPR = createParam(EndpointReference.class,
-            "subscribeElement");
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
deleted file mode 100644
index 06a50ca..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.soap.SOAPBody;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.context.MessageContext;
-
-public class ProcessingContext {
-
-    private Map<ContextParameterInfo<? extends Object>, Object> contextInfo = new HashMap<ContextParameterInfo<? extends Object>, Object>();
-
-    private List<OMNamespace> responseMsgNameSpaces;
-
-    private MessageContext messageContext = null;
-
-    private SOAPEnvelope envelope; // Used for WSe notification messages.topics
-    // are
-    // in header.
-
-    private OMElement respMessage;
-
-    private SubscriptionState subscription = null;
-
-    public SOAPEnvelope getEnvelope() {
-        return envelope;
-    }
-
-    public void setEnvelope(SOAPEnvelope envelope) {
-        this.envelope = envelope;
-    }
-
-    public SOAPBody getSoapBody() {
-
-        return envelope.getBody();
-    }
-
-    public OMElement getRespMessage() {
-        return respMessage;
-    }
-
-    public void setRespMessage(OMElement respMessage) {
-        this.respMessage = respMessage;
-    }
-
-    public SubscriptionState getSubscription() {
-        return subscription;
-    }
-
-    public void setSubscription(SubscriptionState subscription) {
-        this.subscription = subscription;
-    }
-
-    public void setMessageConext(MessageContext msgContext) {
-        this.messageContext = msgContext;
-    }
-
-    public MessageContext getMessageContext() {
-        return messageContext;
-    }
-
-    public void addResponseMsgNameSpaces(OMNamespace ns) {
-
-        if (responseMsgNameSpaces == null) {
-            responseMsgNameSpaces = new ArrayList<OMNamespace>();
-        }
-
-        if (!responseMsgNameSpaces.contains(ns)) {
-            responseMsgNameSpaces.add(ns);
-        }
-    }
-
-    public List<OMNamespace> getResponseMsgNamespaces() {
-        return responseMsgNameSpaces;
-    }
-
-    public void setContextParameter(ContextParameterInfo<?> name, Object value) {
-        contextInfo.put(name, value);
-    }
-
-    public <T> T getContextParameter(ContextParameterInfo<T> name) {
-
-        Object o = contextInfo.get(name);
-        return name.cast(o);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
deleted file mode 100644
index 9d4d4ea..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.axiom.om.OMElement;
-
-public abstract class ProcessingContextBuilder {
-
-    public abstract ProcessingContext build(OMElement elem);
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
deleted file mode 100644
index ca23506..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.handler;
-
-import java.util.List;
-
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingFaultsHelper;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
-import org.apache.axis2.engine.Phase;
-import org.apache.axis2.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PublishedMessageHandler extends AddressingBasedDispatcher {
-
-    private static final Logger logger = LoggerFactory.getLogger(PublishedMessageHandler.class);
-
-    private static final String ADDRESSING_VALIDATE_ACTION = "addressing.validateAction";
-
-    private AxisOperation publishOperation = null;
-
-    private Phase addressingPhase = null;
-
-    public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
-
-        InvocationResponse response = InvocationResponse.CONTINUE;
-
-        if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) {
-            boolean validateAction = JavaUtils.isTrue(msgContext.getProperty(ADDRESSING_VALIDATE_ACTION), true);
-            msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(false));
-            response = super.invoke(msgContext);
-            if (isForBrokerEventingService(msgContext))
-                validateBrokerWSEventingOperation(msgContext);
-            if (validateAction)
-                checkAction(msgContext);
-            msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(validateAction));
-
-        }
-
-        return response;
-    }
-
-    private void validateBrokerWSEventingOperation(MessageContext msgContext) {
-        if (msgContext.getAxisOperation() == null) {
-            AxisService service = msgContext.getAxisService();
-            AxisOperation pubOperation = getPublishOperation(service);
-            msgContext.setAxisOperation(pubOperation);
-        }
-    }
-
-    private boolean isForBrokerEventingService(MessageContext msgContext) {
-        return msgContext.getAxisService() != null && msgContext.getAxisService().getName().equals("EventingService");
-    }
-
-    private AxisOperation getPublishOperation(AxisService publisherService) {
-        if (publishOperation == null)
-            publishOperation = publisherService.getOperationBySOAPAction(WsmgCommonConstants.WSMG_PUBLISH_SOAP_ACTION);
-        return publishOperation;
-    }
-
-    private Phase getAddressingPhase(MessageContext context) {
-
-        if (addressingPhase == null) {
-
-            List<Phase> inFlowPhases = context.getConfigurationContext().getAxisConfiguration().getPhasesInfo()
-                    .getINPhases();
-
-            for (Phase p : inFlowPhases) {
-                if (p.getName().equalsIgnoreCase("Addressing")) {
-                    addressingPhase = p;
-                }
-            }
-
-        }
-
-        return addressingPhase;
-
-    }
-
-    private void checkAction(MessageContext msgContext) throws AxisFault {
-
-        Phase addPhase = getAddressingPhase(msgContext);
-
-        if (addPhase == null) {
-            logger.error("unable to locate addressing phase object");
-        }
-        if (msgContext != null) {
-            if (msgContext.getCurrentPhaseIndex() + 1 == addPhase.getHandlerCount()) {
-                if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null)
-                    AddressingFaultsHelper.triggerActionNotSupportedFault(msgContext, msgContext.getWSAAction());
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
deleted file mode 100644
index bcd4ec1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.airavata.wsmg.commons.CommonRoutines;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class CleanUpThread implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(CleanUpThread.class);
-
-    private SubscriptionManager subMan;
-
-    public CleanUpThread(SubscriptionManager manager) {
-        this.subMan = manager;
-    }
-
-    public void run() {
-        logger.debug("CleanUpThread started");
-        String key = null;
-        SubscriptionState subscription = null;
-        Set<String> keySet = null;
-        // long expirationTime=300000*12*24; //5 min*12*24=1 day
-        final long expirationTime = WSMGParameter.expirationTime;
-        final long skipCheckInterval = 1000 * 60 * 10; // 10 minutes
-        final long checkupInterval = 1000 * 60 * 5; // 5 minutes
-        int MAX_TRY = 3;
-        logger.info("Starting Subscription Cleaning up Thread.");
-        while (true) {
-            long currentTime = System.currentTimeMillis();
-            long expiredStartTime = 0;
-            if (WSMGParameter.requireSubscriptionRenew) {
-                expiredStartTime = currentTime - expirationTime; // expired
-            }
-            long availabilityCheckTime = 0;
-            availabilityCheckTime = currentTime - skipCheckInterval; // It's
-            // time
-            // to
-            // check
-            // again
-
-            // logger.finest("CleanUpThread loop");
-            keySet = subMan.getShallowSubscriptionsCopy().keySet();
-            // Go through all the subscriptions and delete expired ones
-            for (Iterator<String> iterator = keySet.iterator(); iterator.hasNext();) {
-                key = iterator.next();
-                subscription = subMan.getShallowSubscriptionsCopy().get(key);
-                if (subscription.isNeverExpire()) {
-                    continue;
-                }
-                long subscriptionCreationTime = subscription.getCreationTime();
-                long lastAvailableTime = subscription.getLastAvailableTime();
-                if (WSMGParameter.requireSubscriptionRenew) { // expired
-                    if (subscriptionCreationTime < expiredStartTime) { // expired
-                        // or
-                        // need
-                        // to
-                        // check
-                        // again
-                        try {
-                            subMan.removeSubscription(key);
-                        } catch (AxisFault e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                        // Not need to remove the key from the keyset since
-                        // the keyset
-                        // "is backed by the map, so changes to the map are reflected in the set, and vice-versa."
-                        // i.remove(); //add this will cause
-                        // ConcurrentModificationException
-                        logger.info("*****Deleted (expiration)" + key + "-->" + subscription.getConsumerIPAddressStr()
-                                + "##" + subscription.getLocalTopic());
-                        logger.info("*****Deleted (expiration)" + key + "-->" + subscription.getConsumerIPAddressStr()
-                                + "##" + subscription.getLocalTopic());
-                        continue;
-                    }
-                }
-                if (lastAvailableTime < availabilityCheckTime) {
-                    // It's time to check again
-                    if (CommonRoutines.isAvailable(subscription.getConsumerAddressURI())) {
-                        // It's time to check but still available and do not
-                        // require subscriptio renew
-                        // set a mark saying it has been check at this time
-                        subscription.setLastAvailableTime(currentTime);
-                        if (subscription.getUnAvailableCounter() > 0) { // failed
-                            // in
-                            // previous
-                            // try
-                            subscription.resetUnAvailableCounter();
-                        }
-                    } else {
-                        int counter = subscription.addUnAvailableCounter();
-                        // System.out.println("UnavailableCounter="+counter);
-                        // logger.finest("UnavailableCounter="+counter);
-                        if (counter > MAX_TRY) {
-                            try {
-                                subMan.removeSubscription(key);
-                            } catch (AxisFault e) {
-                                // TODO Auto-generated catch block
-                                logger.error(e.getMessage(), e);
-                            }
-
-                            // Remove from hashtable seperately to avoid
-                            // conccurent access problem to the hashtable
-                            // with
-                            // i.next()
-                            iterator.remove();
-                            logger.info("*****Deleted (unavailable)" + key + "-->"
-                                    + subscription.getConsumerIPAddressStr() + "##" + subscription.getLocalTopic());
-                            logger.info("*****Deleted (unavailable)" + key + "-->"
-                                    + subscription.getConsumerIPAddressStr() + "##" + subscription.getLocalTopic());
-                        }
-                    }
-                }
-            }
-            try {
-                Thread.sleep(checkupInterval);
-            } catch (InterruptedException e) {
-                logger.error("thread was interrupped", e);
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
deleted file mode 100644
index a9339fd..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-public class SubscriptionEntry {
-
-    private String subscriptionId;
-
-    private String subscribeXml;
-
-    public SubscriptionEntry() {
-    }
-
-    public String getSubscriptionId() {
-        return subscriptionId;
-    }
-
-    public String getSubscribeXml() {
-        return subscribeXml;
-    }
-
-    public void setSubscriptionId(String subscriptionId) {
-        this.subscriptionId = subscriptionId;
-    }
-
-    public void setSubscribeXml(String subscribeXml) {
-        this.subscribeXml = subscribeXml;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
deleted file mode 100644
index e42e4c1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-import java.io.StringReader;
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
-import org.apache.airavata.wsmg.broker.wseventing.WSEProcessingContextBuilder;
-import org.apache.airavata.wsmg.broker.wseventing.WSEProtocolSupport;
-import org.apache.airavata.wsmg.broker.wsnotification.WSNTProtocolSupport;
-import org.apache.airavata.wsmg.broker.wsnotification.WSNotificationProcessingContextBuilder;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.impl.builder.StAXOMBuilder;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages subscribers.
- * 
- */
-public class SubscriptionManager {
-
-    private static final Logger log = LoggerFactory.getLogger(SubscriptionManager.class);
-
-    private HashMap<String, SubscriptionState> subscriptions = new HashMap<String, SubscriptionState>();
-
-    private ReentrantReadWriteLock subscriptionLock = new ReentrantReadWriteLock();
-
-    private WSEProtocolSupport wseProtocalSupport = new WSEProtocolSupport();
-
-    private WSNTProtocolSupport wsntProtocolSupport = new WSNTProtocolSupport();
-
-    private WsmgStorage subscriptionDB;
-
-    private WsmgConfigurationContext wsmgConfig;
-
-    private OutGoingQueue outGoingQueue;
-
-    private int counter = 1;
-
-    public SubscriptionManager(WsmgConfigurationContext paramters, WsmgStorage storage) {
-        init(paramters, storage);
-    }
-
-    private void init(WsmgConfigurationContext parameters, WsmgStorage storage) {
-
-        this.wsmgConfig = parameters;
-
-        subscriptionDB = storage;
-        outGoingQueue = parameters.getOutgoingQueue();
-        if (WSMGParameter.enableAutoCleanSubscriptions) {
-            CleanUpThread cleanUpThread = new CleanUpThread(this);
-            Thread t = new Thread(cleanUpThread);
-            t.start();
-        }
-
-        try {
-            checkSubscriptionDB(storage);
-        } catch (AxisFault e) {
-            log.error("Subscription database has malformed" + " subscriptions. Ignoring them.", e);
-
-        }
-
-    }
-
-    /**
-     * @return Returns the subscriptions.
-     */
-    public AbstractMap<String, SubscriptionState> getShallowSubscriptionsCopy() {
-
-        AbstractMap<String, SubscriptionState> ret = null;
-        readLockUnlockSubscriptions(true);
-        try {
-            ret = new HashMap<String, SubscriptionState>(subscriptions);
-        } finally {
-            readLockUnlockSubscriptions(false);
-        }
-
-        return ret;
-
-    }
-
-    public void subscribe(ProcessingContext ctx) throws AxisFault {
-
-        String subId = createSubscription(null, ctx);
-        if (subId == null) {
-            log.error("ERROR: No subscription created");
-            return;
-        }
-
-        if (NameSpaceConstants.WSE_NS.equals(ctx.getContextParameter(ContextParameters.SUBSCRIBE_ELEMENT)
-                .getNamespace())) {
-            wseProtocalSupport.createSubscribeResponse(ctx, subId);
-
-        } else { // WSNT
-
-            wsntProtocolSupport.createSubscribeResponse(ctx, subId);
-        }
-    }
-
-    /**
-     * @param subscriptionId
-     *            this is the ID that is in the SOAP header
-     * @param ctx
-     *            contexts constructed with the body elements
-     * @return subscription id
-     * @throws AxisFault
-     */
-    private String createSubscription(String subscriptionId, ProcessingContext ctx) throws AxisFault {
-
-        SubscriptionState state = null;
-        String key = null;
-
-        // get the first element element inside the soap body element and check
-        // whether namespace is WSE
-        if (NameSpaceConstants.WSE_NS.equals(ctx.getContextParameter(ContextParameters.SUBSCRIBE_ELEMENT)
-                .getNamespace())) {
-            state = wseProtocalSupport.createSubscriptionState(ctx, outGoingQueue);
-        } else { // Handle WSNT
-
-            state = wsntProtocolSupport.createSubscriptionState(ctx, outGoingQueue);
-        }
-
-        if (subscriptionId == null) { // New subscription entry
-            key = checkSubscriptionExist(state);
-            if (key != null) { // just renew previous subscriptions
-                return key;
-            }
-            // new subscriptions
-
-            state.setCreationTime(System.currentTimeMillis());
-
-            key = generateSubscriptionId(state.getXpathString() != null && state.getXpathString().length() > 0);
-
-        } else { // Startup from previous subscription database
-            key = subscriptionId;
-        }
-
-        for (AbstractMessageMatcher m : wsmgConfig.getMessageMatchers()) {
-            m.handleSubscribe(state, key);
-        }
-
-        if (subscriptionId == null) { // New subscription entry,
-
-            RunTimeStatistics.totalSubscriptions++;
-            try {
-                String subscribeXml = ctx.getContextParameter(ContextParameters.SUBSCRIBE_ELEMENT)
-                        .toStringWithConsume();
-
-                state.setId(key);
-                state.setSubscribeXml(subscribeXml);
-                subscriptionDB.insert(state);
-
-            } catch (Exception ex) {
-                log.error("unable to insert subscription to database", ex);
-                throw new AxisFault("unable to insert subscription to database ", ex);
-            }
-        }
-
-        addToSubscriptionMap(key, state);
-        return key;
-    }
-
-    private void addToSubscriptionMap(String key, SubscriptionState state) {
-
-        writeLockUnlockSubscription(true);
-        try {
-            subscriptions.put(key, state);
-        } finally {
-            writeLockUnlockSubscription(false);
-        }
-
-    }
-
-    /**
-     * @param xpathString
-     * @return
-     */
-    private String generateSubscriptionId(boolean xPath) {
-        String key;
-        String subIdPrefix = null; // Used to indicate weather a subscription
-        // has an XPath subscription Or Topic
-        // only.
-        if (!xPath) {
-            subIdPrefix = "T";
-        } else {
-            subIdPrefix = "X";
-        }
-        key = subIdPrefix + "sub" + (counter++) + "@" + WsmgCommonConstants.PREFIX;
-        return key;
-    }
-
-    /**
-     * if find the subscription already exists, return the current subscriptionId else return null;
-     */
-
-    public String checkSubscriptionExist(SubscriptionState state) {
-
-        String key = null;
-
-        readLockUnlockSubscriptions(true);
-        try {
-
-            for (Iterator<String> keyIterator = subscriptions.keySet().iterator(); keyIterator.hasNext();) {
-
-                String currentKey = keyIterator.next();
-                SubscriptionState value = subscriptions.get(currentKey);
-
-                if (value.equals(state)) {
-                    value.setCreationTime(System.currentTimeMillis());
-                    log.info("Subscription Already exists." + " Using the current subscriptionId");
-                    key = currentKey;
-                    break;
-                }
-
-            }
-
-        } finally {
-            readLockUnlockSubscriptions(false);
-        }
-
-        return key;
-    }
-
-    public void checkSubscriptionDB(WsmgStorage storage) throws AxisFault {
-        OMElement subscribeXmlElement;
-        String subscriptionId;
-        // Read subscription Info from Subscription DB
-        List<SubscriptionEntry> subscriptionEntry = storage.getAllSubscription();
-        if (subscriptionEntry == null) {
-            return;
-        }
-
-        WSNotificationProcessingContextBuilder wsntBuilder = new WSNotificationProcessingContextBuilder();
-        WSEProcessingContextBuilder wseBuilder = new WSEProcessingContextBuilder();
-
-        // Create subscription for these entries from DB
-        for (int i = 0; i < subscriptionEntry.size(); i++) {
-
-            ProcessingContextBuilder processingCtxBuilder = null;
-
-            log.info("Subscription No. " + i + " is " + subscriptionEntry.get(i).getSubscriptionId());
-
-            StringReader sr = new StringReader(subscriptionEntry.get(i).getSubscribeXml());
-            XMLInputFactory inputFactory = XMLInputFactory.newInstance();
-            XMLStreamReader inflow;
-            try {
-                inflow = inputFactory.createXMLStreamReader(sr);
-
-                StAXOMBuilder builder = new StAXOMBuilder(inflow); // get the
-                // root
-                // element (in
-                // this case the
-                // envelope)
-                subscribeXmlElement = builder.getDocumentElement();
-
-                if (subscribeXmlElement.getNamespace().getNamespaceURI()
-                        .equals(NameSpaceConstants.WSNT_NS.getNamespaceURI())) {
-                    processingCtxBuilder = wsntBuilder;
-
-                } else {
-                    processingCtxBuilder = wseBuilder;
-                }
-
-                subscriptionId = subscriptionEntry.get(i).getSubscriptionId();
-
-                ProcessingContext context = processingCtxBuilder.build(subscribeXmlElement);
-                createSubscription(subscriptionId, context);
-
-            } catch (XMLStreamException e) {
-                log.error("error occured while checking subscription db", e);
-            }
-        }
-        RunTimeStatistics.totalSubscriptionsAtStartUp += subscriptionEntry.size();
-    }
-
-    // This is used for debug
-    public void showAllSubscription() {
-        String key = null;
-        SubscriptionState value = null;
-        Set<String> keySet = subscriptions.keySet();
-        log.info("List of all subscriptions:");
-        for (Iterator<String> iterator = keySet.iterator(); iterator.hasNext();) {
-            key = iterator.next();
-            value = subscriptions.get(key);
-            log.info("******" + key + "-->" + value.getConsumerIPAddressStr() + "##" + value.getLocalTopic());
-        }
-    }
-
-    public int unsubscribe(ProcessingContext ctx) throws AxisFault {
-
-        String subscriptionId = ctx.getContextParameter(ContextParameters.SUB_ID);
-        if (subscriptionId == null || subscriptionId.trim().length() == 0) {
-            throw new AxisFault("subscription identifier is not provided");
-        }
-
-        removeSubscription(subscriptionId);
-        RunTimeStatistics.totalUnSubscriptions++;
-        return 0;
-    }
-
-    int removeSubscription(String subId) throws AxisFault {
-
-        SubscriptionState subscription = null;
-
-        writeLockUnlockSubscription(true);
-        try {
-            subscription = subscriptions.remove(subId);
-        } finally {
-            writeLockUnlockSubscription(false);
-        }
-
-        if (subscription == null) {
-            throw AxisFault.makeFault(new RuntimeException("unknown subscription: " + subId));
-
-        }
-
-        subscriptionDB.delete(subId);
-
-        for (AbstractMessageMatcher mm : wsmgConfig.getMessageMatchers()) {
-            mm.handleUnsubscribe(subId);
-        }
-
-        return 0;
-    }
-
-    public void resumeSubscription(ProcessingContext ctx) throws AxisFault {
-
-        String subscriptionId = ctx.getContextParameter(ContextParameters.SUB_ID);
-
-        if (subscriptionId == null) {
-            throw AxisFault.makeFault(new RuntimeException("missing subscription id"));
-        }
-
-        writeLockUnlockSubscription(true);// lock
-        try {
-            SubscriptionState subscription = subscriptions.get(subscriptionId);
-
-            if (subscription == null) {
-
-                throw AxisFault.makeFault(new RuntimeException("no subscription found for id: " + subscriptionId));
-            }
-
-            subscription.resume();
-        } finally {
-            // this will execute even exception is thrown.
-            writeLockUnlockSubscription(false);
-        }
-    }
-
-    public void pauseSubscription(ProcessingContext ctx) throws AxisFault {
-
-        String subscriptionId = ctx.getContextParameter(ContextParameters.SUB_ID);
-
-        if (subscriptionId == null) {
-            throw AxisFault.makeFault(new RuntimeException("missing subscription id"));
-        }
-
-        writeLockUnlockSubscription(true);// read lock should be sufficient
-        // (since we are not modifying the
-        // map)
-        try {
-            SubscriptionState subscription = subscriptions.get(subscriptionId);
-
-            if (subscription == null) {
-
-                throw AxisFault.makeFault(new RuntimeException("no subscription found for id: " + subscriptionId));
-
-            }
-
-            subscription.pause();
-        } finally {
-            // this will execute even exception is thrown.
-            writeLockUnlockSubscription(false);
-        }
-    }
-
-    public void readLockUnlockSubscriptions(boolean lock) {
-        ReadLock readlock = subscriptionLock.readLock();
-        lockUnlock(readlock, lock);
-    }
-
-    public void writeLockUnlockSubscription(boolean lock) {
-        WriteLock writeLock = subscriptionLock.writeLock();
-        lockUnlock(writeLock, lock);
-    }
-
-    private void lockUnlock(Lock l, boolean lock) {
-
-        if (lock) {
-            l.lock();
-        } else {
-            l.unlock();
-        }
-
-    }
-}