You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/04 13:59:00 UTC

svn commit: r411549 - in /incubator/servicemix/trunk/servicemix-common/src: main/java/org/apache/servicemix/common/ test/java/org/apache/servicemix/common/

Author: gnodet
Date: Sun Jun  4 04:58:59 2006
New Revision: 411549

URL: http://svn.apache.org/viewvc?rev=411549&view=rev
Log:
Improve support for transactions.
Add an AsyncBaseLifeCycle which uses Pull delivery and BaseLifeCycle now uses Push delivery.

Added:
    incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
    incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
Modified:
    incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java

Added: incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=411549&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java (added)
+++ incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java Sun Jun  4 04:58:59 2006
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.common;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.component.ComponentLifeCycle;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.servicemix.JbiConstants;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
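+ * Base class for life cycle management of components.
+ * This lifecycle uses pull delivery: a polling thread accepts exchanges from
+ * the delivery channel and schedules their processing on the work manager.
+ * This class may be used as is.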
+ * 
+ * @author Guillaume Nodet
+ * @version $Revision: 399873 $
+ * @since 3.0
+ */
+public class AsyncBaseLifeCycle implements ComponentLifeCycle {
+
+    protected final transient Log logger;
+    
+    protected BaseComponent component;
+    protected ComponentContext context;
+    protected ObjectName mbeanName;
+    protected WorkManager workManager;
+    protected AtomicBoolean running;
+    protected DeliveryChannel channel;
+    protected Thread poller;
+    protected AtomicBoolean polling;
+    protected TransactionManager transactionManager;
+    protected boolean workManagerCreated;
+    protected Map processors = new ConcurrentHashMap();
+    
+    
+    /**
+     * Creates a life cycle bound to the given component.
+     * The logger is taken from the component, and the running and polling
+     * flags both start out as <code>false</code>.
+     *
+     * @param component the component managed by this life cycle; its
+     *                  <code>logger</code> field is read here
+     */
+    public AsyncBaseLifeCycle(BaseComponent component) {
+        this.component = component;
+        this.logger = component.logger;
+        this.running = new AtomicBoolean(false);
+        this.polling = new AtomicBoolean(false);
+        this.processors = new ConcurrentHashMap();
+    }
+    
+    /**
+     * Returns the name under which the extension MBean was registered.
+     * In this class the name is only assigned by {@link #doInit()}, so the
+     * result is <code>null</code> when no extension MBean was registered.
+     *
+     * @return the extension MBean name, possibly <code>null</code>
+     * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
+     */
+    public ObjectName getExtensionMBeanName() {
+        return mbeanName;
+    }
+
+    /**
+     * Returns the extension MBean that {@link #doInit()} should register.
+     * This default implementation returns <code>null</code>, in which case
+     * {@link #doInit()} registers nothing.
+     *
+     * @return the MBean to register, or <code>null</code> for none
+     * @throws Exception declared so that overriding methods may fail
+     */
+    protected Object getExtensionMBean() throws Exception {
+        return null;
+    }
+    
+    /**
+     * Builds the object name used to register the extension MBean by asking
+     * the context's MBean names for a custom component MBean name called
+     * "Configuration".
+     *
+     * @return the object name for the extension MBean
+     * @throws Exception if the name cannot be created
+     */
+    protected ObjectName createExtensionMBeanName() throws Exception {
+        return this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
+    }
+    
+    /**
+     * Returns the service name identifying endpoints resolved from an EPR.
+     * {@link #processExchange(MessageExchange)} compares it with the service
+     * name of the exchange endpoint when no registered endpoint matches.
+     * This default implementation returns <code>null</code>.
+     *
+     * @return the EPR service name, or <code>null</code>
+     */
+    protected QName getEPRServiceName() {
+        return null;
+    }
+
+    /**
+     * Initializes the life cycle: stores the component context, obtains the
+     * delivery channel and the transaction manager from it, then delegates
+     * to {@link #doInit()}.
+     *
+     * @param context the component context handed to this life cycle
+     * @throws JBIException rethrown unchanged when raised during
+     *         initialization; any other exception is wrapped in a new
+     *         JBIException with the message "Error calling init"
+     * @see javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
+     */
+    public void init(ComponentContext context) throws JBIException {
+        try {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Initializing component");
+            }
+            this.context = context;
+            this.channel = context.getDeliveryChannel();
+            this.transactionManager = (TransactionManager) context.getTransactionManager();
+            doInit();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Component initialized");
+            }
+        } catch (JBIException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new JBIException("Error calling init", e);
+        }
+    }
+
+    /**
+     * Performs the actual initialization work: registers the extension
+     * MBean, if any, and makes sure a work manager is available.
+     * <p>
+     * When {@link #getExtensionMBean()} returns an MBean but the context has
+     * no MBeanServer, registration is skipped and a warning is logged; this
+     * is not treated as an error.
+     *
+     * @throws Exception if the MBean cannot be registered or the work
+     *         manager cannot be created
+     */
+    protected void doInit() throws Exception {
+        // Register extension mbean
+        Object mbean = getExtensionMBean();
+        if (mbean != null) {
+            MBeanServer server = this.context.getMBeanServer();
+            if (server == null) {
+                // Not fatal, but make the skipped registration visible
+                logger.warn("No MBeanServer available: extension MBean will not be registered");
+            } else {
+                this.mbeanName = createExtensionMBeanName();
+                // Replace any MBean left over under the same name
+                if (server.isRegistered(this.mbeanName)) {
+                    server.unregisterMBean(this.mbeanName);
+                }
+                server.registerMBean(mbean, this.mbeanName);
+            }
+        }
+        // Obtain or create the work manager
+        // When using the WorkManager from ServiceMix,
+        // some class loader problems can appear when
+        // trying to uninstall the components.
+        // Some threads owned by the work manager have a 
+        // security context referencing the component class loader
+        // so that every loaded classes are locked
+        //this.workManager = findWorkManager();
+        if (this.workManager == null) {
+            this.workManagerCreated = true;
+            this.workManager = createWorkManager();
+        }
+    }
+
+    /**
+     * Shuts the life cycle down by delegating to {@link #doShutDown()} and
+     * then clearing the stored component context.
+     * The context is only cleared when {@link #doShutDown()} completes
+     * without throwing.
+     *
+     * @throws JBIException rethrown unchanged when raised during shutdown;
+     *         any other exception is wrapped in a new JBIException with the
+     *         message "Error calling shutdown"
+     * @see javax.jbi.component.ComponentLifeCycle#shutDown()
+     */
+    public void shutDown() throws JBIException {
+        try {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Shutting down component");
+            }
+            doShutDown();
+            this.context = null;
+            if (logger.isDebugEnabled()) {
+                logger.debug("Component shut down");
+            }
+        } catch (JBIException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new JBIException("Error calling shutdown", e);
+        }
+    }
+
+    /**
+     * Performs the actual shutdown work: unregisters the extension MBean,
+     * if one was registered, and destroys the work manager when it was
+     * created by this life cycle.
+     * <p>
+     * The MBean name and the "work manager created" flag are reset once the
+     * corresponding resource has been released, so calling this method a
+     * second time is a no-op instead of dereferencing a cleared context.
+     *
+     * @throws Exception if the MBeanServer is missing although an MBean was
+     *         registered, or if unregistering fails
+     */
+    protected void doShutDown() throws Exception {
+        // Unregister mbean
+        if (this.mbeanName != null) {
+            MBeanServer server = this.context.getMBeanServer();
+            if (server == null) {
+                throw new JBIException("null mBeanServer");
+            }
+            if (server.isRegistered(this.mbeanName)) {
+                server.unregisterMBean(this.mbeanName);
+            }
+            // The name no longer designates a registered MBean
+            this.mbeanName = null;
+        }
+        // Destroy work manager, if created
+        if (this.workManagerCreated) {
+            if (this.workManager instanceof BasicWorkManager) {
+                ((BasicWorkManager) this.workManager).shutDown();
+            }
+            this.workManager = null;
+            this.workManagerCreated = false;
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see javax.jbi.component.ComponentLifeCycle#start()
+     */
+    public void start() throws JBIException {
+        try {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Starting component");
+            }
+            if (this.running.compareAndSet(false, true)) {
+                doStart();
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Component started");
+            }
+        } catch (JBIException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new JBIException("Error calling start", e);
+        }
+    }
+
+    /**
+     * Starts the polling work on the work manager and blocks until the
+     * polling thread has signalled that it is running.
+     * <p>
+     * The monitor of <code>polling</code> is held while the work is started,
+     * so the notification sent by {@link #pollDeliveryChannel()} cannot be
+     * missed. The wait is guarded by its condition so that a spurious wakeup
+     * does not end it early; it also ends if the life cycle is no longer
+     * flagged as running, because the poller then terminates on its own.
+     *
+     * @throws Exception if the work cannot be started or the wait is
+     *         interrupted
+     */
+    protected void doStart() throws Exception {
+        synchronized (this.polling) {
+            workManager.startWork(new Work() {
+                public void release() { }
+                public void run() {
+                    poller = Thread.currentThread();
+                    pollDeliveryChannel();
+                }
+            });
+            while (this.running.get() && !this.polling.get()) {
+                this.polling.wait();
+            }
+        }
+    }
+    
+    /**
+     * Polling loop executed by the poller thread.
+     * <p>
+     * Sets the polling flag and wakes up the threads waiting on it, then
+     * accepts exchanges from the delivery channel (one second timeout per
+     * attempt) for as long as the running flag is set. Each accepted
+     * exchange is handed to the work manager, which calls
+     * {@link #processExchangeInTx(MessageExchange, Transaction)}. When the
+     * exchange carries a JTA transaction, the transaction is suspended on
+     * this thread before the work is scheduled. On exit the polling flag is
+     * cleared and the waiting threads are woken up again.
+     * <p>
+     * <code>notifyAll</code> is used because both a starting and a stopping
+     * thread may wait on the same monitor.
+     */
+    protected void pollDeliveryChannel() {
+        synchronized (polling) {
+            polling.set(true);
+            polling.notifyAll();
+        }
+        while (running.get()) {
+            try {
+                final MessageExchange exchange = channel.accept(1000L);
+                if (exchange != null) {
+                    final Transaction tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
+                    if (tx != null) {
+                        if (transactionManager == null) {
+                            throw new IllegalStateException("Exchange is enlisted in a transaction, but no transaction manager is available");
+                        }
+                        // The worker thread resumes the transaction itself
+                        transactionManager.suspend();
+                    }
+                    workManager.scheduleWork(new Work() {
+                        public void release() {
+                        }
+                        public void run() {
+                            processExchangeInTx(exchange, tx);
+                        }
+                    });
+                }
+            } catch (Throwable t) {
+                if (!running.get()) {
+                    // Should have been interrupted, discard the throwable
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Polling thread will stop");
+                    }
+                } else {
+                    logger.error("Error polling delivery channel", t);
+                }
+            }
+        }
+        synchronized (polling) {
+            polling.set(false);
+            polling.notifyAll();
+        }
+    }
+
+    /**
+     * Stops the life cycle. {@link #doStop()} is only invoked when the
+     * running flag could be switched from <code>true</code> to
+     * <code>false</code>, so stopping an already stopped life cycle does
+     * nothing.
+     *
+     * @throws JBIException rethrown unchanged when raised while stopping;
+     *         any other exception is wrapped in a new JBIException with the
+     *         message "Error calling stop"
+     * @see javax.jbi.component.ComponentLifeCycle#stop()
+     */
+    public void stop() throws JBIException {
+        try {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Stopping component");
+            }
+            if (this.running.compareAndSet(true, false)) {
+                doStop();
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Component stopped");
+            }
+        } catch (JBIException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new JBIException("Error calling stop", e);
+        }
+    }
+
+    /**
+     * Interrupts the polling thread and waits until it has terminated.
+     * <p>
+     * Nothing is done when the polling flag is not set. The wait is guarded
+     * by the polling flag so that a spurious wakeup does not make this
+     * method return while the poller is still running. The reference to the
+     * poller thread is cleared in every case.
+     *
+     * @throws Exception if the wait is interrupted
+     */
+    protected void doStop() throws Exception {
+        // Interrupt the polling thread and await termination
+        try {
+            synchronized (polling) {
+                if (polling.get()) {
+                    poller.interrupt();
+                    while (polling.get()) {
+                        polling.wait();
+                    }
+                }
+            }
+        } finally {
+            poller = null;
+        }
+    }
+
+    /**
+     * Returns the component context stored by
+     * {@link #init(ComponentContext)}; the field is reset to
+     * <code>null</code> by {@link #shutDown()}.
+     *
+     * @return Returns the context.
+     */
+    public ComponentContext getContext() {
+        return context;
+    }
+
+    /**
+     * Returns the work manager used to run the polling loop and to process
+     * exchanges.
+     *
+     * @return the current work manager, or <code>null</code> when none is set
+     */
+    public WorkManager getWorkManager() {
+        return workManager;
+    }
+
+    /**
+     * Creates the work manager used when none has been provided.
+     * This implementation returns a new {@link BasicWorkManager}.
+     *
+     * @return a newly created work manager
+     */
+    protected WorkManager createWorkManager() {
+        // Create a very simple one
+        return new BasicWorkManager();
+    }
+
+    protected WorkManager findWorkManager() {
+        // If inside ServiceMix, retrieve its work manager
+        try {
+            Method getContainerMth = context.getClass().getMethod("getContainer", new Class[0]);
+            Object container = getContainerMth.invoke(context, new Object[0]);
+            Method getWorkManagerMth = container.getClass().getMethod("getWorkManager", new Class[0]);
+            return (WorkManager) getWorkManagerMth.invoke(container, new Object[0]);
+        } catch (Throwable t) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("JBI container is not ServiceMix. Will create our own WorkManager", t);
+            }
+        }
+        // TODO: should look in jndi for an existing work manager
+        return null;
+    }
+    
+    /**
+     * Processes an exchange on a worker thread, taking care of the
+     * transaction the exchange may be enlisted in.
+     * <p>
+     * When a transaction is given it is resumed on the current thread before
+     * {@link #processExchange(MessageExchange)} is called. If processing
+     * throws an exception, the error is logged; then, for a runtime
+     * exception inside a transaction, the transaction is marked rollback
+     * only, and in every other case the exception is set as the error on the
+     * exchange, which is sent back through the channel. Finally, when a
+     * transaction was given and the current thread still has one associated,
+     * a rollback is attempted.
+     *
+     * @param exchange the exchange to process
+     * @param tx the transaction the exchange is enlisted in, or
+     *           <code>null</code> when the exchange is not transacted
+     */
+    protected void processExchangeInTx(MessageExchange exchange, Transaction tx) {
+        try {
+            if (tx != null) {
+                transactionManager.resume(tx);
+            }
+            processExchange(exchange);
+        } catch (Exception e) {
+            logger.error("Error processing exchange " + exchange, e);
+            try {
+                // If we are transacted and this is a runtime exception
+                // try to mark transaction as rollback
+                if (tx != null && e instanceof RuntimeException) {
+                    transactionManager.setRollbackOnly();
+                } else  {
+                    // Otherwise report the failure to the other party
+                    exchange.setError(e);
+                    channel.send(exchange);
+                }
+            } catch (Exception inner) {
+                logger.error("Error setting exchange status to ERROR", inner);
+            }
+        } finally {
+            try {
+                // Check transaction status
+                if (tx != null) {
+                    int status = transactionManager.getStatus();
+                    // We use pull delivery, so the transaction should already
+                    // have been transferred to another thread because the component
+                    // must have answered.
+                    if (status != Status.STATUS_NO_TRANSACTION) {
+                        logger.error("Transaction is still active after exchange processing. Trying to rollback transaction.");
+                        try {
+                            transactionManager.rollback();
+                        } catch (Throwable t) {
+                            logger.error("Error trying to rollback transaction.", t);
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                logger.error("Error checking transaction status.", t);
+            }
+        }
+    }
+    
+    /**
+     * Dispatches an exchange to the processor in charge of it.
+     * <p>
+     * For an exchange received as consumer, the processor is the one of the
+     * endpoint named by the {@link JbiConstants#SENDER_ENDPOINT} property
+     * or, when that property is absent, the processor registered for the
+     * exchange id (which is removed from the map). For an exchange received
+     * as provider, the processor is the one of the registered endpoint
+     * matching the exchange endpoint; failing that, and if the service name
+     * equals {@link #getEPRServiceName()}, the endpoint comes from
+     * {@link #getResolvedEPR(ServiceEndpoint)} and is deactivated once the
+     * exchange has been processed.
+     *
+     * @param exchange the exchange to process
+     * @throws IllegalStateException if no endpoint or no processor is found
+     * @throws Exception if the processor fails
+     */
+    public void processExchange(MessageExchange exchange) throws Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Received exchange: status: " + exchange.getStatus() + ", role: " + exchange.getRole());
+        }
+        if (exchange.getRole() != Role.PROVIDER) {
+            // Consumer side: find who is waiting for this exchange
+            ExchangeProcessor waiting = null;
+            if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) == null) {
+                waiting = (ExchangeProcessor) processors.remove(exchange.getExchangeId());
+            } else {
+                String senderKey = exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
+                Endpoint sender = (Endpoint) this.component.getRegistry().getEndpoint(senderKey);
+                if (sender != null) {
+                    waiting = sender.getProcessor();
+                }
+            }
+            if (waiting == null) {
+                throw new IllegalStateException("No processor found for: " + exchange.getExchangeId());
+            }
+            waiting.process(exchange);
+            return;
+        }
+        // Provider side: find the target endpoint
+        ServiceEndpoint requested = exchange.getEndpoint();
+        String key = EndpointSupport.getKey(exchange.getEndpoint());
+        Endpoint target = (Endpoint) this.component.getRegistry().getEndpoint(key);
+        boolean resolvedFromEpr = false;
+        if (target == null && requested.getServiceName().equals(getEPRServiceName())) {
+            target = getResolvedEPR(exchange.getEndpoint());
+            resolvedFromEpr = true;
+        }
+        if (target == null) {
+            throw new IllegalStateException("Endpoint not found: " + key);
+        }
+        ExchangeProcessor handler = target.getProcessor();
+        if (handler == null) {
+            throw new IllegalStateException("No processor found for endpoint: " + key);
+        }
+        try {
+            handler.process(exchange);
+        } finally {
+            // If the endpoint is dynamic, deactivate it
+            if (resolvedFromEpr) {
+                target.deactivate();
+            }
+        }
+    }
+
+    /**
+     * 
+     * @param exchange
+     * @param processor
+     * @throws MessagingException
+     * @deprecated use sendConsumerExchange(MessageExchange, Endpoint) instead
+     */
+    public void sendConsumerExchange(MessageExchange exchange, ExchangeProcessor processor) throws MessagingException {
+        // If the exchange is not ACTIVE, no answer is expected
+        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+            processors.put(exchange.getExchangeId(), processor);
+        }
+        channel.send(exchange);
+    }
+    
+    /**
+     * Sends an exchange as consumer on behalf of the given endpoint.
+     * The endpoint key is stored on the exchange as the
+     * {@link JbiConstants#SENDER_ENDPOINT} property instead of being kept in
+     * memory; this allows the component to keep no state so that
+     * components can be clustered and provide fail-over and load-balancing.
+     *
+     * @param exchange the exchange to send
+     * @param endpoint the endpoint sending the exchange
+     * @throws MessagingException if the exchange cannot be sent
+     */
+    public void sendConsumerExchange(MessageExchange exchange, Endpoint endpoint) throws MessagingException {
+        exchange.setProperty(JbiConstants.SENDER_ENDPOINT, EndpointSupport.getKey(endpoint));
+        channel.send(exchange);
+    }
+    
+    /**
+     * Handle an exchange sent to an EPR resolved by this component.
+     * This default implementation does not support EPR exchanges and always
+     * throws.
+     *
+     * @param ep the service endpoint the exchange was sent to
+     * @return an endpoint to use for handling the exchange
+     * @throws UnsupportedOperationException always, in this implementation
+     * @throws Exception declared so that overriding methods may fail
+     */
+    protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
+        throw new UnsupportedOperationException("Component does not handle EPR exchanges");
+    }
+
+}

Modified: incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java?rev=411549&r1=411548&r2=411549&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java (original)
+++ incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java Sun Jun  4 04:58:59 2006
@@ -15,439 +15,46 @@
  */
 package org.apache.servicemix.common;
 
-import java.lang.reflect.Method;
-import java.util.Map;
-
-import javax.jbi.JBIException;
-import javax.jbi.component.ComponentContext;
-import javax.jbi.component.ComponentLifeCycle;
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
 import javax.transaction.Status;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
-import org.apache.commons.logging.Log;
-import org.apache.servicemix.JbiConstants;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.servicemix.MessageExchangeListener;
 
 /**
  * Base class for life cycle management of components.
- * This class may be used as is.
+ * This lifecycle uses push delivery by implementing the MessageExchangeListener interface.
  * 
  * @author Guillaume Nodet
  * @version $Revision$
  * @since 3.0
  */
-public class BaseLifeCycle implements ComponentLifeCycle {
+public class BaseLifeCycle extends AsyncBaseLifeCycle implements MessageExchangeListener {
 
-    protected final transient Log logger;
-    
-    protected BaseComponent component;
-    protected ComponentContext context;
-    protected ObjectName mbeanName;
-    protected WorkManager workManager;
-    protected AtomicBoolean running;
-    protected DeliveryChannel channel;
-    protected Thread poller;
-    protected AtomicBoolean polling;
-    protected TransactionManager transactionManager;
-    protected boolean workManagerCreated;
-    protected Map processors = new ConcurrentHashMap();
-    
-    
     public BaseLifeCycle(BaseComponent component) {
-        this.component = component;
-        this.logger = component.logger;
-        this.running = new AtomicBoolean(false);
-        this.polling = new AtomicBoolean(false);
-        this.processors = new ConcurrentHashMap();
-    }
-    
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
-     */
-    public ObjectName getExtensionMBeanName() {
-        return mbeanName;
-    }
-
-    protected Object getExtensionMBean() throws Exception {
-        return null;
-    }
-    
-    protected ObjectName createExtensionMBeanName() throws Exception {
-        return this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
-    }
-    
-    protected QName getEPRServiceName() {
-        return null;
-    }
-
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
-     */
-    public void init(ComponentContext context) throws JBIException {
-        try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Initializing component");
-            }
-            this.context = context;
-            this.channel = context.getDeliveryChannel();
-            this.transactionManager = (TransactionManager) context.getTransactionManager();
-            doInit();
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component initialized");
-            }
-        } catch (JBIException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new JBIException("Error calling init", e);
-        }
-    }
-
-    protected void doInit() throws Exception {
-        // Register extension mbean
-        Object mbean = getExtensionMBean();
-        if (mbean != null) {
-            MBeanServer server = this.context.getMBeanServer();
-            if (server == null) {
-                // TODO: log a warning ?
-                //throw new JBIException("null mBeanServer");
-            } else {
-                this.mbeanName = createExtensionMBeanName();
-                if (server.isRegistered(this.mbeanName)) {
-                    server.unregisterMBean(this.mbeanName);
-                }
-                server.registerMBean(mbean, this.mbeanName);
-            }
-        }
-        // Obtain or create the work manager
-        // When using the WorkManager from ServiceMix,
-        // some class loader problems can appear when
-        // trying to uninstall the components.
-        // Some threads owned by the work manager have a 
-        // security context referencing the component class loader
-        // so that every loaded classes are locked
-        //this.workManager = findWorkManager();
-        if (this.workManager == null) {
-            this.workManagerCreated = true;
-            this.workManager = createWorkManager();
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#shutDown()
-     */
-    public void shutDown() throws JBIException {
-        try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Shutting down component");
-            }
-            doShutDown();
-            this.context = null;
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component shut down");
-            }
-        } catch (JBIException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new JBIException("Error calling shutdown", e);
-        }
-    }
-
-    protected void doShutDown() throws Exception {
-        // Unregister mbean
-        if (this.mbeanName != null) {
-            MBeanServer server = this.context.getMBeanServer();
-            if (server == null) {
-                throw new JBIException("null mBeanServer");
-            }
-            if (server.isRegistered(this.mbeanName)) {
-                server.unregisterMBean(this.mbeanName);
-            }
-        }
-        // Destroy work manager, if created
-        if (this.workManagerCreated) {
-            if (this.workManager instanceof BasicWorkManager) {
-                ((BasicWorkManager) this.workManager).shutDown();
-            }
-            this.workManager = null;
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#start()
-     */
-    public void start() throws JBIException {
-        try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Starting component");
-            }
-            if (this.running.compareAndSet(false, true)) {
-                doStart();
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component started");
-            }
-        } catch (JBIException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new JBIException("Error calling start", e);
-        }
-    }
-
-    protected void doStart() throws Exception {
-        synchronized (this.polling) {
-            workManager.startWork(new Work() {
-                public void release() { }
-                public void run() {
-                    poller = Thread.currentThread();
-                    pollDeliveryChannel();
-                }
-            });
-            polling.wait();
-        }
+        super(component);
     }
     
-    protected void pollDeliveryChannel() {
-        synchronized (polling) {
-            polling.set(true);
-            polling.notify();
-        }
-        while (running.get()) {
-            try {
-                final MessageExchange exchange = channel.accept(1000L);
-                if (exchange != null) {
-                    final Transaction tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
-                    if (tx != null) {
-                        if (transactionManager == null) {
-                            throw new IllegalStateException("Exchange is enlisted in a transaction, but no transaction manager is available");
-                        }
-                        transactionManager.suspend();
-                    }
-                    workManager.scheduleWork(new Work() {
-                        public void release() {
-                        }
-                        public void run() {
-                            try {
-                                if (tx != null) {
-                                    transactionManager.resume(tx);
-                                }
-                                processExchange(exchange);
-                            } catch (Throwable t) {
-                                logger.error("Error processing exchange " + exchange, t);
-                                // Set an error on message
-                                try {
-                                    if (t instanceof Exception) {
-                                        exchange.setError((Exception) t);
-                                    } else {
-                                        exchange.setError(new Exception("Throwable", t));
-                                    }
-                                    channel.send(exchange);
-                                } catch (Exception inner) {
-                                    logger.error("Error setting exchange status to ERROR", inner);
-                                }
-                            } finally {
-                                // Check transaction status
-                                if (tx != null) {
-                                    int status = Status.STATUS_NO_TRANSACTION;
-                                    try {
-                                        status = transactionManager.getStatus();
-                                    } catch (Throwable t) {
-                                        logger.error("Error checking transaction status.", t);
-                                    }
-                                    if (status != Status.STATUS_NO_TRANSACTION) {
-                                        logger.error("Transaction is still active after exchange processing. Trying to rollback transaction.");
-                                        try {
-                                            transactionManager.rollback();
-                                        } catch (Throwable t) {
-                                            logger.error("Error trying to rollback transaction.", t);
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    });
-                }
-            } catch (Throwable t) {
-                if (running.get() == false) {
-                    // Should have been interrupted, discard the throwable
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Polling thread will stop");
-                    }
-                } else {
-                    logger.error("Error polling delivery channel", t);
-                }
-            }
-        }
-        synchronized (polling) {
-            polling.set(false);
-            polling.notify();
-        }
-    }
-
     /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#stop()
+     * @see org.apache.servicemix.MessageExchangeListener#onMessageExchange(javax.jbi.messaging.MessageExchange)
      */
-    public void stop() throws JBIException {
+    public void onMessageExchange(MessageExchange exchange) {
         try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Stopping component");
-            }
-            if (this.running.compareAndSet(true, false)) {
-                doStop();
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component stopped");
-            }
-        } catch (JBIException e) {
-            throw e;
+            processExchange(exchange);
         } catch (Exception e) {
-            throw new JBIException("Error calling stop", e);
-        }
-    }
-
-    protected void doStop() throws Exception {
-        // Interrupt the polling thread and await termination
-        try {
-            synchronized (polling) {
-                if (polling.get()) {
-                    poller.interrupt();
-                    polling.wait();
-                }
-            }
-        } finally {
-            poller = null;
-        }
-    }
-
-    /**
-     * @return Returns the context.
-     */
-    public ComponentContext getContext() {
-        return context;
-    }
-
-    public WorkManager getWorkManager() {
-        return workManager;
-    }
-
-    protected WorkManager createWorkManager() {
-        // Create a very simple one
-        return new BasicWorkManager();
-    }
-
-    protected WorkManager findWorkManager() {
-        // If inside ServiceMix, retrieve its work manager
-        try {
-            Method getContainerMth = context.getClass().getMethod("getContainer", new Class[0]);
-            Object container = getContainerMth.invoke(context, new Object[0]);
-            Method getWorkManagerMth = container.getClass().getMethod("getWorkManager", new Class[0]);
-            return (WorkManager) getWorkManagerMth.invoke(container, new Object[0]);
-        } catch (Throwable t) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("JBI container is not ServiceMix. Will create our own WorkManager", t);
-            }
-        }
-        // TODO: should look in jndi for an existing work manager
-        return null;
-    }
-    
-    public void processExchange(MessageExchange exchange) throws Exception {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Received exchange: status: " + exchange.getStatus() + ", role: " + exchange.getRole());
-        }
-        if (exchange.getRole() == Role.PROVIDER) {
-            boolean dynamic = false;
-            ServiceEndpoint endpoint = exchange.getEndpoint();
-            String key = EndpointSupport.getKey(exchange.getEndpoint());
-            Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
-            if (ep == null) {
-                if (endpoint.getServiceName().equals(getEPRServiceName())) {
-                    ep = getResolvedEPR(exchange.getEndpoint());
-                    dynamic = true;
-                } 
-                if (ep == null) {
-                    throw new IllegalStateException("Endpoint not found: " + key);
-                }
-            }
-            ExchangeProcessor processor = ep.getProcessor();
-            if (processor == null) {
-                throw new IllegalStateException("No processor found for endpoint: " + key);
-            }
+            logger.error("Error processing exchange " + exchange, e);
             try {
-                processor.process(exchange);
-            } finally {
-                // If the endpoint is dynamic, deactivate it
-                if (dynamic) {
-                    ep.deactivate();
+                // If a transaction is active and this is a runtime exception, mark it
+                // rollback-only; otherwise (no TM, no active tx, or checked exception) send ERROR back
+                if (transactionManager != null && transactionManager.getStatus() == Status.STATUS_ACTIVE && e instanceof RuntimeException) {
+                    transactionManager.setRollbackOnly();
+                } else  {
+                    exchange.setError(e);
+                    channel.send(exchange);
                 }
+            } catch (Exception inner) {
+                logger.error("Error setting exchange status to ERROR", inner);
             }
-        } else {
-            ExchangeProcessor processor = null;
-            if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
-                String key = exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
-                Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
-                if (ep != null) {
-                    processor = ep.getProcessor();
-                }
-            } else {
-                processor = (ExchangeProcessor) processors.remove(exchange.getExchangeId());
-            }
-            if (processor == null) {
-                throw new IllegalStateException("No processor found for: " + exchange.getExchangeId());
-            }
-            processor.process(exchange);
-        }
-    }
-
-    /**
-     * 
-     * @param exchange
-     * @param processor
-     * @throws MessagingException
-     * @deprecated use sendConsumerExchange(MessageExchange, Endpoint) instead
-     */
-    public void sendConsumerExchange(MessageExchange exchange, ExchangeProcessor processor) throws MessagingException {
-        // If the exchange is not ACTIVE, no answer is expected
-        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-            processors.put(exchange.getExchangeId(), processor);
         }
-        channel.send(exchange);
     }
     
-    /**
-     * This method allows the component to keep no state in memory so that
-     * components can be clustered and provide fail-over and load-balancing.
-     * @param exchange
-     * @param endpoint
-     * @throws MessagingException
-     */
-    public void sendConsumerExchange(MessageExchange exchange, Endpoint endpoint) throws MessagingException {
-        String key = EndpointSupport.getKey(endpoint);
-        exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
-        channel.send(exchange);
-    }
-    
-    /**
-     * Handle an exchange sent to an EPR resolved by this component
-     * @param exchange
-     * @return an endpoint to use for handling the exchange
-     * @throws Exception
-     */
-    protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
-        throw new UnsupportedOperationException("Component does not handle EPR exchanges");
-    }
-
 }

Added: incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java?rev=411549&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java Sun Jun  4 04:58:59 2006
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.common;
+
+import javax.jbi.component.Component;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.geronimo.transaction.context.GeronimoTransactionManager;
+import org.apache.geronimo.transaction.context.TransactionContextManager;
+import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.nmr.flow.Flow;
+import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
+import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow;
+
+/**
+ * Checks the transaction status and exchange status a client observes after sending
+ * an InOnly exchange, inside a transaction, to an endpoint hosted by a BaseLifeCycle component.
+ */
+public class TransactionsTest extends TestCase {
+
+    private JBIContainer jbi;
+    private BrokerService broker;
+    private TransactionManagerImpl exTransactionManager;
+    private TransactionContextManager transactionContextManager;
+    private TransactionManager txManager;
+    private Component component;
+    private ServiceMixClient client;
+    // Switches read by TestEndpoint.process(); setUp() clears them before each test.
+    private boolean setRollbackBefore = false;
+    private boolean setRollbackAfter = false;
+    private boolean throwExBefore = false;
+    private boolean throwExAfter = false;
+    
+    /** Starts a non-persistent broker and an embedded container using a Geronimo transaction manager. */
+    protected void setUp() throws Exception {
+        // Every test starts from the same switch state, even if this instance is reused.
+        setRollbackBefore = false;
+        setRollbackAfter = false;
+        throwExBefore = false;
+        throwExAfter = false;
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        
+        exTransactionManager = new TransactionManagerImpl(600, null, null);
+        transactionContextManager = new TransactionContextManager(exTransactionManager, exTransactionManager);
+        txManager = (TransactionManager) new GeronimoTransactionManager(transactionContextManager);
+        
+        JCAFlow jcaFlow = new JCAFlow();
+        jcaFlow.setTransactionContextManager(transactionContextManager);
+        
+        jbi = new JBIContainer();
+        jbi.setFlows(new Flow[] { new SedaFlow(), jcaFlow });
+        jbi.setEmbedded(true);
+        jbi.setUseMBeanServer(false);
+        jbi.setTransactionManager(txManager);
+        jbi.setAutoEnlistInTransaction(true);
+        jbi.init();
+        jbi.start();
+        component = new TestComponent();
+        jbi.activateComponent(component, "test");
+        client = new DefaultServiceMixClient(jbi);
+    }
+    
+    /** Shuts the container down and stops the broker. */
+    protected void tearDown() throws Exception {
+        try {
+            jbi.shutDown();
+        } finally {
+            // Stop the broker even if shutDown() throws, so the one started in setUp() is not left running.
+            broker.stop();
+        }
+    }
+    
+    /** Creates the InOnly exchange used by every test: addressed to "service", with a small XML payload. */
+    private InOnly createExchange() throws Exception {
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("service"));
+        me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        return me;
+    }
+    
+    /** Async send: the exchange is still ACTIVE before commit and is received as DONE afterwards. */
+    public void testTxOkAsync() throws Exception {
+        txManager.begin();
+        InOnly me = createExchange();
+        client.send(me);
+        assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        txManager.commit();
+        me = (InOnly) client.receive(1000);
+        assertNotNull(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+    }
+    
+    /** Sync send: the exchange is DONE and the transaction still active when sendSync returns. */
+    public void testTxOkSync() throws Exception {
+        txManager.begin();
+        InOnly me = createExchange();
+        client.sendSync(me, 1000);
+        assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        txManager.commit();
+    }
+    
+    /** Async send: a rollback-only mark set by the endpoint is visible to the sender. */
+    public void testTxRollbackBeforeAsync() throws Exception {
+        setRollbackBefore = true;
+        txManager.begin();
+        InOnly me = createExchange();
+        client.send(me);
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+        txManager.rollback();
+    }
+    
+    /** Sync send: a rollback-only mark set by the endpoint is visible to the sender. */
+    public void testTxRollbackBeforeSync() throws Exception {
+        setRollbackBefore = true;
+        txManager.begin();
+        InOnly me = createExchange();
+        client.sendSync(me);
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+        txManager.rollback();
+    }
+    
+    /** Async send: an exception thrown by the endpoint leaves the transaction marked for rollback. */
+    public void testTxThrowBefore() throws Exception {
+        throwExBefore = true;
+        txManager.begin();
+        InOnly me = createExchange();
+        client.send(me);
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+        txManager.rollback();
+    }
+    
+    /** Component whose life cycle registers a single provider endpoint ("service"/"endpoint"). */
+    private class TestComponent extends BaseComponent {
+        protected BaseLifeCycle createLifeCycle() {
+            return new TestLifeCycle();
+        }
+        /** Life cycle that owns one service unit and starts/stops it in doStart()/doStop(). */
+        protected class TestLifeCycle extends BaseLifeCycle {
+            protected ServiceUnit su;
+            public TestLifeCycle() {
+                super(TestComponent.this);
+            }
+            protected void doInit() throws Exception {
+                super.doInit();
+                su = new ServiceUnit();
+                su.setComponent(component);
+                TestEndpoint ep = new TestEndpoint();
+                ep.setService(new QName("service"));
+                ep.setEndpoint("endpoint");
+                ep.setServiceUnit(su);
+                su.addEndpoint(ep);
+                getRegistry().registerServiceUnit(su);
+            }
+            protected void doStart() throws Exception {
+                super.doStart();
+                su.start();
+            }
+            protected void doStop() throws Exception {
+                super.doStop();
+                su.stop();
+            }
+        }
+        
+        /** Provider endpoint that processes exchanges itself, steered by the outer test's switches. */
+        protected class TestEndpoint extends Endpoint implements ExchangeProcessor {
+            protected ServiceEndpoint activated;
+            public void activate() throws Exception {
+                ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext();
+                activated = ctx.activateEndpoint(service, endpoint);
+            }
+            public void deactivate() throws Exception {
+                ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext();
+                ctx.deactivateEndpoint(activated);
+                activated = null;
+            }
+            public ExchangeProcessor getProcessor() {
+                return this;
+            }
+            public Role getRole() {
+                return Role.PROVIDER;
+            }
+            /** Marks rollback or throws, before or after answering DONE, depending on the switches. */
+            public void process(MessageExchange exchange) throws Exception {
+                if (setRollbackBefore) {
+                    txManager.setRollbackOnly();
+                    return;
+                } else if (throwExBefore) {
+                    throw new Exception("Error");
+                }
+                exchange.setStatus(ExchangeStatus.DONE);
+                getComponentContext().getDeliveryChannel().send(exchange);
+                if (setRollbackAfter) {
+                    txManager.setRollbackOnly();
+                } else if (throwExAfter) {
+                    throw new Exception("Error");
+                }
+            }
+            public void start() throws Exception {
+            }
+            public void stop() throws Exception {
+            }
+        }
+    }
+}