You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2012/02/08 19:14:03 UTC

svn commit: r1242034 - in /incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking: impl/publish/ impl/subscription/ util/

Author: lahiru
Date: Wed Feb  8 18:14:02 2012
New Revision: 1242034

URL: http://svn.apache.org/viewvc?rev=1242034&view=rev
Log:
Fixing AIRAVATA-295 (https://issues.apache.org/jira/browse/AIRAVATA-295).

Modified:
    incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
    incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java
    incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java

Modified: incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java?rev=1242034&r1=1242033&r2=1242034&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java (original)
+++ incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java Wed Feb  8 18:14:02 2012
@@ -53,6 +53,8 @@ public abstract class AbstractPublisher 
 
     private boolean deleted = false;
 
+    private boolean deleteNow = false;
+
     private final Thread pubThread;
 
     protected AbstractPublisher(int capacity, boolean defaultAsync) {
@@ -68,7 +70,8 @@ public abstract class AbstractPublisher 
     // public abstract void publishSync(String leadMessage);
     public final void delete() {
         deleted = true;
-        pubThread.interrupt();
+        messageQueue.setCanStop(true);
+        deleteNow = true;
     }
 
     public final boolean isDeleted() {
@@ -149,13 +152,16 @@ public abstract class AbstractPublisher 
     public final void run() {
 
         BrokerEntry brokerEntry = null;
-        while (true) {
+        while (deleteNow) {
 
             try {
                 // get the head from queue, but dont remove it yet
                 // block for message to arrive only if not finished;
                 // if finished, dont block...just quit
                 brokerEntry = finished ? messageQueue.peek() : messageQueue.get();
+                if(deleteNow){
+                    break;
+                }
                 if (brokerEntry == null) {
 
                     // the queue has been flushed

Modified: incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java?rev=1242034&r1=1242033&r2=1242034&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java (original)
+++ incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java Wed Feb  8 18:14:02 2012
@@ -110,7 +110,7 @@ public class MessageBoxNotificationHandl
 
             logger.info("Unsubscribing the messagebox that was destroyed," + " SubscriptionID:" + this.subscriptionId);
 
-            msgboxHandler.deleteMsgBox(msgBoxEpr, 1000L);
+            msgboxHandler.deleteMsgBox(msgBoxEpr, 2000L);
 
         } catch (MsgBrokerClientException e) {
 

Modified: incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java?rev=1242034&r1=1242033&r2=1242034&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java (original)
+++ incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java Wed Feb  8 18:14:02 2012
@@ -37,7 +37,9 @@ public class LinkedMessageQueue<E> imple
     private final int capacity;
 
     private final Object takeLock = new Object();
-    private final Object putLock = new Object();
+    private final Object  putLock = new Object();
+    
+    private boolean canStop = false;
 
     public LinkedMessageQueue() {
         this(Integer.MAX_VALUE); // default capacity is MAX_INT
@@ -122,10 +124,11 @@ public class LinkedMessageQueue<E> imple
     public final E get() throws InterruptedException {
 
         if (count.get() <= 0) { // do initial check before checking & waiting
+                while (count.get() <= 0 && !canStop) {
             synchronized (putLock) {
-                while (count.get() <= 0) {
-                    putLock.wait();
+                    putLock.wait(1);
                 }
+                    return null;
             }
         }
 
@@ -172,4 +175,7 @@ public class LinkedMessageQueue<E> imple
         return list.iterator();
     }
 
+    public void setCanStop(boolean canStop) {
+        this.canStop = canStop;
+    }
 }