You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/10/05 00:31:54 UTC
svn commit: r1178990 - in /incubator/airavata/trunk/modules/ws-messenger:
commons/src/main/java/org/apache/airavata/wsmg/commons/storage/
messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/
messagebroker/src/main/java/org/apache/airavata/wsmg/bro...
Author: patanachai
Date: Tue Oct 4 22:31:53 2011
New Revision: 1178990
URL: http://svn.apache.org/viewvc?rev=1178990&view=rev
Log:
Add more shutdown method to WS-Messenger
Modified:
incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java Tue Oct 4 22:31:53 2011
@@ -285,6 +285,8 @@ public class ConnectionPool {
* regarding when the connections are closed.
*/
public synchronized void dispose() {
+ logger.info("Connection Pool Shutting down");
+
// stop clean up thread
this.stop = true;
this.clenupThread.interrupt();
@@ -299,12 +301,16 @@ public class ConnectionPool {
busyConnections = new Stack<Connection>();
lastAccessTimeRecord.clear();
+ logger.info("All connection is closed");
+
try {
this.clenupThread.join();
this.producerThread.join();
} catch (Exception e) {
logger.error("Cannot shutdown cleanup thread", e);
}
+
+ logger.info("Connection Pool Shutdown");
}
private void closeConnections(Stack<Connection> connections) {
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java Tue Oct 4 22:31:53 2011
@@ -177,7 +177,7 @@ public class MsgBoxServiceSkeleton imple
storage.removeAncientMessages();
}
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("Msgbox cleanup thread is interrupted to close");
}
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java Tue Oct 4 22:31:53 2011
@@ -57,12 +57,19 @@ public class BrokerServiceLifeCycle impl
private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
private DeliveryProcessor proc;
+ private ConsumerUrlManager urlManager;
public void shutDown(ConfigurationContext arg, AxisService service) {
log.info("broker shutting down");
if (proc != null) {
proc.stop();
+ proc = null;
}
+ if(urlManager != null){
+ urlManager.stop();
+ urlManager = null;
+ }
+ log.info("broker shut down");
}
public void startUp(ConfigurationContext configContext, AxisService axisService) {
@@ -201,7 +208,7 @@ public class BrokerServiceLifeCycle impl
/*
* Create Deliverable
*/
- ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
+ urlManager = new ConsumerUrlManager(configMan);
Deliverable senderUtils = new SenderUtils(urlManager);
senderUtils.setProtocol(protocol);
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Tue Oct 4 22:31:53 2011
@@ -246,7 +246,7 @@ public class WsmgPersistantStorage imple
}
}
- public Object blockingDequeue() {
+ public Object blockingDequeue() throws InterruptedException {
while (true) {
try {
return retrive();
@@ -256,9 +256,6 @@ public class WsmgPersistantStorage imple
} catch (IOException e) {
logger.error(e.getMessage(), e);
e.printStackTrace();
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- e.printStackTrace();
}
}
}
@@ -438,7 +435,7 @@ public class WsmgPersistantStorage imple
logger.debug("Wait=" + wait);
Thread.sleep(wait);
} catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
+ logger.error("Queue is interrupted to close");
throw e;
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java Tue Oct 4 22:31:53 2011
@@ -28,7 +28,7 @@ public interface WsmgQueue {
void enqueue(Object object, String trackId);
- Object blockingDequeue();
+ Object blockingDequeue() throws InterruptedException;
void dispose();
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java Tue Oct 4 22:31:53 2011
@@ -61,7 +61,14 @@ public class ConsumerUrlManager {
cleanupTimer = new Timer("Failed consumer url handler", true);
cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(), 0, timerThreadInterval);
-
+ }
+
+ public void stop(){
+ logger.info("Stop ConsumerUrlManager");
+ if(this.cleanupTimer != null){
+ this.cleanupTimer.cancel();
+ }
+ logger.info("ConsumerUrlManager Stopped");
}
public void onFailedDelivery(EndpointReference consumerEndpointReference, long timeFinished, long timeTaken,
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java Tue Oct 4 22:31:53 2011
@@ -26,18 +26,18 @@ import org.apache.airavata.wsmg.messenge
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DeliveryProcessor{
+public class DeliveryProcessor {
private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
-
+
private SendingStrategy strategy;
private Deliverable deliverable;
-
+
private boolean running;
private Thread t;
public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
- this.strategy = strategy;
+ this.strategy = strategy;
this.deliverable = deliverable;
}
@@ -49,15 +49,19 @@ public class DeliveryProcessor{
public void stop() {
this.running = false;
-
- try{
- this.t.join();
- }catch(InterruptedException ie){
- logger.error("Wait for sending thread to finish (join) is interrupted");
+
+ if (this.t != null) {
+ this.t.interrupt();
+
+ try {
+ this.t.join();
+ } catch (InterruptedException ie) {
+ logger.error("Wait for sending thread to finish (join) is interrupted");
+ }
}
-
+
WSMGParameter.OUT_GOING_QUEUE.dispose();
- }
+ }
private class CheckingAndSending implements Runnable {
@@ -79,6 +83,7 @@ public class DeliveryProcessor{
logger.error("Unexpected_exception:", e);
}
}
+ logger.debug("Shutdown Strategy");
strategy.shutdown();
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java Tue Oct 4 22:31:53 2011
@@ -55,6 +55,7 @@ public class MessengerServlet extends Ht
private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
private DeliveryProcessor proc;
+ private ConsumerUrlManager urlManager;
public void init(ServletConfig config) throws ServletException {
logger.info("Starting messenger servlet");
@@ -112,7 +113,7 @@ public class MessengerServlet extends Ht
/*
* Create Deliverable
*/
- ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
+ urlManager = new ConsumerUrlManager(configMan);
Deliverable senderUtils = new SenderUtils(urlManager);
senderUtils.setProtocol(protocol);
@@ -126,6 +127,11 @@ public class MessengerServlet extends Ht
if (proc != null) {
proc.stop();
}
+ if(urlManager != null){
+ urlManager.stop();
+ urlManager = null;
+ }
+ logger.info("wsmg-messenger shut down");
}
public ServletConfig getServletConfig() {