You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/24 01:37:02 UTC
svn commit: r1175072 - in
/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg:
broker/ messenger/ messenger/protocol/ messenger/protocol/impl/
messenger/strategy/ messenger/strategy/impl/
Author: patanachai
Date: Fri Sep 23 23:37:01 2011
New Revision: 1175072
URL: http://svn.apache.org/viewvc?rev=1175072&view=rev
Log:
AIRAVATA-101 Change Thread Pool in FixedParallelSender to Standard Java Thread Pool.
Added:
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
- copied, changed from r1174883, incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
Removed:
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java
Modified:
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java Fri Sep 23 23:37:01 2011
@@ -22,6 +22,7 @@
package org.apache.airavata.wsmg.broker;
import java.io.File;
+import java.lang.reflect.Constructor;
import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
@@ -33,6 +34,11 @@ import org.apache.airavata.wsmg.commons.
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.DeliveryProcessor;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
@@ -47,12 +53,15 @@ import org.slf4j.LoggerFactory;
public class BrokerServiceLifeCycle implements ServiceLifeCycle {
private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
- private SendingStrategy method = null;
+
+ private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
+
+ private DeliveryProcessor proc;
public void shutDown(ConfigurationContext arg, AxisService service) {
log.info("broker shutting down");
- if (method != null) {
- method.shutdown();
+ if (proc != null) {
+ proc.stop();
}
}
@@ -147,28 +156,57 @@ public class BrokerServiceLifeCycle impl
}
return;
}
+
+ /*
+ * Create Protocol
+ */
+ DeliveryProtocol protocol;
+ String protocolClass = configMan.getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL,
+ Axis2Protocol.class.getName());
+ try {
+ Class cl = Class.forName(protocolClass);
+ Constructor<DeliveryProtocol> co = cl.getConstructor(null);
+ protocol = co.newInstance((Object[]) null);
+
+ } catch (Exception e) {
+ log.error("Cannot initial protocol sender", e);
+ return;
+ }
+ protocol.setTimeout(configMan.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, DEFAULT_SOCKET_TIME_OUT));
+ /*
+ * Create delivery method
+ */
+ SendingStrategy method = null;
+ String initedmethod = null;
String deliveryMethod = configMan.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD,
WsmgCommonConstants.DELIVERY_METHOD_SERIAL);
-
- ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
-
- String initedmethod = null;
-
if (WsmgCommonConstants.DELIVERY_METHOD_PARALLEL.equalsIgnoreCase(deliveryMethod)) {
-
- method = new ParallelSender(configMan, urlManager);
+ method = new ParallelSender();
initedmethod = WsmgCommonConstants.DELIVERY_METHOD_PARALLEL;
-
+
} else if (WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW.equalsIgnoreCase(deliveryMethod)) {
- method = new FixedParallelSender(configMan, urlManager);
+ int poolsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
+ WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
+ int batchsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
+ WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
+ method = new FixedParallelSender(poolsize, batchsize);
initedmethod = WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW;
+
} else {
- method = new SerialSender(configMan, urlManager);
+ method = new SerialSender();
initedmethod = WsmgCommonConstants.DELIVERY_METHOD_SERIAL;
- }
-
- method.start();
+ }
+
+ /*
+ * Create Deliverable
+ */
+ ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
+ Deliverable senderUtils = new SenderUtils(urlManager);
+ senderUtils.setProtocol(protocol);
+
+ DeliveryProcessor proc = new DeliveryProcessor(senderUtils, method);
+ proc.start();
log.info(initedmethod + " sending method inited");
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java Fri Sep 23 23:37:01 2011
@@ -37,6 +37,9 @@ import org.apache.axis2.addressing.Endpo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/*
+ * FIXME: need thread safe version
+ */
public class ConsumerUrlManager {
private static final Logger logger = LoggerFactory.getLogger(ConsumerUrlManager.class);
Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java?rev=1175072&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java Fri Sep 23 23:37:01 2011
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.axiom.om.OMElement;
+
+public interface Deliverable {
+ void setProtocol(DeliveryProtocol protocol);
+
+ void send(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent);
+}
Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java?rev=1175072&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java Fri Sep 23 23:37:01 2011
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeliveryProcessor{
+
+ private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
+
+ private SendingStrategy strategy;
+ private Deliverable deliverable;
+
+ private boolean running;
+ private Thread t;
+
+ public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
+ this.strategy = strategy;
+ this.deliverable = deliverable;
+ }
+
+ public void start() {
+ this.running = true;
+ t = new Thread(new CheckingAndSending());
+ t.start();
+ }
+
+ public void stop() {
+ this.running = false;
+ }
+
+ private class CheckingAndSending implements Runnable {
+
+ public void run() {
+ strategy.init();
+ while (running) {
+ logger.debug("run - delivery thread");
+ try {
+
+ OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+ if (WSMGParameter.showTrackId)
+ logger.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+ + ": dequeued from outgoing queue");
+
+ strategy.addMessageToSend(outGoingMessage, deliverable);
+
+ } catch (Exception e) {
+ logger.error("Unexpected_exception:", e);
+ }
+ }
+ strategy.shutdown();
+ }
+ }
+}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java Fri Sep 23 23:37:01 2011
@@ -22,18 +22,14 @@
package org.apache.airavata.wsmg.messenger;
import java.io.StringReader;
-import java.lang.reflect.Constructor;
import javax.xml.stream.XMLStreamException;
import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.protocol.Axis2Protocol;
import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
import org.apache.airavata.wsmg.messenger.protocol.SendingException;
import org.apache.axiom.om.OMAbstractFactory;
@@ -46,38 +42,22 @@ import org.slf4j.LoggerFactory;
/*
* this class is not thread safe
* */
-
-public class SenderUtils {
+public class SenderUtils implements Deliverable {
private static final Logger logger = LoggerFactory.getLogger(SenderUtils.class);
- OMFactory factory = OMAbstractFactory.getOMFactory();
-
- ConsumerUrlManager urlManager;
+ private static OMFactory factory = OMAbstractFactory.getOMFactory();
- DeliveryProtocol protocol;
+ private ConsumerUrlManager urlManager;
+
+ private DeliveryProtocol protocol;
- public SenderUtils(ConsumerUrlManager urlMan, ConfigurationManager config) {
+ public SenderUtils(ConsumerUrlManager urlMan) {
urlManager = urlMan;
-
- /*
- * Invoke factory and config
- */
- String protocolClass = config.getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL,
- "org.apache.airavata.wsmg.messenger.protocol.Axis2Protocol");
- try {
- Class cl = Class.forName(protocolClass);
- Constructor<DeliveryProtocol> co = cl.getConstructor(null);
- protocol = co.newInstance((Object[]) null);
-
- } catch (Exception e) {
- // fallback to normal class
- logger.error("Cannot initial protocol sender", e);
- protocol = new Axis2Protocol();
- }
-
- protocol.setTimeout(Long.parseLong(config.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, "20000")));
-
+ }
+
+ public void setProtocol(DeliveryProtocol protocol){
+ this.protocol = protocol;
}
public void send(ConsumerInfo consumerInfo, OMElement notificationMessageBodyEl,
Copied: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java (from r1174883, incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java)
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java?p2=incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java&p1=incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java&r1=1174883&r2=1175072&rev=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java Fri Sep 23 23:37:01 2011
@@ -19,7 +19,7 @@
*
*/
-package org.apache.airavata.wsmg.messenger.protocol;
+package org.apache.airavata.wsmg.messenger.protocol.impl;
import java.io.StringReader;
import java.util.LinkedList;
@@ -31,6 +31,8 @@ import org.apache.airavata.wsmg.broker.A
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.SendingException;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.ElementHelper;
@@ -40,6 +42,7 @@ import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.transport.http.HTTPConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,15 +50,11 @@ public class Axis2Protocol implements De
private static final Logger logger = LoggerFactory.getLogger(Axis2Protocol.class);
- SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
+ private static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
- ServiceClient nonThreadLocalServiceClient = null;
+ private ServiceClient nonThreadLocalServiceClient;
- long tcpConnectionTimeout = 0;
-
- public void setTimeout(long timeout) {
- this.tcpConnectionTimeout = timeout;
- }
+ long tcpConnectionTimeout;
public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
throws SendingException {
@@ -82,36 +81,35 @@ public class Axis2Protocol implements De
}
}
+ ServiceClient client = null;
try {
- ServiceClient client = configureServiceClient(actionString, consumerReference,
- additionalMessageContent.getMessageID(), soapHeaders);
+ client = configureServiceClient(actionString, consumerReference, additionalMessageContent.getMessageID(),
+ soapHeaders);
client.sendRobust(message);
- client.cleanupTransport();
} catch (AxisFault ex) {
throw new SendingException(ex.getCause());
+ } finally {
+ if (client != null) {
+ try {
+ client.cleanupTransport();
+ } catch (AxisFault ex) {
+ logger.error(ex.getMessage(), ex);
+ }
+ }
}
}
- private ServiceClient getServiceClient() throws AxisFault {
-
- ServiceClient ret = nonThreadLocalServiceClient;
- if (ret == null) {
- ret = new ServiceClient();
-
- nonThreadLocalServiceClient = ret;
- }
- ret.removeHeaders();
- return ret;
+ public void setTimeout(long timeout) {
+ this.tcpConnectionTimeout = timeout;
}
private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
List<OMElement> soapHeaders) throws AxisFault {
// not engaging addressing modules
-
ServiceClient client = getServiceClient();
SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
@@ -127,15 +125,12 @@ public class Axis2Protocol implements De
client.addHeader(to);
for (OMElement omHeader : soapHeaders) {
-
try {
SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
-
client.addHeader(headerBlock);
} catch (Exception e) {
throw AxisFault.makeFault(e);
}
-
}
Options opts = new Options();
@@ -143,12 +138,18 @@ public class Axis2Protocol implements De
opts.setMessageId(msgId);
opts.setTo(consumerLocation);
opts.setAction(action);
- opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.CHUNKED, Boolean.FALSE);
-
- opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.HTTP_PROTOCOL_VERSION,
- org.apache.axis2.transport.http.HTTPConstants.HEADER_PROTOCOL_10);
+ opts.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);
+ opts.setProperty(HTTPConstants.HTTP_PROTOCOL_VERSION, HTTPConstants.HEADER_PROTOCOL_10);
client.setOptions(opts);
return client;
}
+
+ private ServiceClient getServiceClient() throws AxisFault {
+ if (nonThreadLocalServiceClient == null) {
+ nonThreadLocalServiceClient = new ServiceClient();
+ }
+ nonThreadLocalServiceClient.removeHeaders();
+ return nonThreadLocalServiceClient;
+ }
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java Fri Sep 23 23:37:01 2011
@@ -21,8 +21,11 @@
package org.apache.airavata.wsmg.messenger.strategy;
-public interface SendingStrategy {
- public void start();
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
- public void shutdown();
+public interface SendingStrategy {
+ void init();
+ void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable);
+ void shutdown();
}
Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java?rev=1175072&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java Fri Sep 23 23:37:01 2011
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConsumerHandler implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
+
+ protected LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+ private final long id;
+
+ private String consumerUrl;
+
+ private Deliverable deliverable;
+
+ public ConsumerHandler(long handlerId, String url, Deliverable deliverable) {
+ id = handlerId;
+ consumerUrl = url;
+ this.deliverable = deliverable;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getConsumerUrl() {
+ return consumerUrl;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ConsumerHandler) {
+ ConsumerHandler h = (ConsumerHandler) o;
+ return h.getId() == this.id && h.getConsumerUrl().equals(this.getConsumerUrl());
+ }
+ return false;
+ }
+
+ public void submitMessage(LightweightMsg msg) {
+ try {
+ queue.put(msg);
+ } catch (InterruptedException e) {
+ log.error("Interrupted when trying to add message");
+ }
+ }
+
+ protected void send(List<LightweightMsg> list) {
+ for (LightweightMsg m : list) {
+ try {
+ OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+ deliverable.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Fri Sep 23 23:37:01 2011
@@ -21,221 +21,112 @@
package org.apache.airavata.wsmg.messenger.strategy.impl;
-import java.io.StringReader;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.apache.axiom.om.OMElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FixedParallelSender extends Thread implements SendingStrategy {
+public class FixedParallelSender implements SendingStrategy {
private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
- private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
-
- private ThreadCrew threadCrew = null;
-
- private ConsumerUrlManager urlManager = null;
- private ConfigurationManager configManager = null;
+ private ConcurrentHashMap<String, FixedParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, FixedParallelConsumerHandler>();
private long consumerHandlerIdCounter;
- private boolean stopFlag = false;
-
- public FixedParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
-
- int poolSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
- WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
+ private int batchSize;
- threadCrew = new ThreadCrew(poolSize);
- urlManager = urlMan;
- configManager = config;
- }
+ private ExecutorService threadPool;
- public void shutdown() {
- stopFlag = true;
+ public FixedParallelSender(int poolsize, int batchsize) {
+ this.threadPool = Executors.newFixedThreadPool(poolsize);
+ this.batchSize = batchsize;
}
- public void run() {
- int dequeuedMessageCounter = 0;
-
- while (!stopFlag) {
-
- try {
+ public void init() {
- if (log.isDebugEnabled())
- log.debug("before dequeue - delivery thread");
-
- OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
- if (WSMGParameter.showTrackId)
- log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
- + ": dequeued from outgoing queue");
-
- distributeOverConsumerQueues(outGoingMessage);
-
- } catch (Exception e) {
- log.error("Unexpected_exception:", e);
- }
-
- dequeuedMessageCounter++;
- }
+ }
- threadCrew.stop();
+ public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+ distributeOverConsumerQueues(outMessage, deliverable);
+ }
+ public void shutdown() {
+ threadPool.shutdown();
}
- public void distributeOverConsumerQueues(OutGoingMessage message) {
+ public void distributeOverConsumerQueues(OutGoingMessage message, Deliverable deliverable) {
List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
for (ConsumerInfo consumer : consumerInfoList) {
-
- sendToConsumerHandler(consumer, message);
-
+ sendToConsumerHandler(consumer, message, deliverable);
}
-
}
- private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
+ private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message,
+ Deliverable deliverable) {
String consumerUrl = consumer.getConsumerEprStr();
LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
message.getAdditionalMessageContent());
- ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
-
- if (handler == null) {
- handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, configManager, urlManager);
- activeConsumerHanders.put(consumerUrl, handler);
- handler.submitMessage(lwm); // import to submit before execute.
- threadCrew.submitTask(handler);
- // (to remove a possible race
- // condition)
- } else {
- handler.submitMessage(lwm);
- }
-
- return handler;
-
- }
-
- private long getNextConsumerHandlerId() {
- return ++consumerHandlerIdCounter;
- }
-
- class ConsumerHandler implements RunnableEx {
-
- LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
-
- final long id;
- int batchSize;
-
- ThreadLocal<SenderUtils> threadlocalSender = new ThreadLocal<SenderUtils>();
-
- // SenderUtils sender = null;
- String consumerUrl;
-
- ConfigurationManager configMan;
- ConsumerUrlManager consumerURLManager;
-
- public ConsumerHandler(long handlerId, String url, ConfigurationManager config, ConsumerUrlManager urlMan) {
-
- configMan = config;
- consumerURLManager = urlMan;
- // sender = new SenderUtils(urlMan, config, true);
- id = handlerId;
- consumerUrl = url;
-
- batchSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
- WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
- }
-
- public long getId() {
- return id;
- }
-
- public String getConsumerUrl() {
- return consumerUrl;
- }
-
- private SenderUtils getSender() {
-
- SenderUtils s = threadlocalSender.get();
-
- if (s == null) {
- s = new SenderUtils(consumerURLManager, configMan);
- threadlocalSender.set(s);
- }
-
- return s;
- }
-
- @Override
- public boolean equals(Object o) {
-
- if (o instanceof ConsumerHandler) {
- ConsumerHandler h = (ConsumerHandler) o;
- return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
+ synchronized (activeConsumerHanders) {
+ FixedParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+ if (handler == null) {
+ handler = new FixedParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
+ activeConsumerHanders.put(consumerUrl, handler);
+ handler.submitMessage(lwm);
+ threadPool.submit(handler);
+ } else {
+ handler.submitMessage(lwm);
}
+ }
+ }
- return false;
+ public void removeFromList(ConsumerHandler h) {
+ if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
+ log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
+ h.getConsumerUrl()));
}
+ }
+
+ class FixedParallelConsumerHandler extends ConsumerHandler {
- public void submitMessage(LightweightMsg msg) {
- queue.add(msg);
+ public FixedParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
+ super(handlerId, url, deliverable);
}
public void run() {
- if (log.isDebugEnabled())
- log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+ log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
- LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+ ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
queue.drainTo(localList, batchSize);
send(localList);
localList.clear();
- if (log.isDebugEnabled())
- log.debug(String.format("calling on completion from : %d,", getId()));
-
- }
-
- private void send(LinkedList<LightweightMsg> list) {
-
- SenderUtils s = getSender();
-
- while (!list.isEmpty()) {
-
- LightweightMsg m = list.removeFirst();
-
- try {
- OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
-
- s.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
-
- } catch (Exception e) {
- log.error(e.getMessage(), e);
+ log.debug(String.format("calling on completion from : %d,", getId()));
+
+
+ /*
+ * Remove handler if there is no message
+ */
+ synchronized (activeConsumerHanders) {
+ if(queue.size() == 0){
+ removeFromList(this);
}
-
}
-
}
}
-
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Fri Sep 23 23:37:01 2011
@@ -21,211 +21,109 @@
package org.apache.airavata.wsmg.messenger.strategy.impl;
-import java.io.StringReader;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.apache.axiom.om.OMElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ParallelSender extends Thread implements SendingStrategy {
+/**
+ * Each subscriber (URL Endpoint) will have its own thread to send a message to
+ *
+ */
+public class ParallelSender implements SendingStrategy {
private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);
- private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
+ private ConcurrentHashMap<String, ParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ParallelConsumerHandler>();
- private final ExecutorService threadPool;
- private long consumerHandlerIdCounter = 0L;
- private boolean stopFlag = false;
-
- private ConsumerUrlManager urlManager = null;
- private ConfigurationManager configManager = null;
-
- private ConsumerHandlerCompletionCallback consumerCallback = new ConsumerHandlerCompletionCallback() {
-
- public void onCompletion(ConsumerHandler h) {
-
- if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
-
- if (log.isDebugEnabled())
- log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s",
- h.getId(), h.getConsumerUrl()));
- }
-
- }
- };
-
- public ParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
- urlManager = urlMan;
- configManager = config;
-
- threadPool = Executors.newCachedThreadPool();
+ private ExecutorService threadPool;
+ private long consumerHandlerIdCounter;
+ public void init() {
+ this.threadPool = Executors.newCachedThreadPool();
}
- public void shutdown() {
- stopFlag = true;
+ public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+ distributeOverConsumerQueues(outMessage, deliverable);
}
- public void run() {
- int dequeuedMessageCounter = 0;
-
- while (!stopFlag) {
-
- try {
-
- if (log.isDebugEnabled())
- log.debug("before dequeue - delivery thread");
-
- OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
- if (WSMGParameter.showTrackId)
- log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
- + ": dequeued from outgoing queue");
-
- distributeOverConsumerQueues(outGoingMessage);
-
- } catch (Exception e) {
- log.error("Unexpected_exception:", e);
- }
-
- dequeuedMessageCounter++;
- }
-
+ public void shutdown() {
threadPool.shutdown();
-
}
- public void distributeOverConsumerQueues(OutGoingMessage message) {
+ public void distributeOverConsumerQueues(OutGoingMessage message, Deliverable deliverable) {
List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
-
for (ConsumerInfo consumer : consumerInfoList) {
-
- sendToConsumerHandler(consumer, message);
-
+ sendToConsumerHandler(consumer, message, deliverable);
}
-
}
- private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
-
+ private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
String consumerUrl = consumer.getConsumerEprStr();
LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
message.getAdditionalMessageContent());
- ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
-
- if (handler == null || (!handler.isActive())) {
- handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, consumerCallback, configManager,
- urlManager);
+ ParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+ if (handler == null || !handler.isActive()) {
+ handler = new ParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
activeConsumerHanders.put(consumerUrl, handler);
- handler.submitMessage(lwm); // import to submit before execute.
- // (to remove a possible race
- // condition)
- threadPool.execute(handler);
+ handler.submitMessage(lwm);
+ threadPool.submit(handler);
} else {
handler.submitMessage(lwm);
}
-
- return handler;
- }
-
- private long getNextConsumerHandlerId() {
- return ++consumerHandlerIdCounter;
}
- interface ConsumerHandlerCompletionCallback {
-
- public void onCompletion(ConsumerHandler h);
-
+ public void removeFromList(ConsumerHandler h) {
+ if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
+ log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
+ h.getConsumerUrl()));
+ }
}
- class ConsumerHandler implements Runnable {
-
- LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+ class ParallelConsumerHandler extends ConsumerHandler {
- ReadWriteLock activeLock = new ReentrantReadWriteLock();
+ private ReadWriteLock activeLock = new ReentrantReadWriteLock();
- final long id;
- final int MAX_UNSUCCESSFULL_DRAINS = 3;
- final int SLEEP_TIME_SECONDS = 1;
- int numberOfUnsuccessfullDrainAttempts = 0;
+ private static final int MAX_UNSUCCESSFULL_DRAINS = 3;
+ private static final int SLEEP_TIME_SECONDS = 1;
+ private int numberOfUnsuccessfullDrainAttempts = 0;
- boolean active = true;
+ private boolean active;
- ConsumerHandlerCompletionCallback callback = null;
- SenderUtils sender = null;
- String consumerUrl;
-
- public ConsumerHandler(long handlerId, String url, ConsumerHandlerCompletionCallback c,
- ConfigurationManager config, ConsumerUrlManager urlMan) {
- callback = c;
- sender = new SenderUtils(urlMan, config);
- id = handlerId;
- consumerUrl = url;
- }
-
- public long getId() {
- return id;
- }
-
- public String getConsumerUrl() {
- return consumerUrl;
+ public ParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
+ super(handlerId, url, deliverable);
}
public boolean isActive() {
-
boolean ret = false;
-
activeLock.readLock().lock();
try {
ret = active;
} finally {
activeLock.readLock().unlock();
}
-
return ret;
}
- @Override
- public boolean equals(Object o) {
-
- if (o instanceof ConsumerHandler) {
- ConsumerHandler h = (ConsumerHandler) o;
- return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
- }
-
- return false;
- }
-
- public void submitMessage(LightweightMsg msg) {
- queue.add(msg);
- }
-
public void run() {
+ this.active = true;
- if (log.isDebugEnabled())
- log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
-
- LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+ log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+ ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
while (active) {
int drainedMsgs = 0;
@@ -241,11 +139,10 @@ public class ParallelSender extends Thre
}
if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
-
log.debug(String.format("inactivating, %d", getId()));
-
active = false;
numberOfUnsuccessfullDrainAttempts = 0;
+ break;
}
} finally {
@@ -258,38 +155,16 @@ public class ParallelSender extends Thre
if (numberOfUnsuccessfullDrainAttempts > 0) {
waitForMessages();
}
-
}
- if (log.isDebugEnabled())
- log.debug(String.format("calling on completion from : %d,", getId()));
+ log.debug(String.format("calling on completion from : %d,", getId()));
- callback.onCompletion(this);
-
- }
-
- private void send(LinkedList<LightweightMsg> list) {
-
- while (!list.isEmpty()) {
-
- LightweightMsg m = list.removeFirst();
-
- try {
- OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
-
- sender.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
-
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
-
- }
+ removeFromList(this);
}
private void waitForMessages() {
try {
-
TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
log.debug("finished - waiting for messages");
} catch (InterruptedException e) {
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java?rev=1175072&r1=1175071&r2=1175072&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java Fri Sep 23 23:37:01 2011
@@ -30,82 +30,46 @@ import org.apache.airavata.wsmg.broker.A
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.apache.axiom.om.OMElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SerialSender extends Thread implements SendingStrategy {
+public class SerialSender implements SendingStrategy {
private static final Logger log = LoggerFactory.getLogger(SerialSender.class);
-
- private boolean stopFlag = false;
-
- SenderUtils sender;
-
- public SerialSender(ConfigurationManager config, ConsumerUrlManager urlman) {
- sender = new SenderUtils(urlman, config);
+
+ public void init() {
}
-
+
public void shutdown() {
- stopFlag = true;
- log.info("delivery thread termination notificaiton recieved");
- }
-
- public void run() {
-
- log.debug("run - delivery thread");
-
- while (!stopFlag) {
-
- try {
-
- OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
- if (WSMGParameter.showTrackId)
- log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
- + ": dequeued from outgoing queue");
-
- sendNotification(outGoingMessage);
-
- } catch (Exception e) {
- log.error("Unexpected_exception:", e);
- }
- }
+ }
+
+ public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+ sendNotification(outMessage, deliverable);
}
- public synchronized void sendNotification(OutGoingMessage outGoingMessage) {
+ public void sendNotification(OutGoingMessage outGoingMessage, Deliverable deliverable) {
if (outGoingMessage == null) {
- log.error("got a null outgoing message");
+ log.error("Got a null outgoing message");
return;
}
String messageString = outGoingMessage.getTextMessage();
List<ConsumerInfo> consumerInfoList = outGoingMessage.getConsumerInfoList();
AdditionalMessageContent soapHeader = outGoingMessage.getAdditionalMessageContent();
- deliverMessage(consumerInfoList, messageString, soapHeader);
- }
-
- private void deliverMessage(List<ConsumerInfo> consumerInfoList, String messageString,
- AdditionalMessageContent additionalMessageContent) {
-
+
try {
OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(messageString));
for (ConsumerInfo obj : consumerInfoList) {
-
- sender.send(obj, messgae2Send, additionalMessageContent);
-
+ deliverable.send(obj, messgae2Send, soapHeader);
}
} catch (XMLStreamException e) {
log.error(e.getMessage(), e);
- }
-
+ }
}
}