You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/26 19:30:07 UTC

svn commit: r1175962 - in /incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl: ConsumerHandler.java FixedParallelSender.java ParallelSender.java

Author: patanachai
Date: Mon Sep 26 17:30:06 2011
New Revision: 1175962

URL: http://svn.apache.org/viewvc?rev=1175962&view=rev
Log:
AIRAVATA-101 Change FixedParallelSender and ParallelSender.

Modified:
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java?rev=1175962&r1=1175961&r2=1175962&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java Mon Sep 26 17:30:06 2011
@@ -23,6 +23,7 @@ package org.apache.airavata.wsmg.messeng
 
 import java.io.StringReader;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.airavata.wsmg.commons.CommonRoutines;
@@ -37,33 +38,34 @@ public abstract class ConsumerHandler im
     
     protected LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
 
-    private final long id;
+    private final String id;
 
     private String consumerUrl;
 
     private Deliverable deliverable;
 
-    public ConsumerHandler(long handlerId, String url, Deliverable deliverable) {
-        id = handlerId;
-        consumerUrl = url;
+    public ConsumerHandler(String url, Deliverable deliverable) {
+        this.id = UUID.randomUUID().toString();
+        this.consumerUrl = url;
         this.deliverable = deliverable;
     }
-
-    public long getId() {
-        return id;
-    }
-
+    
     public String getConsumerUrl() {
         return consumerUrl;
     }
-
+    
     @Override
     public boolean equals(Object o) {
         if (o instanceof ConsumerHandler) {
             ConsumerHandler h = (ConsumerHandler) o;
-            return h.getId() == this.id && h.getConsumerUrl().equals(this.getConsumerUrl());
+            return this.id.equals(h.id);            
         }
         return false;
+    }     
+
+    @Override
+    public int hashCode() {
+        return this.id.hashCode();        
     }
 
     public void submitMessage(LightweightMsg msg) {

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1175962&r1=1175961&r2=1175962&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Mon Sep 26 17:30:06 2011
@@ -22,8 +22,8 @@
 package org.apache.airavata.wsmg.messenger.strategy.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -38,9 +38,7 @@ public class FixedParallelSender impleme
 
     private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
 
-    private ConcurrentHashMap<String, FixedParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, FixedParallelConsumerHandler>();
-
-    private long consumerHandlerIdCounter;
+    private HashMap<String, ConsumerHandler> activeConsumerHanders = new HashMap<String, ConsumerHandler>();
 
     private int batchSize;
 
@@ -71,8 +69,7 @@ public class FixedParallelSender impleme
         }
     }
 
-    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message,
-            Deliverable deliverable) {
+    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
 
         String consumerUrl = consumer.getConsumerEprStr();
 
@@ -80,34 +77,35 @@ public class FixedParallelSender impleme
                 message.getAdditionalMessageContent());
 
         synchronized (activeConsumerHanders) {
-            FixedParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+            ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
             if (handler == null) {
-                handler = new FixedParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
+                handler = new FixedParallelConsumerHandler(consumerUrl, deliverable);
                 activeConsumerHanders.put(consumerUrl, handler);
                 handler.submitMessage(lwm);
                 threadPool.submit(handler);
             } else {
                 handler.submitMessage(lwm);
             }
-        }        
+        }
     }
 
     public void removeFromList(ConsumerHandler h) {
-        if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
-            log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
-                    h.getConsumerUrl()));
+        synchronized (activeConsumerHanders) {
+            if (activeConsumerHanders.remove(h.getConsumerUrl()) == null) { // null: no mapping existed, i.e. already removed
+                log.debug(String.format("inactive consumer handler is already removed: url : %s", h.getConsumerUrl()));
+            }
         }
     }
-    
+
     class FixedParallelConsumerHandler extends ConsumerHandler {
 
-        public FixedParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
-            super(handlerId, url, deliverable);
+        public FixedParallelConsumerHandler(String url, Deliverable deliverable) {
+            super(url, deliverable);
         }
 
         public void run() {
 
-            log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+            log.debug(String.format("FixedParallelConsumerHandler starting: %s", getConsumerUrl()));
 
             ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
 
@@ -116,16 +114,13 @@ public class FixedParallelSender impleme
             send(localList);
             localList.clear();
 
-            log.debug(String.format("calling on completion from : %d,", getId()));
-            
-            
+            log.debug(String.format("FixedParallelConsumerHandler done: %s,", getConsumerUrl()));
+
             /*
              * Remove handler if there is no message
              */
-            synchronized (activeConsumerHanders) {  
-                if(queue.size() == 0){
-                    removeFromList(this);
-                }
+            if (queue.size() == 0) {
+                removeFromList(this);
             }
         }
     }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1175962&r1=1175961&r2=1175962&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Mon Sep 26 17:30:06 2011
@@ -22,13 +22,11 @@
 package org.apache.airavata.wsmg.messenger.strategy.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
 import org.apache.airavata.wsmg.commons.OutGoingMessage;
@@ -45,10 +43,9 @@ public class ParallelSender implements S
 
     private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);
 
