You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/24 01:37:02 UTC

svn commit: r1175072 - in /incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg: broker/ messenger/ messenger/protocol/ messenger/protocol/impl/ messenger/strategy/ messenger/strategy/impl/

Author: patanachai
Date: Fri Sep 23 23:37:01 2011
New Revision: 1175072

URL: http://svn.apache.org/viewvc?rev=1175072&view=rev
Log:
AIRAVATA-101 Change Thread Pool in FixedParallelSender to Standard Java Thread Pool.

Added:
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
      - copied, changed from r1174883, incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
Removed:
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java
Modified:
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java Fri Sep 23 23:37:01 2011
@@ -22,6 +22,7 @@
 package org.apache.airavata.wsmg.broker;
 
 import java.io.File;
+import java.lang.reflect.Constructor;
 
 import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
@@ -33,6 +34,11 @@ import org.apache.airavata.wsmg.commons.
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
 import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.DeliveryProcessor;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
 import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
 import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
 import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
@@ -47,12 +53,15 @@ import org.slf4j.LoggerFactory;
 public class BrokerServiceLifeCycle implements ServiceLifeCycle {
 
     private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
-    private SendingStrategy method = null;
+    
+    private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
+    
+    private DeliveryProcessor proc;
 
     public void shutDown(ConfigurationContext arg, AxisService service) {
         log.info("broker shutting down");
-        if (method != null) {
-            method.shutdown();
+        if (proc != null) {
+            proc.stop();
         }
     }
 
@@ -147,28 +156,57 @@ public class BrokerServiceLifeCycle impl
             }
             return;
         }
+        
+        /*
+         * Create Protocol
+         */
+        DeliveryProtocol protocol;
+        String protocolClass = configMan.getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL,
+                Axis2Protocol.class.getName());
+        try {
+            Class cl = Class.forName(protocolClass);
+            Constructor<DeliveryProtocol> co = cl.getConstructor(null);
+            protocol = co.newInstance((Object[]) null);
+
+        } catch (Exception e) {
+            log.error("Cannot initial protocol sender", e);
+            return;
+        }
+        protocol.setTimeout(configMan.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, DEFAULT_SOCKET_TIME_OUT));                      
 
+        /*
+         * Create delivery method 
+         */
+        SendingStrategy method = null;
+        String initedmethod = null;        
         String deliveryMethod = configMan.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD,
                 WsmgCommonConstants.DELIVERY_METHOD_SERIAL);
