You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/26 19:30:07 UTC
svn commit: r1175962 - in
/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl:
ConsumerHandler.java FixedParallelSender.java ParallelSender.java
Author: patanachai
Date: Mon Sep 26 17:30:06 2011
New Revision: 1175962
URL: http://svn.apache.org/viewvc?rev=1175962&view=rev
Log:
AIRAVATA-101 Change FixedParallelSender and ParallelSender.
Modified:
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java?rev=1175962&r1=1175961&r2=1175962&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java Mon Sep 26 17:30:06 2011
@@ -23,6 +23,7 @@ package org.apache.airavata.wsmg.messeng
import java.io.StringReader;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.airavata.wsmg.commons.CommonRoutines;
@@ -37,33 +38,34 @@ public abstract class ConsumerHandler im
protected LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
- private final long id;
+ private final String id;
private String consumerUrl;
private Deliverable deliverable;
- public ConsumerHandler(long handlerId, String url, Deliverable deliverable) {
- id = handlerId;
- consumerUrl = url;
+ public ConsumerHandler(String url, Deliverable deliverable) {
+ this.id = UUID.randomUUID().toString();
+ this.consumerUrl = url;
this.deliverable = deliverable;
}
-
- public long getId() {
- return id;
- }
-
+
public String getConsumerUrl() {
return consumerUrl;
}
-
+
@Override
public boolean equals(Object o) {
if (o instanceof ConsumerHandler) {
ConsumerHandler h = (ConsumerHandler) o;
- return h.getId() == this.id && h.getConsumerUrl().equals(this.getConsumerUrl());
+ return this.id.equals(h.id);
}
return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.id.hashCode();
}
public void submitMessage(LightweightMsg msg) {
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1175962&r1=1175961&r2=1175962&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Mon Sep 26 17:30:06 2011
@@ -22,8 +22,8 @@
package org.apache.airavata.wsmg.messenger.strategy.impl;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -38,9 +38,7 @@ public class FixedParallelSender impleme
private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
- private ConcurrentHashMap<String, FixedParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, FixedParallelConsumerHandler>();
-
- private long consumerHandlerIdCounter;
+ private HashMap<String, ConsumerHandler> activeConsumerHanders = new HashMap<String, ConsumerHandler>();
private int batchSize;
@@ -71,8 +69,7 @@ public class FixedParallelSender impleme
}
}
- private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message,
- Deliverable deliverable) {
+ private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
String consumerUrl = consumer.getConsumerEprStr();
@@ -80,34 +77,35 @@ public class FixedParallelSender impleme
message.getAdditionalMessageContent());
synchronized (activeConsumerHanders) {
- FixedParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+ ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
if (handler == null) {
- handler = new FixedParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
+ handler = new FixedParallelConsumerHandler(consumerUrl, deliverable);
activeConsumerHanders.put(consumerUrl, handler);
handler.submitMessage(lwm);
threadPool.submit(handler);
} else {
handler.submitMessage(lwm);
}
- }
+ }
}
public void removeFromList(ConsumerHandler h) {
- if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
- log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
- h.getConsumerUrl()));
+ synchronized (activeConsumerHanders) {
+ if (activeConsumerHanders.remove(h.getConsumerUrl()) != null) {
+ log.debug(String.format("inactive consumer handler is already removed: url : %s", h.getConsumerUrl()));
+ }
}
}
-
+
class FixedParallelConsumerHandler extends ConsumerHandler {
- public FixedParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
- super(handlerId, url, deliverable);
+ public FixedParallelConsumerHandler(String url, Deliverable deliverable) {
+ super(url, deliverable);
}
public void run() {
- log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+ log.debug(String.format("FixedParallelConsumerHandler starting: %s", getConsumerUrl()));
ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
@@ -116,16 +114,13 @@ public class FixedParallelSender impleme
send(localList);
localList.clear();
- log.debug(String.format("calling on completion from : %d,", getId()));
-
-
+ log.debug(String.format("FixedParallelConsumerHandler done: %s,", getConsumerUrl()));
+
/*
* Remove handler if there is no message
*/
- synchronized (activeConsumerHanders) {
- if(queue.size() == 0){
- removeFromList(this);
- }
+ if (queue.size() == 0) {
+ removeFromList(this);
}
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1175962&r1=1175961&r2=1175962&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Mon Sep 26 17:30:06 2011
@@ -22,13 +22,11 @@
package org.apache.airavata.wsmg.messenger.strategy.impl;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
@@ -45,10 +43,9 @@ public class ParallelSender implements S
private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);
- private ConcurrentHashMap<String, ParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ParallelConsumerHandler>();
+ private HashMap<String, ConsumerHandler> activeConsumerHanders = new HashMap<String, ConsumerHandler>();
private ExecutorService threadPool;
- private long consumerHandlerIdCounter;
public void init() {
this.threadPool = Executors.newCachedThreadPool();
@@ -75,78 +72,63 @@ public class ParallelSender implements S
LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
message.getAdditionalMessageContent());
- ParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
- if (handler == null || !handler.isActive()) {
- handler = new ParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
- activeConsumerHanders.put(consumerUrl, handler);
- handler.submitMessage(lwm);
- threadPool.submit(handler);
- } else {
- handler.submitMessage(lwm);
+ synchronized (activeConsumerHanders) {
+ ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+ if (handler == null) {
+ handler = new ParallelConsumerHandler(consumerUrl, deliverable);
+ activeConsumerHanders.put(consumerUrl, handler);
+ handler.submitMessage(lwm);
+ threadPool.submit(handler);
+ } else {
+ handler.submitMessage(lwm);
+ }
}
}
public void removeFromList(ConsumerHandler h) {
- if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
- log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
- h.getConsumerUrl()));
+ synchronized (activeConsumerHanders) {
+ if (activeConsumerHanders.remove(h.getConsumerUrl()) != null) {
+ log.debug(String.format("inactive consumer handler is already removed: url : %s", h.getConsumerUrl()));
+ }
}
}
class ParallelConsumerHandler extends ConsumerHandler {
- private ReadWriteLock activeLock = new ReentrantReadWriteLock();
-
private static final int MAX_UNSUCCESSFULL_DRAINS = 3;
private static final int SLEEP_TIME_SECONDS = 1;
private int numberOfUnsuccessfullDrainAttempts = 0;
- private boolean active;
-
- public ParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
- super(handlerId, url, deliverable);
- }
-
- public boolean isActive() {
- boolean ret = false;
- activeLock.readLock().lock();
- try {
- ret = active;
- } finally {
- activeLock.readLock().unlock();
- }
- return ret;
+ public ParallelConsumerHandler(String url, Deliverable deliverable) {
+ super(url, deliverable);
}
public void run() {
- this.active = true;
-
- log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+ log.debug(String.format("ParallelConsumerHandler starting: %s", getConsumerUrl()));
ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
- while (active) {
+ while (true) {
- int drainedMsgs = 0;
- try {
- activeLock.writeLock().lock();
-
- drainedMsgs = queue.drainTo(localList);
-
- if (drainedMsgs <= 0) {
- numberOfUnsuccessfullDrainAttempts++;
- } else {
- numberOfUnsuccessfullDrainAttempts = 0;
- }
-
- if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
- log.debug(String.format("inactivating, %d", getId()));
- active = false;
- numberOfUnsuccessfullDrainAttempts = 0;
- break;
- }
+ /*
+ * Try to find more messages to send out
+ */
+ if (queue.drainTo(localList) <= 0) {
+ numberOfUnsuccessfullDrainAttempts++;
+ } else {
+ numberOfUnsuccessfullDrainAttempts = 0;
+ }
- } finally {
- activeLock.writeLock().unlock();
+ /*
+ * No new messages for some time
+ */
+ if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
+ log.debug(String.format("ParallelConsumerHandler inactivating, %s", getConsumerUrl()));
+ numberOfUnsuccessfullDrainAttempts = 0;
+
+ log.debug(String.format("ParallelConsumerHandler done: %s,",
+ getConsumerUrl()));
+ removeFromList(this);
+ break;
}
send(localList);
@@ -156,11 +138,6 @@ public class ParallelSender implements S
waitForMessages();
}
}
-
- log.debug(String.format("calling on completion from : %d,", getId()));
-
- removeFromList(this);
-
}
private void waitForMessages() {