-    private ConcurrentHashMap<String, ParallelConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ParallelConsumerHandler>();
+    private HashMap<String, ConsumerHandler> activeConsumerHanders = new HashMap<String, ConsumerHandler>();
 
     private ExecutorService threadPool;
-    private long consumerHandlerIdCounter;
 
     public void init() {
         this.threadPool = Executors.newCachedThreadPool();
@@ -75,78 +72,63 @@ public class ParallelSender implements S
         LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
                 message.getAdditionalMessageContent());
 
-        ParallelConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
-        if (handler == null || !handler.isActive()) {
-            handler = new ParallelConsumerHandler(consumerHandlerIdCounter++, consumerUrl, deliverable);
-            activeConsumerHanders.put(consumerUrl, handler);
-            handler.submitMessage(lwm);
-            threadPool.submit(handler);
-        } else {
-            handler.submitMessage(lwm);
+        synchronized (activeConsumerHanders) {
+            ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+            if (handler == null) {
+                handler = new ParallelConsumerHandler(consumerUrl, deliverable);
+                activeConsumerHanders.put(consumerUrl, handler);
+                handler.submitMessage(lwm);
+                threadPool.submit(handler);
+            } else {
+                handler.submitMessage(lwm);
+            }
         }
     }
 
     public void removeFromList(ConsumerHandler h) {
-        if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
-            log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s", h.getId(),
-                    h.getConsumerUrl()));
+        synchronized (activeConsumerHanders) {
+            if (activeConsumerHanders.remove(h.getConsumerUrl()) == null) { // null: no mapping existed, i.e. already removed
+                log.debug(String.format("inactive consumer handler is already removed: url : %s", h.getConsumerUrl()));
+            }
         }
     }
 
     class ParallelConsumerHandler extends ConsumerHandler {
 
-        private ReadWriteLock activeLock = new ReentrantReadWriteLock();
-
         private static final int MAX_UNSUCCESSFULL_DRAINS = 3;
         private static final int SLEEP_TIME_SECONDS = 1;
         private int numberOfUnsuccessfullDrainAttempts = 0;
 
-        private boolean active;
-
-        public ParallelConsumerHandler(long handlerId, String url, Deliverable deliverable) {
-            super(handlerId, url, deliverable);
-        }
-
-        public boolean isActive() {
-            boolean ret = false;
-            activeLock.readLock().lock();
-            try {
-                ret = active;
-            } finally {
-                activeLock.readLock().unlock();
-            }
-            return ret;
+        public ParallelConsumerHandler(String url, Deliverable deliverable) {
+            super(url, deliverable);
         }
 
         public void run() {
-            this.active = true;
-
-            log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+            log.debug(String.format("ParallelConsumerHandler starting: %s", getConsumerUrl()));
 
             ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
-            while (active) {
+            while (true) {
 
-                int drainedMsgs = 0;
-                try {
-                    activeLock.writeLock().lock();
-
-                    drainedMsgs = queue.drainTo(localList);
-
-                    if (drainedMsgs <= 0) {
-                        numberOfUnsuccessfullDrainAttempts++;
-                    } else {
-                        numberOfUnsuccessfullDrainAttempts = 0;
-                    }
-
-                    if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
-                        log.debug(String.format("inactivating, %d", getId()));
-                        active = false;
-                        numberOfUnsuccessfullDrainAttempts = 0;
-                        break;
-                    }
+                /*
+                 * Try to find more messages to send out
+                 */
+                if (queue.drainTo(localList) <= 0) {
+                    numberOfUnsuccessfullDrainAttempts++;
+                } else {
+                    numberOfUnsuccessfullDrainAttempts = 0;
+                }
 
-                } finally {
-                    activeLock.writeLock().unlock();
+                /*
+                 * No new messages for some time
+                 */
+                if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
+                    log.debug(String.format("ParallelConsumerHandler inactivating, %s", getConsumerUrl()));
+                    numberOfUnsuccessfullDrainAttempts = 0;
+
+                    log.debug(String.format("ParallelConsumerHandler done: %s,",
+                            getConsumerUrl()));
+                    removeFromList(this);
+                    break;
                 }
 
                 send(localList);
@@ -156,11 +138,6 @@ public class ParallelSender implements S
                     waitForMessages();
                 }
             }
-
-            log.debug(String.format("calling on completion from : %d,", getId()));
-
-            removeFromList(this);
-
         }
 
         private void waitForMessages() {