-
-        ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
-
-        String initedmethod = null;
-
         if (WsmgCommonConstants.DELIVERY_METHOD_PARALLEL.equalsIgnoreCase(deliveryMethod)) {
-
-            method = new ParallelSender(configMan, urlManager);
+            method = new ParallelSender();
             initedmethod = WsmgCommonConstants.DELIVERY_METHOD_PARALLEL;
-
+            
         } else if (WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW.equalsIgnoreCase(deliveryMethod)) {
-            method = new FixedParallelSender(configMan, urlManager);
+            int poolsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
+                    WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
+            int batchsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
+                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
+            method = new FixedParallelSender(poolsize, batchsize);
             initedmethod = WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW;
+            
         } else {
-            method = new SerialSender(configMan, urlManager);
+            method = new SerialSender();
             initedmethod = WsmgCommonConstants.DELIVERY_METHOD_SERIAL;
-        }
-
-        method.start();
+        }        
+        
+        /*
+         * Create Deliverable
+         */
+        ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
+        Deliverable senderUtils = new SenderUtils(urlManager);
+        senderUtils.setProtocol(protocol);
+        
+        DeliveryProcessor proc = new DeliveryProcessor(senderUtils, method);
+        proc.start();
         log.info(initedmethod + " sending method inited");
     }
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java Fri Sep 23 23:37:01 2011
@@ -37,6 +37,9 @@ import org.apache.axis2.addressing.Endpo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/*
+ * FIXME: need thread safe version
+ */
 public class ConsumerUrlManager {
     
     private static final Logger logger = LoggerFactory.getLogger(ConsumerUrlManager.class);

Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java?rev=1175072&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java Fri Sep 23 23:37:01 2011
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.axiom.om.OMElement;
+
+public interface Deliverable { // Sends a message to a consumer; the DeliveryProtocol is supplied separately.
+    void setProtocol(DeliveryProtocol protocol); // SenderUtils stores it; presumably send() delivers through it.
+    /** Sends message, together with additionalMessageContent, to the consumer described by consumerInfo. */
+    void send(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent);
+}

Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java?rev=1175072&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java Fri Sep 23 23:37:01 2011
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Moves messages from WSMGParameter.OUT_GOING_QUEUE to a {@link SendingStrategy} on a dedicated thread.
+ * <p>
+ * {@link #start()} launches the worker thread; {@link #stop()} only lowers a flag, so the worker sees the
+ * request after its current dequeue call returns (that call presumably blocks on an empty queue -- TODO confirm).
+ */
+public class DeliveryProcessor {
+
+    private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
+
+    private final SendingStrategy strategy;
+    private final Deliverable deliverable;
+
+    /*
+     * Written by the thread that calls start()/stop() and read by the worker thread. It is volatile so
+     * that the worker is guaranteed to observe stop(); a plain field gives no such visibility guarantee.
+     */
+    private volatile boolean running;
+    private Thread t;
+
+    /**
+     * @param deliverable sender passed to the strategy together with every dequeued message
+     * @param strategy    strategy that receives every dequeued message
+     */
+    public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
+        this.strategy = strategy;
+        this.deliverable = deliverable;
+    }
+
+    /** Marks the processor as running and starts the worker thread. */
+    public void start() {
+        this.running = true;
+        t = new Thread(new CheckingAndSending());
+        t.start();
+    }
+
+    /** Asks the worker loop to end; the worker shuts the strategy down once the loop has exited. */
+    public void stop() {
+        this.running = false;
+    }
+
+    /** Worker body: initialises the strategy, hands it each dequeued message, then shuts it down. */
+    private class CheckingAndSending implements Runnable {
+
+        public void run() {
+            strategy.init();
+            while (running) {
+                logger.debug("run - delivery thread");
+                try {
+
+                    OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+                    if (WSMGParameter.showTrackId) {
+                        logger.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+                                + ": dequeued from outgoing queue");
+                    }
+
+                    strategy.addMessageToSend(outGoingMessage, deliverable);
+
+                } catch (Exception e) {
+                    // Log and keep looping so one failed message does not end delivery of later ones.
+                    logger.error("Unexpected_exception:", e);
+                }
+            }
+            strategy.shutdown();
+        }
+    }
+}

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java Fri Sep 23 23:37:01 2011
@@ -22,18 +22,14 @@
 package org.apache.airavata.wsmg.messenger;
 
 import java.io.StringReader;
-import java.lang.reflect.Constructor;
 
 import javax.xml.stream.XMLStreamException;
 
 import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
 import org.apache.airavata.wsmg.commons.CommonRoutines;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
 import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.protocol.Axis2Protocol;
 import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
 import org.apache.airavata.wsmg.messenger.protocol.SendingException;
 import org.apache.axiom.om.OMAbstractFactory;
@@ -46,38 +42,22 @@ import org.slf4j.LoggerFactory;
 /*
  * this class is not thread safe
  * */
