You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2011/07/03 17:51:46 UTC

svn commit: r1142453 [8/12] - in /incubator/airavata/ws-messaging/trunk/messagebroker: ./ .settings/ customLibs/ customLibs/activeMQ/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/airavata/ src/main...

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.WsmgNameSpaceConstants;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.util.ElementHelper;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeaderBlock;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.log4j.Logger;
+
+public class Axis2Protocol implements DeliveryProtocol {
+
+    Logger logger = Logger.getLogger(Axis2Protocol.class);
+
+    SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
+
+    ServiceClient nonThreadLocalServiceClient = null;
+
+    long tcpConnectionTimeout = 0;
+
+    @Override
+    public void setTimeout(long timeout) {
+        this.tcpConnectionTimeout = timeout;
+    }
+
+    @Override
+    public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+            throws SendingException {
+        EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());
+
+        /*
+         * Extract information
+         */
+        String actionString = null;
+        List<OMElement> soapHeaders = new LinkedList<OMElement>();
+        if (consumerInfo.getType().compareTo("wsnt") == 0) {
+            actionString = WsmgNameSpaceConstants.WSNT_NS.getNamespaceURI() + "/Notify";
+        } else { // wse
+            actionString = additionalMessageContent.getAction();
+            String topicElString = additionalMessageContent.getTopicElement();
+            if (topicElString != null) {
+                OMElement topicEl = null;
+                try {
+                    topicEl = CommonRoutines.reader2OMElement(new StringReader(topicElString));
+                    soapHeaders.add(topicEl);
+                } catch (XMLStreamException e) {
+                    logger.fatal("exception at topicEl xmlStreamException", e);
+                }
+            }
+        }
+
+        try {
+
+            ServiceClient client = configureServiceClient(actionString, consumerReference,
+                    additionalMessageContent.getMessageID(), soapHeaders);
+
+            client.sendRobust(message);
+            client.cleanupTransport();
+
+        } catch (AxisFault ex) {
+            throw new SendingException(ex.getCause());
+        }
+    }
+
+    private ServiceClient getServiceClient() throws AxisFault {
+
+        ServiceClient ret = nonThreadLocalServiceClient;
+        if (ret == null) {
+            ret = new ServiceClient();
+
+            nonThreadLocalServiceClient = ret;
+        }
+        ret.removeHeaders();
+        return ret;
+    }
+
+    private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
+            List<OMElement> soapHeaders) throws AxisFault {
+
+        // not engaging addressing modules
+
+        ServiceClient client = getServiceClient();
+
+        SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", WsmgNameSpaceConstants.WSA_NS);
+        msgIdEl.setText(msgId);
+        SOAPHeaderBlock actionEl = soapfactory.createSOAPHeaderBlock("Action", WsmgNameSpaceConstants.WSA_NS);
+        actionEl.setText(action);
+
+        SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", WsmgNameSpaceConstants.WSA_NS);
+        to.setText(consumerLocation.getAddress());
+
+        client.addHeader(actionEl);
+        client.addHeader(msgIdEl);
+        client.addHeader(to);
+
+        for (OMElement omHeader : soapHeaders) {
+
+            try {
+                SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
+
+                client.addHeader(headerBlock);
+            } catch (Exception e) {
+                throw AxisFault.makeFault(e);
+            }
+
+        }
+
+        Options opts = new Options();
+        opts.setTimeOutInMilliSeconds(tcpConnectionTimeout);
+        opts.setMessageId(msgId);
+        opts.setTo(consumerLocation);
+        opts.setAction(action);
+        opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.CHUNKED, Boolean.FALSE);
+
+        opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.HTTP_PROTOCOL_VERSION,
+                org.apache.axis2.transport.http.HTTPConstants.HEADER_PROTOCOL_10);
+        client.setOptions(opts);
+
+        return client;
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.axiom.om.OMElement;
+
+public interface DeliveryProtocol {
+
+    public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+            throws SendingException;
+
+    public void setTimeout(long timeout);
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.axis2.AxisFault;
+
+public class SendingException extends AxisFault {
+
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 6250791562500752579L;
+
+    public SendingException(Throwable cause) {
+        super(cause);
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy;
+
+public interface SendingStrategy {
+    public void start();
+
+    public void shutdown();
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.apache.log4j.Logger;
+
+public class FixedParallelSender extends Thread implements SendingStrategy {
+
+    private Logger log = Logger.getLogger(FixedParallelSender.class);
+
+    private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
+
+    private ThreadCrew threadCrew = null;
+
+    private ConsumerUrlManager urlManager = null;
+    private ConfigurationManager configManager = null;
+
+    private long consumerHandlerIdCounter;
+
+    private boolean stopFlag = false;
+
+    public FixedParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
+
+        int poolSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
+                WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
+
+        threadCrew = new ThreadCrew(poolSize);
+        urlManager = urlMan;
+        configManager = config;
+    }
+
+    public void shutdown() {
+        stopFlag = true;
+    }
+
+    public void run() {
+        int dequeuedMessageCounter = 0;
+
+        while (!stopFlag) {
+
+            try {
+
+                if (log.isDebugEnabled())
+                    log.debug("before dequeue -  delivery thread");
+
+                OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+                if (WSMGParameter.showTrackId)
+                    log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+                            + ": dequeued from outgoing queue");
+
+                distributeOverConsumerQueues(outGoingMessage);
+
+            } catch (Exception e) {
+
+                log.fatal("Unexpected_exception:", e);
+            }
+
+            dequeuedMessageCounter++;
+        }
+
+        threadCrew.stop();
+
+    }
+
+    public void distributeOverConsumerQueues(OutGoingMessage message) {
+        List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
+
+        for (ConsumerInfo consumer : consumerInfoList) {
+
+            sendToConsumerHandler(consumer, message);
+
+        }
+
+    }
+
+    private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
+
+        String consumerUrl = consumer.getConsumerEprStr();
+
+        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+                message.getAdditionalMessageContent());
+
+        ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+
+        if (handler == null) {
+            handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, configManager, urlManager);
+            activeConsumerHanders.put(consumerUrl, handler);
+            handler.submitMessage(lwm); // import to submit before execute.
+            threadCrew.submitTask(handler);
+            // (to remove a possible race
+            // condition)
+        } else {
+            handler.submitMessage(lwm);
+        }
+
+        return handler;
+
+    }
+
+    private long getNextConsumerHandlerId() {
+        return ++consumerHandlerIdCounter;
+    }
+
+    class ConsumerHandler implements RunnableEx {
+
+        LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+        final long id;
+        int batchSize;
+
+        ThreadLocal<SenderUtils> threadlocalSender = new ThreadLocal<SenderUtils>();
+
+        // SenderUtils sender = null;
+        String consumerUrl;
+
+        ConfigurationManager configMan;
+        ConsumerUrlManager consumerURLManager;
+
+        public ConsumerHandler(long handlerId, String url, ConfigurationManager config, ConsumerUrlManager urlMan) {
+
+            configMan = config;
+            consumerURLManager = urlMan;
+            // sender = new SenderUtils(urlMan, config, true);
+            id = handlerId;
+            consumerUrl = url;
+
+            batchSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
+                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
+        }
+
+        public long getId() {
+            return id;
+        }
+
+        public String getConsumerUrl() {
+            return consumerUrl;
+        }
+
+        private SenderUtils getSender() {
+
+            SenderUtils s = threadlocalSender.get();
+
+            if (s == null) {
+                s = new SenderUtils(consumerURLManager, configMan);
+                threadlocalSender.set(s);
+            }
+
+            return s;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+
+            if (o instanceof ConsumerHandler) {
+                ConsumerHandler h = (ConsumerHandler) o;
+                return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
+            }
+
+            return false;
+        }
+
+        public void submitMessage(LightweightMsg msg) {
+            queue.add(msg);
+        }
+
+        public void run() {
+
+            if (log.isDebugEnabled())
+                log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+
+            LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+
+            queue.drainTo(localList, batchSize);
+
+            send(localList);
+            localList.clear();
+
+            if (log.isDebugEnabled())
+                log.debug(String.format("calling on completion from : %d,", getId()));
+
+        }
+
+        private void send(LinkedList<LightweightMsg> list) {
+
+            SenderUtils s = getSender();
+
+            while (!list.isEmpty()) {
+
+                LightweightMsg m = list.removeFirst();
+
+                try {
+                    OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+
+                    s.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+
+                } catch (Exception e) {
+                    log.fatal(e);
+                }
+
+            }
+
+        }
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+class LightweightMsg {
+    private ConsumerInfo consumerInfo;
+    private String payload;
+    private AdditionalMessageContent header;
+
+    public LightweightMsg(ConsumerInfo c, String pld, AdditionalMessageContent h) {
+        consumerInfo = c;
+        payload = pld;
+        header = h;
+    }
+
+    public String getPayLoad() {
+        return payload;
+    }
+
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+    public AdditionalMessageContent getHeader() {
+        return header;
+    }
+
+    public String toString() {
+        return String.format("header: %s, consumer: %s, pld: %s", header, consumerInfo.getConsumerEprStr(), payload);
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,300 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.apache.log4j.Logger;
+
+public class ParallelSender extends Thread implements SendingStrategy {
+
+    private Logger log = Logger.getLogger(ParallelSender.class);
+
+    private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
+
+    private final ExecutorService threadPool;
+    private long consumerHandlerIdCounter = 0L;
+    private boolean stopFlag = false;
+
+    private ConsumerUrlManager urlManager = null;
+    private ConfigurationManager configManager = null;
+
+    private ConsumerHandlerCompletionCallback consumerCallback = new ConsumerHandlerCompletionCallback() {
+
+        public void onCompletion(ConsumerHandler h) {
+
+            if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
+
+                if (log.isDebugEnabled())
+                    log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s",
+                            h.getId(), h.getConsumerUrl()));
+            }
+
+        }
+    };
+
+    public ParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
+        urlManager = urlMan;
+        configManager = config;
+
+        threadPool = Executors.newCachedThreadPool();
+
+    }
+
+    public void shutdown() {
+        stopFlag = true;
+    }
+
+    public void run() {
+        int dequeuedMessageCounter = 0;
+
+        while (!stopFlag) {
+
+            try {
+
+                if (log.isDebugEnabled())
+                    log.debug("before dequeue -  delivery thread");
+
+                OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+                if (WSMGParameter.showTrackId)
+                    log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+                            + ": dequeued from outgoing queue");
+
+                distributeOverConsumerQueues(outGoingMessage);
+
+            } catch (Exception e) {
+
+                log.fatal("Unexpected_exception:", e);
+            }
+
+            dequeuedMessageCounter++;
+        }
+
+        threadPool.shutdown();
+
+    }
+
+    public void distributeOverConsumerQueues(OutGoingMessage message) {
+        List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
+
+        for (ConsumerInfo consumer : consumerInfoList) {
+
+            sendToConsumerHandler(consumer, message);
+
+        }
+
+    }
+
+    private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
+
+        String consumerUrl = consumer.getConsumerEprStr();
+
+        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+                message.getAdditionalMessageContent());
+
+        ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+
+        if (handler == null || (!handler.isActive())) {
+            handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, consumerCallback, configManager,
+                    urlManager);
+            activeConsumerHanders.put(consumerUrl, handler);
+            handler.submitMessage(lwm); // import to submit before execute.
+            // (to remove a possible race
+            // condition)
+            threadPool.execute(handler);
+        } else {
+            handler.submitMessage(lwm);
+        }
+
+        return handler;
+    }
+
+    private long getNextConsumerHandlerId() {
+        return ++consumerHandlerIdCounter;
+    }
+
+    interface ConsumerHandlerCompletionCallback {
+
+        public void onCompletion(ConsumerHandler h);
+
+    }
+
+    class ConsumerHandler implements Runnable {
+
+        LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+        ReadWriteLock activeLock = new ReentrantReadWriteLock();
+
+        final long id;
+        final int MAX_UNSUCCESSFULL_DRAINS = 3;
+        final int SLEEP_TIME_SECONDS = 1;
+        int numberOfUnsuccessfullDrainAttempts = 0;
+
+        boolean active = true;
+
+        ConsumerHandlerCompletionCallback callback = null;
+        SenderUtils sender = null;
+        String consumerUrl;
+
+        public ConsumerHandler(long handlerId, String url, ConsumerHandlerCompletionCallback c,
+                ConfigurationManager config, ConsumerUrlManager urlMan) {
+            callback = c;
+            sender = new SenderUtils(urlMan, config);
+            id = handlerId;
+            consumerUrl = url;
+        }
+
+        public long getId() {
+            return id;
+        }
+
+        public String getConsumerUrl() {
+            return consumerUrl;
+        }
+
+        public boolean isActive() {
+
+            boolean ret = false;
+
+            activeLock.readLock().lock();
+            try {
+                ret = active;
+            } finally {
+                activeLock.readLock().unlock();
+            }
+
+            return ret;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+
+            if (o instanceof ConsumerHandler) {
+                ConsumerHandler h = (ConsumerHandler) o;
+                return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
+            }
+
+            return false;
+        }
+
+        public void submitMessage(LightweightMsg msg) {
+            queue.add(msg);
+        }
+
+        public void run() {
+
+            if (log.isDebugEnabled())
+                log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+
+            LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+
+            while (active) {
+
+                int drainedMsgs = 0;
+                try {
+                    activeLock.writeLock().lock();
+
+                    drainedMsgs = queue.drainTo(localList);
+
+                    if (drainedMsgs <= 0) {
+                        numberOfUnsuccessfullDrainAttempts++;
+                    } else {
+                        numberOfUnsuccessfullDrainAttempts = 0;
+                    }
+
+                    if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
+
+                        log.debug(String.format("inactivating, %d", getId()));
+
+                        active = false;
+                        numberOfUnsuccessfullDrainAttempts = 0;
+                    }
+
+                } finally {
+                    activeLock.writeLock().unlock();
+                }
+
+                send(localList);
+                localList.clear();
+
+                if (numberOfUnsuccessfullDrainAttempts > 0) {
+                    waitForMessages();
+                }
+
+            }
+
+            if (log.isDebugEnabled())
+                log.debug(String.format("calling on completion from : %d,", getId()));
+
+            callback.onCompletion(this);
+
+        }
+
+        private void send(LinkedList<LightweightMsg> list) {
+
+            while (!list.isEmpty()) {
+
+                LightweightMsg m = list.removeFirst();
+
+                try {
+                    OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+
+                    sender.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+
+                } catch (Exception e) {
+                    log.fatal(e);
+                }
+
+            }
+
+        }
+
+        private void waitForMessages() {
+            try {
+
+                TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+                log.debug("finished - waiting for messages");
+            } catch (InterruptedException e) {
+                log.error("interrupted while waiting for messages", e);
+            }
+        }
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.apache.log4j.Logger;
+
+public class SerialSender extends Thread implements SendingStrategy {
+
+    Logger log = Logger.getLogger(SerialSender.class);
+
+    private boolean stopFlag = false;
+
+    SenderUtils sender;
+
+    public SerialSender(ConfigurationManager config, ConsumerUrlManager urlman) {
+        sender = new SenderUtils(urlman, config);
+    }
+
+    public void shutdown() {
+        stopFlag = true;
+        log.info("delivery thread termination notificaiton recieved");
+    }
+
+    public void run() {
+
+        log.debug("run - delivery thread");
+
+        while (!stopFlag) {
+
+            try {
+
+                OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+                if (WSMGParameter.showTrackId)
+                    log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+                            + ": dequeued from outgoing queue");
+
+                sendNotification(outGoingMessage);
+
+            } catch (Exception e) {
+
+                log.fatal("Unexpected_exception:", e);
+            }
+        }
+    }
+
+    public synchronized void sendNotification(OutGoingMessage outGoingMessage) {
+
+        if (outGoingMessage == null) {
+            log.error("got a null outgoing message");
+            return;
+        }
+        String messageString = outGoingMessage.getTextMessage();
+
+        List<ConsumerInfo> consumerInfoList = outGoingMessage.getConsumerInfoList();
+        AdditionalMessageContent soapHeader = outGoingMessage.getAdditionalMessageContent();
+        deliverMessage(consumerInfoList, messageString, soapHeader);
+    }
+
+    private void deliverMessage(List<ConsumerInfo> consumerInfoList, String messageString,
+            AdditionalMessageContent additionalMessageContent) {
+
+        try {
+            OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(messageString));
+
+            for (ConsumerInfo obj : consumerInfoList) {
+
+                sender.send(obj, messgae2Send, additionalMessageContent);
+
+            }
+
+        } catch (XMLStreamException e) {
+            log.fatal(e);
+        }
+
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+interface TaskCompletionCallback {
+    void onCompletion(ThreadCrew.WSMGRunnable r);
+}
+
+interface RunnableEx {
+    public void run();
+
+}
+
+public class ThreadCrew implements TaskCompletionCallback {
+
+    int nextIndex = 0;
+
+    Logger log = Logger.getLogger(ThreadCrew.class);
+
+    List<WSMGRunnable> wsmgRunnables = new ArrayList<WSMGRunnable>();
+
+    ExecutorService service;
+
+    public void onCompletion(WSMGRunnable r) {
+
+        synchronized (wsmgRunnables) {
+            log.error("error occued ...");
+            List<RunnableEx> jobList = r.getJobList();
+            wsmgRunnables.remove(r);
+            WSMGRunnable newR = new WSMGRunnable(this, jobList);
+            wsmgRunnables.add(newR);
+            service.submit(newR);
+        }
+
+    }
+
+    public ThreadCrew(int poolSize) {
+
+        log.debug("pool size=" + poolSize);
+
+        service = Executors.newFixedThreadPool(poolSize);
+
+        for (int i = 0; i < poolSize; i++) {
+
+            WSMGRunnable r = new WSMGRunnable(this);
+            wsmgRunnables.add(r);
+            service.submit(r);
+        }
+
+    }
+
+    public void submitTask(RunnableEx task) {
+
+        WSMGRunnable r = null;
+
+        synchronized (wsmgRunnables) {
+
+            nextIndex++;
+            if (nextIndex >= wsmgRunnables.size())
+                nextIndex = 0;
+
+            r = wsmgRunnables.get(nextIndex);
+
+        }
+
+        r.submit(task);
+    }
+
+    public void stop() {
+
+        for (WSMGRunnable r : wsmgRunnables) {
+            r.shutdown();
+        }
+
+        service.shutdown();
+    }
+
+    class WSMGRunnable implements Runnable {
+
+        List<RunnableEx> jobs = new ArrayList<RunnableEx>();
+        TaskCompletionCallback completionCallback;
+        boolean runFlag = true;
+
+        public WSMGRunnable(TaskCompletionCallback c) {
+
+            this(c, new ArrayList<RunnableEx>());
+            System.out.println("org.apache.airavata.wsmg created new");
+        }
+
+        public WSMGRunnable(TaskCompletionCallback c, List<RunnableEx> jl) {
+            completionCallback = c;
+            jobs = jl;
+        }
+
+        public void shutdown() {
+            runFlag = false;
+        }
+
+        public void run() {
+
+            try {
+
+                int i = 0;
+
+                while (runFlag) {
+
+                    if (jobs.isEmpty()) {
+                        waitForJobs();
+                        continue;
+                    }
+
+                    RunnableEx runnable = jobs.get(i);
+                    runnable.run();
+
+                    i++;
+                    if (i >= jobs.size()) {
+                        i = 0;
+                    }
+
+                }
+
+            } finally {
+
+                completionCallback.onCompletion(this);
+            }
+
+        }
+
+        public void submit(RunnableEx w) {
+
+            synchronized (jobs) {
+                jobs.add(w);
+            }
+
+        }
+
+        public List<RunnableEx> getJobList() {
+            return jobs;
+        }
+
+        final int SLEEP_TIME_SECONDS = 1;
+
+        private void waitForJobs() {
+            try {
+
+                TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+                if (log.isDebugEnabled())
+                    log.debug("finished - waiting for messages");
+            } catch (InterruptedException e) {
+                log.error("interrupted while waiting for messages", e);
+            }
+        }
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.processors;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+public class OutGoingMessage implements Serializable {
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = -6999667921413261492L;
+
+    String textMessage;
+
+    AdditionalMessageContent additionalMessageContent;
+
+    List<ConsumerInfo> consumerInfoList = null;
+
+    // ConsumerInfo consumerInfo=null;
+
+    /**
+	 * 
+	 */
+    public OutGoingMessage() {
+        // super();
+        // TODO Auto-generated constructor stub
+        // consumerInfo=new ConsumerInfo();
+    }
+
+    /**
+     * @param textMessage
+     * @param additionalMessageContent
+     * @param consumerInfoList
+     */
+    public OutGoingMessage(String textMessage, AdditionalMessageContent additionalMessageContent,
+            List<ConsumerInfo> consumerInfoList) {
+        super();
+        // TODO Auto-generated constructor stub
+        this.textMessage = textMessage;
+        this.additionalMessageContent = additionalMessageContent;
+        this.consumerInfoList = consumerInfoList;
+    }
+
+    /**
+     * @param consumerInfo
+     *            The consumerInfo to set.
+     */
+    public void addConsumerInfo(ConsumerInfo consumerInfo) {
+        // this.consumerInfo = consumerInfo;
+        if (consumerInfoList == null) {
+            consumerInfoList = new ArrayList<ConsumerInfo>();
+        }
+        consumerInfoList.add(consumerInfo);
+    }
+
+    /**
+     * @return Returns the textMessage.
+     */
+    public String getTextMessage() {
+        return textMessage;
+    }
+
+    /**
+     * @param textMessage
+     *            The textMessage to set.
+     */
+    public void setTextMessage(String textMessage) {
+        this.textMessage = textMessage;
+    }
+
+    /**
+     * @return Returns the consumerInfoList.
+     */
+    public List<ConsumerInfo> getConsumerInfoList() {
+        return consumerInfoList;
+    }
+
+    /**
+     * @param consumerInfoList
+     *            The consumerInfoList to set.
+     */
+    public void setConsumerInfoList(List<ConsumerInfo> consumerInfoList) {
+        this.consumerInfoList = consumerInfoList;
+    }
+
+    /**
+     * @return Returns the soapHeader.
+     */
+    public AdditionalMessageContent getAdditionalMessageContent() {
+        return additionalMessageContent;
+    }
+
+    /**
+     * @param soapHeader
+     *            The soapHeader to set.
+     */
+    public void setAdditionalMessageContent(AdditionalMessageContent soapHeader) {
+        this.additionalMessageContent = soapHeader;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.processors;
+
+import java.net.URI;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgNameSpaceConstants;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.log4j.Logger;
+
+public class PublisherRegistrationManager {
+    int counter = 1;
+    OMFactory factory = OMAbstractFactory.getOMFactory();
+    URI serviceLocation = null;
+    static org.apache.log4j.Logger logger = Logger.getLogger(PublisherRegistrationManager.class);
+
+    public PublisherRegistrationManager(URI serviceLocation) {
+        this.serviceLocation = serviceLocation;
+    }
+
+    public void registerPublisher(ProcessingContext ctx) {
+
+        OMElement eprEl = ctx.getSoapBody().getFirstChildWithName(
+                new QName(WsmgNameSpaceConstants.WSBR_NS.getNamespaceURI(), "PublisherReference"));
+        new EndpointReference(eprEl.getText());
+        // SubscriptionState state = new SubscriptionState(publisherRef);
+        // String key = "sub"+(counter++)+"@"+PREFIX;
+        // subscriptions.put(key, state);
+        EndpointReference publisherRegistrationRef = new EndpointReference(serviceLocation.toASCIIString());
+        publisherRegistrationRef.addReferenceParameter(new QName(WsmgNameSpaceConstants.WIDGET_NS.getNamespaceURI(),
+                WsmgNameSpaceConstants.RESOURCE_ID), "pub" + (counter++));
+
+        OMElement publisherRegistrationReferenceOMElement;
+        try {
+            publisherRegistrationReferenceOMElement = EndpointReferenceHelper.toOM(factory, publisherRegistrationRef,
+                    new QName(WsmgNameSpaceConstants.WSBR_NS.getNamespaceURI(), "SubscriptionManager"),
+                    WsmgNameSpaceConstants.WSA_NS.getNamespaceURI());
+
+            ctx.getRespMessage().addChild(publisherRegistrationReferenceOMElement);
+        } catch (AxisFault e) {
+            // TODO add with throws clause
+            logger.fatal("axis fault found at publisher registrationgmanager", e);
+            e.printStackTrace();
+        }
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.processors;
+
+import java.io.Serializable;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+
+public class ReceivedMessage implements Serializable {
+
+    private static final long serialVersionUID = 7908767667077753895L;
+
+    String topicLocalString = null;
+
+    // String soapHeader = null;
+    AdditionalMessageContent soapHeader;
+
+    String notificationMessage = null;
+
+    String wsntMessageConverterClassName;
+
+    /**
+     * @param topicLocalString
+     * @param producerReference
+     * @param notificationMessageEl
+     * @param wsntMessageConverterClassName
+     */
+    public ReceivedMessage(String topicLocalString, AdditionalMessageContent soapHeader, String notificationMessageEl,
+            String wsntMessageConverterClassName) {
+        super();
+        // TODO Auto-generated constructor stub
+        this.topicLocalString = topicLocalString;
+        this.soapHeader = soapHeader;
+        this.notificationMessage = notificationMessageEl;
+        this.wsntMessageConverterClassName = wsntMessageConverterClassName;
+    }
+
+    /**
+     * @return Returns the soapHeader.
+     */
+    public AdditionalMessageContent getSoapHeader() {
+        return soapHeader;
+    }
+
+    /**
+     * @param soapHeader
+     *            The soapHeader to set.
+     */
+    public void setSoapHeader(AdditionalMessageContent soapHeader) {
+        this.soapHeader = soapHeader;
+    }
+
+    /**
+     * @return Returns the notificationMessage.
+     */
+    public String getNotificationMessage() {
+        return notificationMessage;
+    }
+
+    /**
+     * @param notificationMessage
+     *            The notificationMessage to set.
+     */
+    public void setNotificationMessage(String notificationMessage) {
+        this.notificationMessage = notificationMessage;
+    }
+
+    /**
+     * @return Returns the topicLocalString.
+     */
+    public String getTopicLocalString() {
+        return topicLocalString;
+    }
+
+    /**
+     * @param topicLocalString
+     *            The topicLocalString to set.
+     */
+    public void setTopicLocalString(String topicLocalString) {
+        this.topicLocalString = topicLocalString;
+    }
+
+    /**
+     * @return Returns the wsntMessageConverterClassName.
+     */
+    public String getWsntMessageConverterClassName() {
+        return wsntMessageConverterClassName;
+    }
+
+    /**
+     * @param wsntMessageConverterClassName
+     *            The wsntMessageConverterClassName to set.
+     */
+    public void setWsntMessageConverterClassName(String wsntMessageConverterClassName) {
+        this.wsntMessageConverterClassName = wsntMessageConverterClassName;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.transports.jms;
+
+public interface MessageMatcherConnection {
+    public int stop();
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+//Used for stress test. use together with TimerThread
+public class Counter {
+
+    private AtomicLong counter = new AtomicLong(0);
+
+    private AtomicReference<String> otherStringValue = new AtomicReference<String>();
+
+    public void addCounter() {
+        counter.getAndIncrement();
+
+    }
+
+    public synchronized void addCounter(String otherValue) {
+        counter.getAndIncrement();
+        otherStringValue.set(otherValue);
+    }
+
+    /**
+     * @return Returns the counterValue.
+     */
+    public long getCounterValue() {
+
+        return counter.get();
+    }
+
+    /**
+     * @param counterValue
+     *            The counterValue to set.
+     */
+    public void setCounterValue(long counterValue) {
+        counter.set(counterValue);
+
+    }
+
+    /**
+     * @return Returns the otherValueString.
+     */
+    public String getOtherValueString() {
+
+        return otherStringValue.get();
+    }
+
+    /**
+     * @param otherValueString
+     *            The otherValueString to set.
+     */
+    public void setOtherValueString(String otherValueString) {
+        otherStringValue.set(otherValueString);
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.TimeZone;
+
+import org.apache.axiom.om.OMElement;
+
+public class CurrentDate implements Cloneable {
+    private OMElement parent;
+
+    public CurrentDate(OMElement parent) {
+        this.parent = parent;
+
+    }
+
+    public Object clone() throws CloneNotSupportedException {
+
+        OMElement element = parent.cloneOMElement();
+
+        return new CurrentDate(element);
+
+    }
+
+    public String getText() {
+        // active entity: value computed on demand
+        // TODO use static method that would format System.currentTimeMillis +
+        // TZ
+        DcDate now = new DcDate(TimeZone.getDefault());
+        return now.toString();
+    }
+
+    public Boolean isWhitespaceContent() {
+        return null;
+    }
+
+    public OMElement getParent() {
+        return parent;
+    }
+
+    public void setParent(OMElement currentDateParent) {
+        this.parent = currentDateParent;
+    }
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,472 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TimeZone;
+
+//TODO: support for fractional seconds
+
+/**
+ * Represent dc:date as an immutable value object. http://www.w3.org/TR/NOTE-datetime
+ * http://www.sics.se/~preben/DC/date_help.html
+ * 
+ */
+public final class DcDate implements Cloneable {
+
+    public Object clone() throws CloneNotSupportedException {
+        // it is easy as object is immutable
+        DcDate dd = (DcDate) super.clone();
+        return dd;
+    }
+
+    private String canonical;
+
+    // private Date date;
+    private int year;
+
+    private int month; // 1-12
+
+    private int day; // 1-31
+
+    private int hh = -1; // 00-23
+
+    private int mm = -1; // 00-59
+
+    private int ss = -1; // 00-59
+
+    private String decimalFraction = null; // decimal fraction
+
+    private int zoneOffset; // in minutes +/-(12*60) ???
+
+    private Calendar instant;
+
+    private int millisOffset = -1; // milliseconds
+
+    // public DcDate() throws SException {
+    //
+    // this(System.currentTimeMillis());
+    // }
+    //
+    // public DcDate(long timeMillis) throws SException {
+    // this(timeMillis, null);
+    // Calendar local = new GregorianCalendar();
+    // local.setTimeInMillis(timeMillis);
+    // }
+
+    // public DcDate(String tzName) throws SException {
+    // TimeZone tz = tzName != null ? TimeZone.getTimeZone(tzName) :
+    // TimeZone.getDefault();
+    // TimeZone.getTimeZone("GMT")
+    public DcDate(TimeZone tz) throws RuntimeException {
+
+        // based on http://javaalmanac.com/egs/java.util/GetAllZones.html?l=rel
+
+        Calendar cal = new GregorianCalendar(tz);
+        init(cal, tz);
+    }
+
+    public DcDate(Calendar cal) throws RuntimeException {
+        init(cal, cal.getTimeZone());
+    }
+
+    private void init(Calendar cal, TimeZone tz) throws RuntimeException {
+        // Get the number of hours from GMT
+        int rawOffset = tz.getRawOffset();
+        zoneOffset = rawOffset / (60 * 1000);
+
+        instant = cal;
+        // http://javaalmanac.com/egs/java.util/GetCurDate.html
+        year = cal.get(Calendar.YEAR);
+        month = cal.get(Calendar.MONTH) + 1; // 0=Jan, 1=Feb, ...
+        day = cal.get(Calendar.DAY_OF_MONTH);
+        hh = cal.get(Calendar.HOUR_OF_DAY); // 0..23
+        mm = cal.get(Calendar.MINUTE); // 0..59
+        ss = cal.get(Calendar.SECOND); // 0..59
+        // int ms = cal.get(Calendar.MILLISECOND); // 0..999
+        // date = cal.getTime();
+        canonical = computeCanonical();
+    }
+
+    public DcDate(String dcDate) throws RuntimeException {
+        // try {
+        // synchronized(sdf) { //TODO REVISIT: SimpleFormat is not multi-thread
+        // safe!
+        // d = sdf.parse(dcDate);
+        // }
+        // } catch (ParseException e) {
+        // throw new SException("could not parse dc:date "+dcDate+getCtx(pp),
+        // e);
+        // }
+        // 2003-05-06T23:07:04Z
+        // 2003-08-09T18:36:00-05:00
+        // 1234567890123456789012345
+        // 2004-02-02T19:09:46-05:00
+        // 1997
+        // 1997-07
+        // 1997-07-16
+        // 1997-07-16T19:20+01:00
+        // 1997-07-16T19:20:30+01:00
+        // 1997-07-16T19:20:30.45+01:00
+        assert dcDate != null;
+        // assert pp != null;
+        canonical = dcDate;
+        year = getInt(dcDate, 1, 4); // add min, max check for all getInt
+        if (dcDate.length() == 4) {
+            return;
+        }
+        check(dcDate, 5, '-');
+        month = getInt(dcDate, 6, 2);
+        if (dcDate.length() == 7) {
+            return;
+        }
+        check(dcDate, 8, '-');
+        day = getInt(dcDate, 9, 2);
+        if (dcDate.length() == 10) {
+            return;
+        }
+        check(dcDate, 11, 'T');
+        hh = getInt(dcDate, 12, 2);
+        check(dcDate, 14, ':');
+        mm = getInt(dcDate, 15, 2);
+        if (dcDate.length() == 16) {
+            throw new RuntimeException("expected date formatted as YYYY-MM-DDThh:mm[:ss[.mmm]]TZD and not " + dcDate);
+        }
+        int pos = 17;
+        char c17 = dcDate.charAt(pos - 1);
+        if (c17 == ':') {
+            check(dcDate, 17, ':');
+            ss = getInt(dcDate, 18, 2);
+            pos = 20;
+        }
+        char zoneIndicator = dcDate.charAt(pos - 1);
+        if (zoneIndicator == '.') { // OK we have yet millliseocnds to parse
+            // (and ignore ...)
+            // Ex: 2004-04-13T11:53:15.4362784-04:00
+            // eat digits
+            char d;
+            int oldPos = pos;
+            do {
+                d = dcDate.charAt(pos);
+                ++pos;
+            } while (d >= '0' && d <= '9');
+            if (oldPos + 1 == pos) {
+                throw new RuntimeException("expected date formtted as YYYY-MM-DDThh:mm[:ss[.s]]TZD and not " + dcDate);
+
+            }
+            zoneIndicator = d;
+            int newPos = pos;
+            decimalFraction = dcDate.substring(oldPos, newPos - 1);
+            if (newPos - oldPos >= 3) {
+                newPos = oldPos + 3;
+            }
+            int len = newPos - (oldPos + 1);
+            int ii = getInt(dcDate, oldPos + 1, len);
+            if (len == 1) {
+                millisOffset = 100 * ii;
+            } else if (len == 2) {
+                millisOffset = 10 * ii;
+            } else if (len == 3) {
+                millisOffset = ii;
+            }
+        }
+        if (zoneIndicator == 'Z') {
+            // done
+        } else if (zoneIndicator == '-' || zoneIndicator == '+') {
+            int zoneHH = getInt(dcDate, pos + 1, 2);
+            check(dcDate, pos + 3, ':');
+            int zoneMM = getInt(dcDate, pos + 4, 2);
+            zoneOffset = 60 * zoneHH + zoneMM;
+            if (zoneIndicator == '-') {
+                zoneOffset *= -1;
+            }
+        } else {
+            throw new RuntimeException("unknown zone indicator " + zoneIndicator + " in " + dcDate);
+        }
+        // Get the number of hours from GMT
+
+        // TimeZone tz = TimeZone.
+        instant = new GregorianCalendar();
+        int rawOffset = zoneOffset * 60 * 1000;
+        instant.set(Calendar.ZONE_OFFSET, rawOffset);
+        instant.set(Calendar.YEAR, year);
+        instant.set(Calendar.MONTH, month - 1); // 0=Jan, 1=Feb, ...
+        instant.set(Calendar.DAY_OF_MONTH, day);
+        instant.set(Calendar.HOUR_OF_DAY, hh); // 0..23
+        instant.set(Calendar.MINUTE, mm); // 0..59
+        instant.set(Calendar.SECOND, ss); // 0..59
+        instant.set(Calendar.MILLISECOND, millisOffset); // /0..999 ?
+        instant.getTimeInMillis(); // full time in ms -- test?
+    }
+
+    public long getTimeInMillis() {
+        return instant.getTimeInMillis();
+    }
+
+    public int getYear() {
+        return year;
+    }
+
+    public int getMonth() {
+        return month;
+    }
+
+    public int getDay() {
+        return day;
+    }
+
+    public int getHour() {
+        return hh;
+    }
+
+    public int getMinute() {
+        return mm;
+    }
+
+    public int getSecond() {
+        return ss;
+    }
+
+    public int getTimeZoneOffset() {
+        return zoneOffset;
+    }
+
+    public String getDcDate() {
+        return canonical;
+    }
+
+    public String getDecimalFraction() {
+        return decimalFraction;
+    }
+
+    private String computeCanonical() {
+        // 2003-08-09T18:36:00-05:00
+        // 1234567890123456789012345
+        StringBuffer sb = new StringBuffer();
+        fill(sb, year, 4);
+        if (month > 0) {
+            sb.append('-');
+            fill(sb, month, 2);
+            if (day > 0) {
+                sb.append('-');
+                fill(sb, day, 2);
+                if (hh > -1) {
+                    sb.append('T');
+                    fill(sb, hh, 2);
+                    sb.append(':');
+                    fill(sb, mm, 2);
+                    if (ss > -1) {
+                        sb.append(':');
+                        fill(sb, ss, 2);
+                    }
+                    if (decimalFraction != null) {
+                        sb.append('.');
+                        sb.append(decimalFraction);
+                    }
+                    if (zoneOffset == 0) {
+                        sb.append('Z');
+                    } else {
+                        int off = zoneOffset;
+                        if (zoneOffset > 0) {
+                            sb.append('+');
+                        } else {
+                            sb.append('-');
+                            off *= -1;
+                        }
+                        int zoneHH = off / 60;
+                        int zoneMM = off % 60;
+                        fill(sb, zoneHH, 2);
+                        sb.append(':');
+                        fill(sb, zoneMM, 2);
+                    }
+                }
+            }
+        }
+        return sb.toString();
+    }
+
+    public String toString() {
+        return canonical;
+    }
+
+    public final static String LOCATION_PROPERTY = "http://xmlpull.org/v1/doc/properties.html#location";
+
+    public static String printable(char ch) {
+        return "'" + escape(ch) + "'";
+    }
+
+    public static String printable(String s) {
+        return "\"" + escape(s) + "\"";
+    }
+
+    public static String escape(char ch) {
+        if (ch == '\n') {
+            return "\\n";
+        } else if (ch == '\r') {
+            return "\\r";
+        } else if (ch == '\t') {
+            return "\\t";
+        } else if (ch == '\'') {
+            return "\\'";
+        }
+        if (ch > 127 || ch < 32) {
+            return "\\u" + Integer.toHexString((int) ch);
+        }
+        return "" + ch;
+    }
+
+    public static String escape(String s) {
+        if (s == null) {
+            return null;
+        }
+        final int sLen = s.length();
+        StringBuffer buf = new StringBuffer(sLen + 10);
+        for (int i = 0; i < sLen; ++i) {
+            buf.append(escape(s.charAt(i)));
+        }
+        s = buf.toString();
+        return s;
+    }
+
+    private static final int LOOKUP_10S[] = { 1, 10, 10 * 10, 10 * 100, 100 * 100 };
+
+    public static void fill(StringBuffer sb, int value, int fields) {
+        assert fields > 0;
+        assert fields <= 4;
+        assert value >= 0;
+        int mm = LOOKUP_10S[fields];
+        assert value < mm;
+        // assert mm > 0
+        // TODO: optimize it ...
+        while (fields-- > 0) {
+            mm /= 10;
+            if (value >= mm) {
+                sb.append((value / mm) % 10);
+                // i /= 10;
+            } else {
+                sb.append('0');
+            }
+        }
+
+        // String s = sb.toString();
+        // assert s.toString().length == fields;
+        // return s;
+    }
+
+    public static void check(String dcDate, int pos, char ch) {
+        if (pos > dcDate.length()) {
+            throw new RuntimeException("expected " + printable(ch) + " at position " + pos + " but " + dcDate
+                    + " is too short");
+        }
+        char c = dcDate.charAt(pos - 1);
+        if (c != ch) {
+            throw new RuntimeException("expected " + printable(ch) + " but got " + printable(c) + " in " + dcDate);
+        }
+    }
+
+    public static int getInt(String dcDate, int pos, int len) throws RuntimeException {
+        assert len > 0;
+        int end = pos + len - 1;
+        String s = dcDate.substring(pos - 1, end);
+        try {
+            int i = Integer.parseInt(s);
+            return i;
+        } catch (NumberFormatException e) {
+            throw new RuntimeException("expected number for " + printable(s) + " in " + dcDate, e);
+
+        }
+    }
+
+    /**
+     * Take string and return string that has no spaces, only alpahnumerics, each word is capitalized (like WikiWords)
+     * sometimes called CamelCase?!.
+     */
+    public static String getWikiTitle(String title) {
+        StringBuffer sb = new StringBuffer();
+        List<String> words = breakIntoWords(title);
+        boolean start = true;
+        for (Iterator<String> it = words.iterator(); it.hasNext();) {
+            String word = it.next();
+            boolean wordStart = true;
+            for (int i = 0; i < word.length(); i++) {
+                char c = word.charAt(i);
+                if (Character.isLetterOrDigit(c)) {
+                    if (wordStart && !start) {
+                        sb.append(Character.toUpperCase(c));
+                    } else {
+                        sb.append(c);
+                    }
+                    wordStart = false;
+                }
+            }
+            start = false;
+        }
+        return sb.toString();
+    }
+
+    public static List<String> breakIntoWords(String s) {
+        List<String> words = new ArrayList<String>(s.length() / 5);
+        boolean inWord = true;
+        int wordStart = 0;
+        for (int pos = 0; pos < s.length(); ++pos) {
+            char ch = s.charAt(pos);
+            boolean isWordSeparator = Character.isWhitespace(ch);
+            if (ch == ',') {
+                isWordSeparator = true;
+            }
+            if (ch == '.') {
+                isWordSeparator = true;
+            }
+            if (isWordSeparator) {
+                if (inWord) {
+                    words.add(s.substring(wordStart, pos));
+                    inWord = false;
+                }
+            } else {
+                if (!inWord) {
+                    inWord = true;
+                    wordStart = pos;
+                }
+            }
+            assert inWord == !isWordSeparator;
+        }
+        if (inWord) {
+            words.add(s.substring(wordStart));
+        }
+        return words;
+    }
+
+    public static String makeTwoDigit(int oneOrTwoDigits) {
+        if (oneOrTwoDigits < 0 || oneOrTwoDigits > 99) {
+            throw new IllegalArgumentException();
+        }
+        if (oneOrTwoDigits < 10) {
+            return "0" + oneOrTwoDigits;
+        }
+        return "" + oneOrTwoDigits;
+    }
+
+}

Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java Sun Jul  3 15:51:36 2011
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.io.File;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.log4j.Logger;
+
+public class LoadConfigProperties {
+
+    private static Logger logger = Logger.getLogger(LoadConfigProperties.class);
+
+    public static LoadConfigProperties confProp = null;
+    private String axisRepo = null;
+    private ConfigurationContext configContext = null;
+
+    private LoadConfigProperties() {
+
+        String axis2RepoDir = System.getProperty("axis2.repository");
+
+        if (axis2RepoDir == null) {
+            String axis2Home = System.getenv("AXIS2_HOME");
+
+            if (axis2Home != null) {
+                axis2Home = axis2Home.trim();
+                axis2RepoDir = axis2Home.endsWith("/") ? axis2Home + "repository" : axis2Home + "/" + "repository";
+
+            }
+        }
+        File repoDir = null;
+        if (axis2RepoDir != null)
+            repoDir = new File(axis2RepoDir);
+
+        if (repoDir != null && repoDir.isDirectory()) {
+
+            try {
+                configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoDir
+                        .getAbsolutePath());
+
+                axisRepo = repoDir.getAbsolutePath();
+
+            } catch (AxisFault e) {
+                logger.error("unable to load the repository", e);
+            }
+
+        }
+
+    }
+
+    public static LoadConfigProperties getInstance() {
+        if (confProp == null)
+            confProp = new LoadConfigProperties();
+        return confProp;
+    }
+
+    public String getAxisRepo() {
+        return axisRepo;
+    }
+
+    public boolean hasAxisRepo() {
+
+        logger.info("repo is : " + axisRepo);
+
+        return axisRepo != null;
+    }
+
+    public ConfigurationContext getConfCtxFromFileSystem() {
+        return configContext;
+    }
+}