You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/09/15 09:16:04 UTC
svn commit: r695362 - in
/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi:
container/JBIContainer.java framework/AutoDeploymentService.java
framework/Registry.java messaging/DeliveryChannelImpl.java
Author: gertv
Date: Mon Sep 15 00:16:04 2008
New Revision: 695362
URL: http://svn.apache.org/viewvc?rev=695362&view=rev
Log:
SM-1574: Allow forced shutdown by cancelling pending sendsync exchanges
Modified:
servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java
servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java
servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java
servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/container/JBIContainer.java Mon Sep 15 00:16:04 2008
@@ -21,6 +21,9 @@
import java.util.Collection;
import java.util.EventListener;
import java.util.MissingResourceException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -125,6 +128,7 @@
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean containerInitialized = new AtomicBoolean(false);
private IdGenerator idGenerator = new IdGenerator();
+ private long forceShutdown;
/**
* Default Constructor
@@ -540,6 +544,20 @@
public boolean isGenerateRootDir() {
return generateRootDir;
}
+
+ public long getForceShutdown() {
+ return forceShutdown;
+ }
+
+ /**
+ * Set the timeout (in ms) before a shutdown is forced by cancelling all pending exchanges.
+ * The default value is 0 -- no forced shutdown
+ *
+ * @param forceShutdown the timeout in ms
+ */
+ public void setForceShutdown(long forceShutdown) {
+ this.forceShutdown = forceShutdown;
+ }
/**
* Creates an auto-generated rootDir which is useful for integration testing
@@ -686,12 +704,8 @@
// Shutdown broker before registry to avoid the JCA/JMS flow to send
// lots of messages when components and endpoints are stopped.
broker.shutDown();
- registry.shutDown();
- if (services != null) {
- for (int i = services.length - 1; i >= 0; i--) {
- services[i].shutDown();
- }
- }
+ shutdownRegistry();
+ shutdownServices();
clientFactory.shutDown();
environmentContext.shutDown();
// shutdown the management context last, because it will close the mbean server
@@ -702,6 +716,40 @@
}
}
+ private void shutdownServices() throws JBIException {
+ if (services != null) {
+ for (int i = services.length - 1; i >= 0; i--) {
+ services[i].shutDown();
+ }
+ }
+ }
+
+ private void shutdownRegistry() throws JBIException {
+ FutureTask<Boolean> shutdown = new FutureTask<Boolean>(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ registry.shutDown();
+ return true;
+ };
+ });
+
+ executorFactory.createExecutor("ServiceMix -- shutting down registry").execute(shutdown);
+
+ try {
+ if (forceShutdown > 0) {
+ LOG.info("Waiting another " + forceShutdown + " ms for complete shutdown of the components and service assemblies");
+ shutdown.get(forceShutdown, TimeUnit.MILLISECONDS);
+ } else {
+ LOG.info("Waiting for complete shutdown of the components and service assemblies");
+ shutdown.get();
+ }
+ LOG.info("Components and service assemblies have been shut down");
+ } catch (Exception e) {
+ LOG.warn("Unable to shutdown components and service assemblies normally: " + e, e);
+ LOG.warn("Forcing shutdown by cancelling all pending exchanges");
+ registry.cancelPendingExchanges();
+ }
+ }
+
protected void addShutdownHook() {
if (useShutdownHook) {
shutdownHook = new Thread("ServiceMix ShutdownHook") {
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java Mon Sep 15 00:16:04 2008
@@ -624,6 +624,9 @@
}
timerTask = new TimerTask() {
public void run() {
+ if (!isStarted()) {
+ return;
+ }
if (isMonitorInstallationDirectory()) {
monitorDirectory(environmentContext.getInstallationDir(), installFileMap);
}
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java Mon Sep 15 00:16:04 2008
@@ -52,6 +52,7 @@
import org.apache.servicemix.jbi.deployment.ServiceUnit;
import org.apache.servicemix.jbi.management.AttributeInfoHelper;
import org.apache.servicemix.jbi.management.BaseSystemService;
+import org.apache.servicemix.jbi.messaging.DeliveryChannelImpl;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.resolver.URIResolver;
import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
@@ -945,4 +946,17 @@
return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
}
+ /**
+ * Cancel pending exchanges in all components
+ */
+ public void cancelPendingExchanges() {
+ for (ComponentMBeanImpl mbean : componentRegistry.getComponents()) {
+ DeliveryChannelImpl channel = mbean.getDeliveryChannel();
+ if (channel != null) {
+ channel.cancelPendingExchanges();
+ }
+ }
+
+ }
+
}
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=695362&r1=695361&r2=695362&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Mon Sep 15 00:16:04 2008
@@ -806,4 +806,16 @@
return "DeliveryChannel{" + component.getName() + "}";
}
+ /**
+ * Cancel all pending exchanges currently being handled by the DeliveryChannel
+ */
+ public void cancelPendingExchanges() {
+ for (String id : exchangesById.keySet()) {
+ MessageExchange exchange = exchangesById.get(id);
+ synchronized (exchange) {
+ exchange.notifyAll();
+ }
+ }
+ }
+
}