You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/09/15 09:16:04 UTC

svn commit: r695362 - in /servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi: container/JBIContainer.java framework/AutoDeploymentService.java framework/Registry.java messaging/DeliveryChannelImpl.java

Author: gertv
Date: Mon Sep 15 00:16:04 2008
New Revision: 695362

URL: http://svn.apache.org/viewvc?rev=695362&view=rev
Log:
SM-1574: Allow forced shutdown by cancelling pending sendsync exchanges

Modified:
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java Mon Sep 15 00:16:04 2008
@@ -21,6 +21,9 @@
 import java.util.Collection;
 import java.util.EventListener;
 import java.util.MissingResourceException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
@@ -125,6 +128,7 @@
     private AtomicBoolean started = new AtomicBoolean(false);
     private AtomicBoolean containerInitialized = new AtomicBoolean(false);
     private IdGenerator idGenerator = new IdGenerator();
+    private long forceShutdown;
 
     /**
      * Default Constructor
@@ -540,6 +544,20 @@
     public boolean isGenerateRootDir() {
         return generateRootDir;
     }
+    
+    public long getForceShutdown() {
+        return forceShutdown;
+    }
+    
+    /**
+     * Set the timeout (in ms) before a shutdown is forced by cancelling all pending exchanges.
+     * The default value is 0 -- no forced shutdown
+     * 
+     * @param forceShutdown the timeout in ms
+     */
+    public void setForceShutdown(long forceShutdown) {
+        this.forceShutdown = forceShutdown;
+    }
 
     /**
      * Creates an auto-generated rootDir which is useful for integration testing
@@ -686,12 +704,8 @@
             // Shutdown broker before registry to avoid the JCA/JMS flow to send
             // lots of messages when components and endpoints are stopped.
             broker.shutDown();
-            registry.shutDown();
-            if (services != null) {
-                for (int i = services.length - 1; i >= 0; i--) {
-                    services[i].shutDown();
-                }
-            }
+            shutdownRegistry();           
+            shutdownServices();
             clientFactory.shutDown();
             environmentContext.shutDown();
             // shutdown the management context last, because it will close the mbean server
@@ -702,6 +716,40 @@
         }
     }
 
+    private void shutdownServices() throws JBIException {
+        if (services != null) {
+            for (int i = services.length - 1; i >= 0; i--) {
+                services[i].shutDown();
+            }
+        }
+    }
+
+    private void shutdownRegistry() throws JBIException {
+        FutureTask<Boolean> shutdown = new FutureTask<Boolean>(new Callable<Boolean>() {
+            public Boolean call() throws Exception {
+                registry.shutDown();
+                return true;
+            };
+        });
+
+        executorFactory.createExecutor("ServiceMix -- shutting down registry").execute(shutdown);
+        
+        try {
+            if (forceShutdown > 0) {
+                LOG.info("Waiting another " + forceShutdown + " ms for complete shutdown of the components and service assemblies");
+                shutdown.get(forceShutdown, TimeUnit.MILLISECONDS);
+            } else {
+                LOG.info("Waiting for complete shutdown of the components and service assemblies");
+                shutdown.get();
+            }
+            LOG.info("Components and service assemblies have been shut down");
+        } catch (Exception e) {
+            LOG.warn("Unable to shutdown components and service assemblies normally: " + e, e);
+            LOG.warn("Forcing shutdown by cancelling all pending exchanges");
+            registry.cancelPendingExchanges();
+        }
+    }
+
     protected void addShutdownHook() {
         if (useShutdownHook) {
             shutdownHook = new Thread("ServiceMix ShutdownHook") {

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java Mon Sep 15 00:16:04 2008
@@ -624,6 +624,9 @@
             }
             timerTask = new TimerTask() {
                 public void run() {
+                    if (!isStarted()) {
+                        return;
+                    }
                     if (isMonitorInstallationDirectory()) {
                         monitorDirectory(environmentContext.getInstallationDir(), installFileMap);
                     }

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java Mon Sep 15 00:16:04 2008
@@ -52,6 +52,7 @@
 import org.apache.servicemix.jbi.deployment.ServiceUnit;
 import org.apache.servicemix.jbi.management.AttributeInfoHelper;
 import org.apache.servicemix.jbi.management.BaseSystemService;
+import org.apache.servicemix.jbi.messaging.DeliveryChannelImpl;
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.resolver.URIResolver;
 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
@@ -945,4 +946,17 @@
         return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
     }
 
+    /**
+     * Cancel pending exchanges in all components
+     */
+    public void cancelPendingExchanges() {
+        for (ComponentMBeanImpl mbean : componentRegistry.getComponents()) {
+            DeliveryChannelImpl channel = mbean.getDeliveryChannel();
+            if (channel != null) {
+                channel.cancelPendingExchanges();
+            }
+        }
+        
+    }
+
 }

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Mon Sep 15 00:16:04 2008
@@ -806,4 +806,16 @@
         return "DeliveryChannel{" + component.getName() + "}";
     }
 
+    /**
+     * Cancel all pending exchanges currently being handled by the DeliveryChannel
+     */
+    public void cancelPendingExchanges() {
+        for (String id : exchangesById.keySet()) {
+            MessageExchange exchange = exchangesById.get(id);
+            synchronized (exchange) {
+                exchange.notifyAll();   
+            }
+        }
+    }
+
 }