-
-public class SenderUtils {
+public class SenderUtils implements Deliverable {
 
     private static final Logger logger = LoggerFactory.getLogger(SenderUtils.class);
 
-    OMFactory factory = OMAbstractFactory.getOMFactory();
-
-    ConsumerUrlManager urlManager;
+    private static OMFactory factory = OMAbstractFactory.getOMFactory();
 
-    DeliveryProtocol protocol;
+    private ConsumerUrlManager urlManager;
+    
+    private DeliveryProtocol protocol;
 
-    public SenderUtils(ConsumerUrlManager urlMan, ConfigurationManager config) {
+    public SenderUtils(ConsumerUrlManager urlMan) {
         urlManager = urlMan;
-
-        /*
-         * Invoke factory and config
-         */
-        String protocolClass = config.getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL,
-                "org.apache.airavata.wsmg.messenger.protocol.Axis2Protocol");
-        try {
-            Class cl = Class.forName(protocolClass);
-            Constructor<DeliveryProtocol> co = cl.getConstructor(null);
-            protocol = co.newInstance((Object[]) null);
-
-        } catch (Exception e) {
-            // fallback to normal class
-            logger.error("Cannot initial protocol sender", e);
-            protocol = new Axis2Protocol();
-        }
-
-        protocol.setTimeout(Long.parseLong(config.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, "20000")));
-
+    }
+    
+    public void setProtocol(DeliveryProtocol protocol){
+        this.protocol = protocol;
     }
 
     public void send(ConsumerInfo consumerInfo, OMElement notificationMessageBodyEl,

Copied: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java (from r1174883, incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java)
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java?p2=incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java&p1=incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java&r1=1174883&r2=1175072&rev=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java Fri Sep 23 23:37:01 2011
@@ -19,7 +19,7 @@
  *
  */
 
-package org.apache.airavata.wsmg.messenger.protocol;
+package org.apache.airavata.wsmg.messenger.protocol.impl;
 
 import java.io.StringReader;
 import java.util.LinkedList;
@@ -31,6 +31,8 @@ import org.apache.airavata.wsmg.broker.A
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
 import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.SendingException;
 import org.apache.axiom.om.OMAbstractFactory;
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.util.ElementHelper;
@@ -40,6 +42,7 @@ import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.client.Options;
 import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.transport.http.HTTPConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,15 +50,11 @@ public class Axis2Protocol implements De
 
     private static final Logger logger = LoggerFactory.getLogger(Axis2Protocol.class);
 
-    SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
+    private static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
 
-    ServiceClient nonThreadLocalServiceClient = null;
+    private ServiceClient nonThreadLocalServiceClient;
 
-    long tcpConnectionTimeout = 0;
-
-    public void setTimeout(long timeout) {
-        this.tcpConnectionTimeout = timeout;
-    }
+    long tcpConnectionTimeout;
 
     public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
             throws SendingException {
@@ -82,36 +81,35 @@ public class Axis2Protocol implements De
             }
         }
 
+        ServiceClient client = null;
         try {
 
-            ServiceClient client = configureServiceClient(actionString, consumerReference,
-                    additionalMessageContent.getMessageID(), soapHeaders);
+            client = configureServiceClient(actionString, consumerReference, additionalMessageContent.getMessageID(),
+                    soapHeaders);
 
             client.sendRobust(message);
-            client.cleanupTransport();
 
         } catch (AxisFault ex) {
             throw new SendingException(ex.getCause());
+        } finally {
+            if (client != null) {
+                try {
+                    client.cleanupTransport();
+                } catch (AxisFault ex) {
+                    logger.error(ex.getMessage(), ex);
+                }
+            }
         }
     }
 
-    private ServiceClient getServiceClient() throws AxisFault {
-
-        ServiceClient ret = nonThreadLocalServiceClient;
-        if (ret == null) {
-            ret = new ServiceClient();
-
-            nonThreadLocalServiceClient = ret;
-        }
-        ret.removeHeaders();
-        return ret;
+    public void setTimeout(long timeout) {
+        this.tcpConnectionTimeout = timeout;
     }
 
     private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
             List<OMElement> soapHeaders) throws AxisFault {
 
         // not engaging addressing modules
-
         ServiceClient client = getServiceClient();
 
         SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
@@ -127,15 +125,12 @@ public class Axis2Protocol implements De
         client.addHeader(to);
 
         for (OMElement omHeader : soapHeaders) {
-
             try {
                 SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
-
                 client.addHeader(headerBlock);
             } catch (Exception e) {
                 throw AxisFault.makeFault(e);
             }
-
         }
 
         Options opts = new Options();
@@ -143,12 +138,18 @@ public class Axis2Protocol implements De
         opts.setMessageId(msgId);
         opts.setTo(consumerLocation);
         opts.setAction(action);
-        opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.CHUNKED, Boolean.FALSE);
-
-        opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.HTTP_PROTOCOL_VERSION,
-                org.apache.axis2.transport.http.HTTPConstants.HEADER_PROTOCOL_10);
+        opts.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);
+        opts.setProperty(HTTPConstants.HTTP_PROTOCOL_VERSION, HTTPConstants.HEADER_PROTOCOL_10);
         client.setOptions(opts);
 
         return client;
     }
