You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/10/05 00:31:54 UTC

svn commit: r1178990 - in /incubator/airavata/trunk/modules/ws-messenger: commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/ messagebroker/src/main/java/org/apache/airavata/wsmg/bro...

Author: patanachai
Date: Tue Oct  4 22:31:53 2011
New Revision: 1178990

URL: http://svn.apache.org/viewvc?rev=1178990&view=rev
Log:
Add more shutdown methods to WS-Messenger

Modified:
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
    incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java Tue Oct  4 22:31:53 2011
@@ -285,6 +285,8 @@ public class ConnectionPool {
      * regarding when the connections are closed.
      */
     public synchronized void dispose() {
+        logger.info("Connection Pool Shutting down");
+        
         // stop clean up thread
         this.stop = true;
         this.clenupThread.interrupt();
@@ -299,12 +301,16 @@ public class ConnectionPool {
         busyConnections = new Stack<Connection>();
         lastAccessTimeRecord.clear();
 
+        logger.info("All connection is closed");
+        
         try {
             this.clenupThread.join();
             this.producerThread.join();
         } catch (Exception e) {
             logger.error("Cannot shutdown cleanup thread", e);
         }
+        
+        logger.info("Connection Pool Shutdown");
     }
 
     private void closeConnections(Stack<Connection> connections) {

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java Tue Oct  4 22:31:53 2011
@@ -177,7 +177,7 @@ public class MsgBoxServiceSkeleton imple
                         storage.removeAncientMessages();
                     }
                 } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                    logger.error("Msgbox cleanup thread is interrupted to close");
                 }
             }
         }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java Tue Oct  4 22:31:53 2011
@@ -57,12 +57,19 @@ public class BrokerServiceLifeCycle impl
     private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
     
     private DeliveryProcessor proc;
+    private ConsumerUrlManager urlManager;
 
     public void shutDown(ConfigurationContext arg, AxisService service) {
         log.info("broker shutting down");
         if (proc != null) {
             proc.stop();
+            proc = null;
         }
+        if(urlManager != null){
+            urlManager.stop();
+            urlManager = null;
+        }
+        log.info("broker shut down");
     }
 
     public void startUp(ConfigurationContext configContext, AxisService axisService) {
@@ -201,7 +208,7 @@ public class BrokerServiceLifeCycle impl
         /*
          * Create Deliverable
          */
-        ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
+        urlManager = new ConsumerUrlManager(configMan);
         Deliverable senderUtils = new SenderUtils(urlManager);
         senderUtils.setProtocol(protocol);
         

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Tue Oct  4 22:31:53 2011
@@ -246,7 +246,7 @@ public class WsmgPersistantStorage imple
         }
     }
 
-    public Object blockingDequeue() {
+    public Object blockingDequeue() throws InterruptedException {
         while (true) {
             try {
                 return retrive();
@@ -256,9 +256,6 @@ public class WsmgPersistantStorage imple
             } catch (IOException e) {
                 logger.error(e.getMessage(), e);
                 e.printStackTrace();
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
-                e.printStackTrace();
             }
         }
     }
@@ -438,7 +435,7 @@ public class WsmgPersistantStorage imple
                 logger.debug("Wait=" + wait);
                 Thread.sleep(wait);
             } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
+                logger.error("Queue is interrupted to close");
                 throw e;
             }
         }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java Tue Oct  4 22:31:53 2011
@@ -28,7 +28,7 @@ public interface WsmgQueue {
 
     void enqueue(Object object, String trackId);
 
-    Object blockingDequeue();
+    Object blockingDequeue() throws InterruptedException;
     
     void dispose();
     

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java Tue Oct  4 22:31:53 2011
@@ -61,7 +61,14 @@ public class ConsumerUrlManager {
 
         cleanupTimer = new Timer("Failed consumer url handler", true);
         cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(), 0, timerThreadInterval);
-
+    }
+    
+    public void stop(){
+        logger.info("Stop ConsumerUrlManager");
+        if(this.cleanupTimer != null){
+            this.cleanupTimer.cancel();
+        }
+        logger.info("ConsumerUrlManager Stopped");
     }
 
     public void onFailedDelivery(EndpointReference consumerEndpointReference, long timeFinished, long timeTaken,

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java Tue Oct  4 22:31:53 2011
@@ -26,18 +26,18 @@ import org.apache.airavata.wsmg.messenge
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DeliveryProcessor{
+public class DeliveryProcessor {
 
     private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
-    
+
     private SendingStrategy strategy;
     private Deliverable deliverable;
-    
+
     private boolean running;
     private Thread t;
 
     public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
-        this.strategy = strategy;       
+        this.strategy = strategy;
         this.deliverable = deliverable;
     }
 
@@ -49,15 +49,19 @@ public class DeliveryProcessor{
 
     public void stop() {
         this.running = false;
-        
-        try{
-            this.t.join();
-        }catch(InterruptedException ie){
-            logger.error("Wait for sending thread to finish (join) is interrupted");
+
+        if (this.t != null) {
+            this.t.interrupt();
+
+            try {
+                this.t.join();
+            } catch (InterruptedException ie) {
+                logger.error("Wait for sending thread to finish (join) is interrupted");
+            }
         }
-        
+
         WSMGParameter.OUT_GOING_QUEUE.dispose();
-    }        
+    }
 
     private class CheckingAndSending implements Runnable {
 
@@ -79,6 +83,7 @@ public class DeliveryProcessor{
                     logger.error("Unexpected_exception:", e);
                 }
             }
+            logger.debug("Shutdown Strategy");
             strategy.shutdown();
         }
     }

Modified: incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java?rev=1178990&r1=1178989&r2=1178990&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java Tue Oct  4 22:31:53 2011
@@ -55,6 +55,7 @@ public class MessengerServlet extends Ht
     private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
 
     private DeliveryProcessor proc;
+    private ConsumerUrlManager urlManager;
 
     public void init(ServletConfig config) throws ServletException {
         logger.info("Starting messenger servlet");
@@ -112,7 +113,7 @@ public class MessengerServlet extends Ht
         /*
          * Create Deliverable
          */
-        ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
+        urlManager = new ConsumerUrlManager(configMan);
         Deliverable senderUtils = new SenderUtils(urlManager);
         senderUtils.setProtocol(protocol);
 
@@ -126,6 +127,11 @@ public class MessengerServlet extends Ht
         if (proc != null) {
             proc.stop();
         }
+        if(urlManager != null){
+            urlManager.stop();
+            urlManager = null;
+        }
+        logger.info("wsmg-messenger shut down");
     }
 
     public ServletConfig getServletConfig() {