You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@airavata.apache.org by la...@apache.org on 2012/02/08 19:14:03 UTC
svn commit: r1242034 - in /incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking: impl/publish/ impl/subscription/ util/
Author: lahiru
Date: Wed Feb 8 18:14:02 2012
New Revision: 1242034
URL: http://svn.apache.org/viewvc?rev=1242034&view=rev
Log:
fixing https://issues.apache.org/jira/browse/AIRAVATA-295.
Modified:
incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java
incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java
Modified: incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java?rev=1242034&r1=1242033&r2=1242034&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java (original)
+++ incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/publish/AbstractPublisher.java Wed Feb 8 18:14:02 2012
@@ -53,6 +53,8 @@ public abstract class AbstractPublisher
private boolean deleted = false;
+ private boolean deleteNow = false;
+
private final Thread pubThread;
protected AbstractPublisher(int capacity, boolean defaultAsync) {
@@ -68,7 +70,8 @@ public abstract class AbstractPublisher
// public abstract void publishSync(String leadMessage);
public final void delete() {
deleted = true;
- pubThread.interrupt();
+ messageQueue.setCanStop(true);
+ deleteNow = true;
}
public final boolean isDeleted() {
@@ -149,13 +152,16 @@ public abstract class AbstractPublisher
public final void run() {
BrokerEntry brokerEntry = null;
- while (true) {
+ while (deleteNow) {
try {
// get the head from queue, but dont remove it yet
// block for message to arrive only if not finished;
// if finished, dont block...just quit
brokerEntry = finished ? messageQueue.peek() : messageQueue.get();
+ if(deleteNow){
+ break;
+ }
if (brokerEntry == null) {
// the queue has been flushed
Modified: incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java?rev=1242034&r1=1242033&r2=1242034&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java (original)
+++ incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.java Wed Feb 8 18:14:02 2012
@@ -110,7 +110,7 @@ public class MessageBoxNotificationHandl
logger.info("Unsubscribing the messagebox that was destroyed," + " SubscriptionID:" + this.subscriptionId);
- msgboxHandler.deleteMsgBox(msgBoxEpr, 1000L);
+ msgboxHandler.deleteMsgBox(msgBoxEpr, 2000L);
} catch (MsgBrokerClientException e) {
Modified: incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java?rev=1242034&r1=1242033&r2=1242034&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java (original)
+++ incubator/airavata/trunk/modules/commons/workflow-tracking/src/main/java/org/apache/airavata/workflow/tracking/util/LinkedMessageQueue.java Wed Feb 8 18:14:02 2012
@@ -37,7 +37,9 @@ public class LinkedMessageQueue<E> imple
private final int capacity;
private final Object takeLock = new Object();
- private final Object putLock = new Object();
+ private final Object putLock = new Object();
+
+ private boolean canStop = false;
public LinkedMessageQueue() {
this(Integer.MAX_VALUE); // default capacity is MAX_INT
@@ -122,10 +124,11 @@ public class LinkedMessageQueue<E> imple
public final E get() throws InterruptedException {
if (count.get() <= 0) { // do initial check before checking & waiting
+ while (count.get() <= 0 && !canStop) {
synchronized (putLock) {
- while (count.get() <= 0) {
- putLock.wait();
+ putLock.wait(1);
}
+ return null;
}
}
@@ -172,4 +175,7 @@ public class LinkedMessageQueue<E> imple
return list.iterator();
}
+ public void setCanStop(boolean canStop) {
+ this.canStop = canStop;
+ }
}