+
+    private ServiceClient getServiceClient() throws AxisFault {
+        if (nonThreadLocalServiceClient == null) {
+            nonThreadLocalServiceClient = new ServiceClient();
+        }
+        nonThreadLocalServiceClient.removeHeaders();
+        return nonThreadLocalServiceClient;
+    }
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java Fri Sep 23 23:37:01 2011
@@ -21,8 +21,11 @@
 
 package org.apache.airavata.wsmg.messenger.strategy;
 
-public interface SendingStrategy {
-    public void start();
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
 
-    public void shutdown();
+public interface SendingStrategy { // Receives outgoing messages from DeliveryProcessor along with a Deliverable.
+    void init(); // DeliveryProcessor calls this once on its worker thread, before dequeuing any message.
+    void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable); // Called per dequeued message.
+    void shutdown(); // DeliveryProcessor calls this after its dequeue loop has exited.
 }

Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java?rev=1175072&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java Fri Sep 23 23:37:01 2011
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConsumerHandler implements Runnable {
+    
+    private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
+    
+    protected LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+    private final long id;
+
+    private String consumerUrl;
+
+    private Deliverable deliverable;
+
+    public ConsumerHandler(long handlerId, String url, Deliverable deliverable) {
+        id = handlerId;
+        consumerUrl = url;
+        this.deliverable = deliverable;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public String getConsumerUrl() {
+        return consumerUrl;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof ConsumerHandler) {
+            ConsumerHandler h = (ConsumerHandler) o;
+            return h.getId() == this.id && h.getConsumerUrl().equals(this.getConsumerUrl());
+        }
+        return false;
+    }
+
+    public void submitMessage(LightweightMsg msg) {
+        try {
+            queue.put(msg);
+        } catch (InterruptedException e) {
+            log.error("Interrupted when trying to add message");
+        }
+    }
+
+    protected void send(List<LightweightMsg> list) {
+        for (LightweightMsg m : list) {
+            try {
+                OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+                deliverable.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Fri Sep 23 23:37:01 2011
@@ -21,221 +21,112 @@
 
 package org.apache.airavata.wsmg.messenger.strategy.impl;
 
-import java.io.StringReader;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.Deliverable;
 import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.apache.axiom.om.OMElement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FixedParallelSender extends Thread implements SendingStrategy {
+public class FixedParallelSender implements SendingStrategy {
 
     private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
 
-    private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
-
-    private ThreadCrew threadCrew = null;
-
-    private ConsumerUrlManager urlManager = null;
-    private ConfigurationManager configManager = null;
+    private ConcurrentHashMap<String, FixedParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, FixedParallelConsumerHandler>();
 
     private long consumerHandlerIdCounter;
 
-    private boolean stopFlag = false;
-
-    public FixedParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
-
-        int poolSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
-                WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
+    private int batchSize;
 
-        threadCrew = new ThreadCrew(poolSize);
-        urlManager = urlMan;
-        configManager = config;
-    }
+    private ExecutorService threadPool;
 
-    public void shutdown() {
-        stopFlag = true;
+    /**
+     * @param poolsize  number of threads in the fixed-size pool that runs the consumer handlers
+     * @param batchsize maximum number of queued messages a handler drains in one run
+     */
+    public FixedParallelSender(int poolsize, int batchsize) {
+        this.batchSize = batchsize;
+        this.threadPool = Executors.newFixedThreadPool(poolsize);
     }
 
-    public void run() {
-        int dequeuedMessageCounter = 0;
-
-        while (!stopFlag) {
-
-            try {
+    public void init() {
 
-                if (log.isDebugEnabled())
-                    log.debug("before dequeue -  delivery thread");
-
-                OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
-                if (WSMGParameter.showTrackId)
-                    log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
-                            + ": dequeued from outgoing queue");
-
-                distributeOverConsumerQueues(outGoingMessage);
-
-            } catch (Exception e) {
-                log.error("Unexpected_exception:", e);
-            }
-
-            dequeuedMessageCounter++;
-        }
+    }
 
-        threadCrew.stop();
+    /**
+     * Queues the message for each consumer it lists by delegating to
+     * {@link #distributeOverConsumerQueues(OutGoingMessage, Deliverable)}.
+     *
+     * @param outMessage  message together with its list of consumers
+     * @param deliverable transport handed to any consumer handler created for this message
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        this.distributeOverConsumerQueues(outMessage, deliverable);
+    }
 
+    /**
+     * Requests shutdown of the thread pool that runs the consumer handlers.
+     */
+    public void shutdown() {
+        this.threadPool.shutdown();
     }
 
-    public void distributeOverConsumerQueues(OutGoingMessage message) {
+    public void distributeOverConsumerQueues(OutGoingMessage message, Deliverable deliverable) {
         List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
 
         for (ConsumerInfo consumer : consumerInfoList) {
-
-            sendToConsumerHandler(consumer, message);
-
+            sendToConsumerHandler(consumer, message, deliverable);
         }
-
     }
 
-    private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
+    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message,
+            Deliverable deliverable) {
 
         String consumerUrl = consumer.getConsumerEprStr();
 
         LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
                 message.getAdditionalMessageContent());
 
-        ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
-
-        if (handler == null) {
-            handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, configManager, urlManager);
-            activeConsumerHanders.put(consumerUrl, handler);
-            handler.submitMessage(lwm); // import to submit before execute.
-            threadCrew.submitTask(handler);
-            // (to remove a possible race
-            // condition)
-        } else {
-            handler.submitMessage(lwm);
-        }
-
-        return handler;
-
-    }
-
-    private long getNextConsumerHandlerId() {
-        return ++consumerHandlerIdCounter;
-    }
-
-    class ConsumerHandler implements RunnableEx {
-
-        LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
-
-        final long id;
-        int batchSize;
-
-        ThreadLocal<SenderUtils> threadlocalSender = new ThreadLocal<SenderUtils>();
-
-        // SenderUtils sender = null;
-        String consumerUrl;
-
-        ConfigurationManager configMan;
-        ConsumerUrlManager consumerURLManager;
-
-        public ConsumerHandler(long handlerId, String url, ConfigurationManager config, ConsumerUrlManager urlMan) {
-
-            configMan = config;
-            consumerURLManager = urlMan;
-            // sender = new SenderUtils(urlMan, config, true);
-            id = handlerId;
-            consumerUrl = url;
-
-            batchSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
-                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
-        }
-
-        public long getId() {
-            return id;
-        }
-
-        public String getConsumerUrl() {
-            return consumerUrl;
-        }
-
-        private SenderUtils getSender() {
-
-            SenderUtils s = threadlocalSender.get();
-
-            if (s == null) {
-                s = new SenderUtils(consumerURLManager, configMan);
-                threadlocalSender.set(s);
-            }
-
-            return s;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-
-            if (o instanceof ConsumerHandler) {
-                ConsumerHandler h = (ConsumerHandler) o;
-                return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
+        synchronized (activeConsumerHanders) {
+            FixedParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+            if (handler == null) {
+                handler = new FixedParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
+                activeConsumerHanders.put(consumerUrl, handler);
+                handler.submitMessage(lwm);
+                threadPool.submit(handler);
+            } else {
+                handler.submitMessage(lwm);
             }
+        }        
+    }
 
-            return false;
+    /**
+     * Unregisters the handler, but only if it is still the one mapped to its consumer URL.
+     * When nothing was removed, a debug message is logged; the message is formatted only when
+     * debug logging is enabled, so a disabled logger costs no String.format call.
+     *
+     * @param h handler to remove from the active-handler map
+     */
+    public void removeFromList(ConsumerHandler h) {
+        boolean removed = activeConsumerHanders.remove(h.getConsumerUrl(), h);
+        if (!removed && log.isDebugEnabled()) {
+            log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
+                    h.getConsumerUrl()));
         }
+    }
+    
+    class FixedParallelConsumerHandler extends ConsumerHandler {
 
-        public void submitMessage(LightweightMsg msg) {
-            queue.add(msg);
+        public FixedParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
+            super(handlerId, url, deliverable);
         }
 
         public void run() {
 
-            if (log.isDebugEnabled())
-                log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+            log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
 
-            LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+            ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
 
             queue.drainTo(localList, batchSize);
 
             send(localList);
             localList.clear();
 
-            if (log.isDebugEnabled())
-                log.debug(String.format("calling on completion from : %d,", getId()));
-
-        }
-
-        private void send(LinkedList<LightweightMsg> list) {
-
-            SenderUtils s = getSender();
-
-            while (!list.isEmpty()) {
-
-                LightweightMsg m = list.removeFirst();
-
-                try {
-                    OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
-
-                    s.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
-
-                } catch (Exception e) {
-                    log.error(e.getMessage(), e);
+            log.debug(String.format("calling on completion from : %d,", getId()));
+
+            /*
+             * Only one batch (at most batchSize messages) is drained per run. If messages are still
+             * queued - more than batchSize were waiting, or new ones arrived while sending - schedule
+             * this handler again; otherwise it would stay registered with nothing running it and its
+             * queue would never be drained. The handler is removed only when its queue is empty.
+             * The lock is the one sendToConsumerHandler holds, so the emptiness check and the
+             * removal/resubmission cannot interleave with a new submission.
+             */
+            synchronized (activeConsumerHanders) {
+                if (queue.isEmpty()) {
+                    removeFromList(this);
+                } else if (!threadPool.isShutdown()) {
+                    // After shutdown() the pool rejects new tasks, so do not resubmit.
+                    threadPool.submit(this);
                 }
-
             }
-
         }
     }
-
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Fri Sep 23 23:37:01 2011
@@ -21,211 +21,109 @@
 
 package org.apache.airavata.wsmg.messenger.strategy.impl;
 
-import java.io.StringReader;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.Deliverable;
 import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.apache.axiom.om.OMElement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ParallelSender extends Thread implements SendingStrategy {
+/**
+ * Each subscriber (URL Endpoint) will have its own thread to send a message to
+ * 
+ */
+public class ParallelSender implements SendingStrategy {
 
     private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);
 
-    private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
+    private ConcurrentHashMap<String, ParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ParallelConsumerHandler>();
 
-    private final ExecutorService threadPool;
-    private long consumerHandlerIdCounter = 0L;
-    private boolean stopFlag = false;
-
-    private ConsumerUrlManager urlManager = null;
-    private ConfigurationManager configManager = null;
-
-    private ConsumerHandlerCompletionCallback consumerCallback = new ConsumerHandlerCompletionCallback() {
-
-        public void onCompletion(ConsumerHandler h) {
-
-            if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
-
-                if (log.isDebugEnabled())
-                    log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s",
-                            h.getId(), h.getConsumerUrl()));
-            }
-
-        }
-    };
-
-    public ParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
-        urlManager = urlMan;
-        configManager = config;
-
-        threadPool = Executors.newCachedThreadPool();
+    private ExecutorService threadPool;
+    private long consumerHandlerIdCounter;
 
+    /**
+     * Creates the cached thread pool that runs the consumer handlers.
+     */
+    public void init() {
+        threadPool = Executors.newCachedThreadPool();
     }
 
-    public void shutdown() {
-        stopFlag = true;
+    /**
+     * Queues the message for each consumer it lists by delegating to
+     * {@link #distributeOverConsumerQueues(OutGoingMessage, Deliverable)}.
+     *
+     * @param outMessage  message together with its list of consumers
+     * @param deliverable transport handed to any consumer handler created for this message
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        this.distributeOverConsumerQueues(outMessage, deliverable);
     }
 
-    public void run() {
-        int dequeuedMessageCounter = 0;
-
-        while (!stopFlag) {
-
-            try {
-
-                if (log.isDebugEnabled())
-                    log.debug("before dequeue -  delivery thread");
-
-                OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
-                if (WSMGParameter.showTrackId)
-                    log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
-                            + ": dequeued from outgoing queue");
-
-                distributeOverConsumerQueues(outGoingMessage);
-
-            } catch (Exception e) {
-                log.error("Unexpected_exception:", e);
-            }
-
-            dequeuedMessageCounter++;
-        }
-
+    public void shutdown() {
         threadPool.shutdown();
-
     }
 
-    public void distributeOverConsumerQueues(OutGoingMessage message) {
+    public void distributeOverConsumerQueues(OutGoingMessage message, Deliverable deliverable) {
         List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
-
         for (ConsumerInfo consumer : consumerInfoList) {
-
-            sendToConsumerHandler(consumer, message);
-
+            sendToConsumerHandler(consumer, message, deliverable);
         }
-
     }
 
-    private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
-
+    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
         String consumerUrl = consumer.getConsumerEprStr();
 
         LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
                 message.getAdditionalMessageContent());
 
-        ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
-
-        if (handler == null || (!handler.isActive())) {
-            handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, consumerCallback, configManager,
-                    urlManager);
+        ParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+        if (handler == null || !handler.isActive()) {
+            handler = new ParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
             activeConsumerHanders.put(consumerUrl, handler);
-            handler.submitMessage(lwm); // import to submit before execute.
-            // (to remove a possible race
-            // condition)
-            threadPool.execute(handler);
+            handler.submitMessage(lwm);
+            threadPool.submit(handler);
         } else {
             handler.submitMessage(lwm);
         }
-
-        return handler;
-    }
-
-    private long getNextConsumerHandlerId() {
-        return ++consumerHandlerIdCounter;
     }
 
-    interface ConsumerHandlerCompletionCallback {
-
-        public void onCompletion(ConsumerHandler h);
-
+    /**
+     * Unregisters the handler, but only if it is still the one mapped to its consumer URL.
+     * When nothing was removed, a debug message is logged; the message is formatted only when
+     * debug logging is enabled, so a disabled logger costs no String.format call.
+     *
+     * @param h handler to remove from the active-handler map
+     */
+    public void removeFromList(ConsumerHandler h) {
+        boolean removed = activeConsumerHanders.remove(h.getConsumerUrl(), h);
+        if (!removed && log.isDebugEnabled()) {
+            log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
+                    h.getConsumerUrl()));
+        }
     }
 
-    class ConsumerHandler implements Runnable {
-
-        LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+    class ParallelConsumerHandler extends ConsumerHandler {
 
-        ReadWriteLock activeLock = new ReentrantReadWriteLock();
+        private ReadWriteLock activeLock = new ReentrantReadWriteLock();
 
-        final long id;
-        final int MAX_UNSUCCESSFULL_DRAINS = 3;
-        final int SLEEP_TIME_SECONDS = 1;
-        int numberOfUnsuccessfullDrainAttempts = 0;
+        private static final int MAX_UNSUCCESSFULL_DRAINS = 3;
+        private static final int SLEEP_TIME_SECONDS = 1;
+        private int numberOfUnsuccessfullDrainAttempts = 0;
 
-        boolean active = true;
+        private boolean active;
 
-        ConsumerHandlerCompletionCallback callback = null;
-        SenderUtils sender = null;
-        String consumerUrl;
-
-        public ConsumerHandler(long handlerId, String url, ConsumerHandlerCompletionCallback c,
-                ConfigurationManager config, ConsumerUrlManager urlMan) {
-            callback = c;
-            sender = new SenderUtils(urlMan, config);
-            id = handlerId;
-            consumerUrl = url;
-        }
-
-        public long getId() {
-            return id;
-        }
-
-        public String getConsumerUrl() {
-            return consumerUrl;
+        public ParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
+            super(handlerId, url, deliverable);
         }
 
         public boolean isActive() {
-
             boolean ret = false;
-
             activeLock.readLock().lock();
             try {
                 ret = active;
             } finally {
                 activeLock.readLock().unlock();
             }
-
             return ret;
         }
 
-        @Override
-        public boolean equals(Object o) {
-
-            if (o instanceof ConsumerHandler) {
-                ConsumerHandler h = (ConsumerHandler) o;
-                return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
-            }
-
-            return false;
-        }
-
-        public void submitMessage(LightweightMsg msg) {
-            queue.add(msg);
-        }
-
         public void run() {
+            this.active = true;
 
-            if (log.isDebugEnabled())
-                log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
-
-            LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+            log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
 
+            ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
             while (active) {
 
                 int drainedMsgs = 0;
@@ -241,11 +139,10 @@ public class ParallelSender extends Thre
                     }
 
                     if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
-
                         log.debug(String.format("inactivating, %d", getId()));
-
                         active = false;
                         numberOfUnsuccessfullDrainAttempts = 0;
+                        break;
                     }
 
                 } finally {
@@ -258,38 +155,16 @@ public class ParallelSender extends Thre
                 if (numberOfUnsuccessfullDrainAttempts > 0) {
                     waitForMessages();
                 }
-
             }
 
-            if (log.isDebugEnabled())
-                log.debug(String.format("calling on completion from : %d,", getId()));
+            log.debug(String.format("calling on completion from : %d,", getId()));
 
-            callback.onCompletion(this);
-
-        }
-
-        private void send(LinkedList<LightweightMsg> list) {
-
-            while (!list.isEmpty()) {
-
-                LightweightMsg m = list.removeFirst();
-
-                try {
-                    OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
-
-                    sender.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
-
-                } catch (Exception e) {
-                    log.error(e.getMessage(), e);
-                }
-
-            }
+            removeFromList(this);
 
         }
 
         private void waitForMessages() {
             try {
-
                 TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
                 log.debug("finished - waiting for messages");
             } catch (InterruptedException e) {

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java Fri Sep 23 23:37:01 2011
@@ -30,82 +30,46 @@ import org.apache.airavata.wsmg.broker.A
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
 import org.apache.airavata.wsmg.commons.CommonRoutines;
 import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.Deliverable;
 import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
 import org.apache.axiom.om.OMElement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SerialSender extends Thread implements SendingStrategy {
+public class SerialSender implements SendingStrategy {
 
     private static final Logger log = LoggerFactory.getLogger(SerialSender.class);
-
-    private boolean stopFlag = false;
-
-    SenderUtils sender;
-
-    public SerialSender(ConfigurationManager config, ConsumerUrlManager urlman) {
-        sender = new SenderUtils(urlman, config);
+    
+    public void init() {   
     }
-
+    
     public void shutdown() {
-        stopFlag = true;
-        log.info("delivery thread termination notificaiton recieved");
-    }
-
-    public void run() {
-
-        log.debug("run - delivery thread");
-
-        while (!stopFlag) {
-
-            try {
-
-                OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
-                if (WSMGParameter.showTrackId)
-                    log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
-                            + ": dequeued from outgoing queue");
-
-                sendNotification(outGoingMessage);
-
-            } catch (Exception e) {
-                log.error("Unexpected_exception:", e);
-            }
-        }
+    }   
+    
+    /**
+     * Delivers the message on the calling thread by delegating to
+     * {@link #sendNotification(OutGoingMessage, Deliverable)}.
+     *
+     * @param outMessage  message together with its list of consumers
+     * @param deliverable transport used to send the message to each consumer
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        this.sendNotification(outMessage, deliverable);
     }
 
-    public synchronized void sendNotification(OutGoingMessage outGoingMessage) {
+    public void sendNotification(OutGoingMessage outGoingMessage, Deliverable deliverable) {
 
         if (outGoingMessage == null) {
-            log.error("got a null outgoing message");
+            log.error("Got a null outgoing message");
             return;
         }
         String messageString = outGoingMessage.getTextMessage();
 
         List<ConsumerInfo> consumerInfoList = outGoingMessage.getConsumerInfoList();
         AdditionalMessageContent soapHeader = outGoingMessage.getAdditionalMessageContent();
-        deliverMessage(consumerInfoList, messageString, soapHeader);
-    }
-
-    private void deliverMessage(List<ConsumerInfo> consumerInfoList, String messageString,
-            AdditionalMessageContent additionalMessageContent) {
-
+        
         try {
             OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(messageString));
 
             for (ConsumerInfo obj : consumerInfoList) {
-
-                sender.send(obj, messgae2Send, additionalMessageContent);
-
+                deliverable.send(obj, messgae2Send, soapHeader);
             }
 
         } catch (XMLStreamException e) {
             log.error(e.getMessage(), e);
-        }
-
+        }        
     }
 }