You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/04 13:59:00 UTC
svn commit: r411549 - in /incubator/servicemix/trunk/servicemix-common/src:
main/java/org/apache/servicemix/common/
test/java/org/apache/servicemix/common/
Author: gnodet
Date: Sun Jun 4 04:58:59 2006
New Revision: 411549
URL: http://svn.apache.org/viewvc?rev=411549&view=rev
Log:
Improve support for transactions.
Add an AsyncBaseLifeCycle which uses Pull delivery and BaseLifeCycle now uses Push delivery.
Added:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
Added: incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=411549&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java (added)
+++ incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java Sun Jun 4 04:58:59 2006
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.common;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.component.ComponentLifeCycle;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.servicemix.JbiConstants;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base class for life cycle management of components.
+ * This class may be used as is.
+ *
+ * @author Guillaume Nodet
+ * @version $Revision: 399873 $
+ * @since 3.0
+ */
+public class AsyncBaseLifeCycle implements ComponentLifeCycle {
+
+ protected final transient Log logger;
+
+ protected BaseComponent component;
+ protected ComponentContext context;
+ protected ObjectName mbeanName;
+ protected WorkManager workManager;
+ protected AtomicBoolean running;
+ protected DeliveryChannel channel;
+ protected Thread poller;
+ protected AtomicBoolean polling;
+ protected TransactionManager transactionManager;
+ protected boolean workManagerCreated;
+ protected Map processors = new ConcurrentHashMap();
+
+
+ public AsyncBaseLifeCycle(BaseComponent component) {
+ this.component = component;
+ this.logger = component.logger;
+ this.running = new AtomicBoolean(false);
+ this.polling = new AtomicBoolean(false);
+ this.processors = new ConcurrentHashMap();
+ }
+
+ /* (non-Javadoc)
+ * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
+ */
+ public ObjectName getExtensionMBeanName() {
+ return mbeanName;
+ }
+
+ protected Object getExtensionMBean() throws Exception {
+ return null;
+ }
+
+ protected ObjectName createExtensionMBeanName() throws Exception {
+ return this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
+ }
+
+ protected QName getEPRServiceName() {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
+ */
+ public void init(ComponentContext context) throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Initializing component");
+ }
+ this.context = context;
+ this.channel = context.getDeliveryChannel();
+ this.transactionManager = (TransactionManager) context.getTransactionManager();
+ doInit();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component initialized");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling init", e);
+ }
+ }
+
+ protected void doInit() throws Exception {
+ // Register extension mbean
+ Object mbean = getExtensionMBean();
+ if (mbean != null) {
+ MBeanServer server = this.context.getMBeanServer();
+ if (server == null) {
+ // TODO: log a warning ?
+ //throw new JBIException("null mBeanServer");
+ } else {
+ this.mbeanName = createExtensionMBeanName();
+ if (server.isRegistered(this.mbeanName)) {
+ server.unregisterMBean(this.mbeanName);
+ }
+ server.registerMBean(mbean, this.mbeanName);
+ }
+ }
+ // Obtain or create the work manager
+ // When using the WorkManager from ServiceMix,
+ // some class loader problems can appear when
+ // trying to uninstall the components.
+ // Some threads owned by the work manager have a
+ // security context referencing the component class loader
+ // so that every loaded classes are locked
+ //this.workManager = findWorkManager();
+ if (this.workManager == null) {
+ this.workManagerCreated = true;
+ this.workManager = createWorkManager();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see javax.jbi.component.ComponentLifeCycle#shutDown()
+ */
+ public void shutDown() throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Shutting down component");
+ }
+ doShutDown();
+ this.context = null;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component shut down");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling shutdown", e);
+ }
+ }
+
+ protected void doShutDown() throws Exception {
+ // Unregister mbean
+ if (this.mbeanName != null) {
+ MBeanServer server = this.context.getMBeanServer();
+ if (server == null) {
+ throw new JBIException("null mBeanServer");
+ }
+ if (server.isRegistered(this.mbeanName)) {
+ server.unregisterMBean(this.mbeanName);
+ }
+ }
+ // Destroy work manager, if created
+ if (this.workManagerCreated) {
+ if (this.workManager instanceof BasicWorkManager) {
+ ((BasicWorkManager) this.workManager).shutDown();
+ }
+ this.workManager = null;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see javax.jbi.component.ComponentLifeCycle#start()
+ */
+ public void start() throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting component");
+ }
+ if (this.running.compareAndSet(false, true)) {
+ doStart();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component started");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling start", e);
+ }
+ }
+
+ protected void doStart() throws Exception {
+ synchronized (this.polling) {
+ workManager.startWork(new Work() {
+ public void release() { }
+ public void run() {
+ poller = Thread.currentThread();
+ pollDeliveryChannel();
+ }
+ });
+ polling.wait();
+ }
+ }
+
+ protected void pollDeliveryChannel() {
+ synchronized (polling) {
+ polling.set(true);
+ polling.notify();
+ }
+ while (running.get()) {
+ try {
+ final MessageExchange exchange = channel.accept(1000L);
+ if (exchange != null) {
+ final Transaction tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
+ if (tx != null) {
+ if (transactionManager == null) {
+ throw new IllegalStateException("Exchange is enlisted in a transaction, but no transaction manager is available");
+ }
+ transactionManager.suspend();
+ }
+ workManager.scheduleWork(new Work() {
+ public void release() {
+ }
+ public void run() {
+ processExchangeInTx(exchange, tx);
+ }
+ });
+ }
+ } catch (Throwable t) {
+ if (running.get() == false) {
+ // Should have been interrupted, discard the throwable
+ if (logger.isDebugEnabled()) {
+ logger.debug("Polling thread will stop");
+ }
+ } else {
+ logger.error("Error polling delivery channel", t);
+ }
+ }
+ }
+ synchronized (polling) {
+ polling.set(false);
+ polling.notify();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see javax.jbi.component.ComponentLifeCycle#stop()
+ */
+ public void stop() throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping component");
+ }
+ if (this.running.compareAndSet(true, false)) {
+ doStop();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component stopped");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling stop", e);
+ }
+ }
+
+ protected void doStop() throws Exception {
+ // Interrupt the polling thread and await termination
+ try {
+ synchronized (polling) {
+ if (polling.get()) {
+ poller.interrupt();
+ polling.wait();
+ }
+ }
+ } finally {
+ poller = null;
+ }
+ }
+
+ /**
+ * @return Returns the context.
+ */
+ public ComponentContext getContext() {
+ return context;
+ }
+
+ public WorkManager getWorkManager() {
+ return workManager;
+ }
+
+ protected WorkManager createWorkManager() {
+ // Create a very simple one
+ return new BasicWorkManager();
+ }
+
+ protected WorkManager findWorkManager() {
+ // If inside ServiceMix, retrieve its work manager
+ try {
+ Method getContainerMth = context.getClass().getMethod("getContainer", new Class[0]);
+ Object container = getContainerMth.invoke(context, new Object[0]);
+ Method getWorkManagerMth = container.getClass().getMethod("getWorkManager", new Class[0]);
+ return (WorkManager) getWorkManagerMth.invoke(container, new Object[0]);
+ } catch (Throwable t) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("JBI container is not ServiceMix. Will create our own WorkManager", t);
+ }
+ }
+ // TODO: should look in jndi for an existing work manager
+ return null;
+ }
+
+ protected void processExchangeInTx(MessageExchange exchange, Transaction tx) {
+ try {
+ if (tx != null) {
+ transactionManager.resume(tx);
+ }
+ processExchange(exchange);
+ } catch (Exception e) {
+ logger.error("Error processing exchange " + exchange, e);
+ try {
+ // If we are transacted and this is a runtime exception
+ // try to mark transaction as rollback
+ if (tx != null && e instanceof RuntimeException) {
+ transactionManager.setRollbackOnly();
+ } else {
+ exchange.setError(e);
+ channel.send(exchange);
+ }
+ } catch (Exception inner) {
+ logger.error("Error setting exchange status to ERROR", inner);
+ }
+ } finally {
+ try {
+ // Check transaction status
+ if (tx != null) {
+ int status = transactionManager.getStatus();
+ // We use pull delivery, so the transaction should already
+ // have been transfered to another thread because the component
+ // must have answered.
+ if (status != Status.STATUS_NO_TRANSACTION) {
+ logger.error("Transaction is still active after exchange processing. Trying to rollback transaction.");
+ try {
+ transactionManager.rollback();
+ } catch (Throwable t) {
+ logger.error("Error trying to rollback transaction.", t);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Error checking transaction status.", t);
+ }
+ }
+ }
+
+ public void processExchange(MessageExchange exchange) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received exchange: status: " + exchange.getStatus() + ", role: " + exchange.getRole());
+ }
+ if (exchange.getRole() == Role.PROVIDER) {
+ boolean dynamic = false;
+ ServiceEndpoint endpoint = exchange.getEndpoint();
+ String key = EndpointSupport.getKey(exchange.getEndpoint());
+ Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
+ if (ep == null) {
+ if (endpoint.getServiceName().equals(getEPRServiceName())) {
+ ep = getResolvedEPR(exchange.getEndpoint());
+ dynamic = true;
+ }
+ if (ep == null) {
+ throw new IllegalStateException("Endpoint not found: " + key);
+ }
+ }
+ ExchangeProcessor processor = ep.getProcessor();
+ if (processor == null) {
+ throw new IllegalStateException("No processor found for endpoint: " + key);
+ }
+ try {
+ processor.process(exchange);
+ } finally {
+ // If the endpoint is dynamic, deactivate it
+ if (dynamic) {
+ ep.deactivate();
+ }
+ }
+ } else {
+ ExchangeProcessor processor = null;
+ if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
+ String key = exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
+ Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
+ if (ep != null) {
+ processor = ep.getProcessor();
+ }
+ } else {
+ processor = (ExchangeProcessor) processors.remove(exchange.getExchangeId());
+ }
+ if (processor == null) {
+ throw new IllegalStateException("No processor found for: " + exchange.getExchangeId());
+ }
+ processor.process(exchange);
+ }
+ }
+
+ /**
+ *
+ * @param exchange
+ * @param processor
+ * @throws MessagingException
+ * @deprecated use sendConsumerExchange(MessageExchange, Endpoint) instead
+ */
+ public void sendConsumerExchange(MessageExchange exchange, ExchangeProcessor processor) throws MessagingException {
+ // If the exchange is not ACTIVE, no answer is expected
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ processors.put(exchange.getExchangeId(), processor);
+ }
+ channel.send(exchange);
+ }
+
+ /**
+ * This method allows the component to keep no state in memory so that
+ * components can be clustered and provide fail-over and load-balancing.
+ * @param exchange
+ * @param endpoint
+ * @throws MessagingException
+ */
+ public void sendConsumerExchange(MessageExchange exchange, Endpoint endpoint) throws MessagingException {
+ String key = EndpointSupport.getKey(endpoint);
+ exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
+ channel.send(exchange);
+ }
+
+ /**
+ * Handle an exchange sent to an EPR resolved by this component
+ * @param exchange
+ * @return an endpoint to use for handling the exchange
+ * @throws Exception
+ */
+ protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
+ throw new UnsupportedOperationException("Component does not handle EPR exchanges");
+ }
+
+}
Modified: incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java?rev=411549&r1=411548&r2=411549&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java (original)
+++ incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java Sun Jun 4 04:58:59 2006
@@ -15,439 +15,46 @@
*/
package org.apache.servicemix.common;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-import javax.jbi.JBIException;
-import javax.jbi.component.ComponentContext;
-import javax.jbi.component.ComponentLifeCycle;
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
import javax.transaction.Status;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
-import org.apache.commons.logging.Log;
-import org.apache.servicemix.JbiConstants;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.servicemix.MessageExchangeListener;
/**
* Base class for life cycle management of components.
- * This class may be used as is.
+ * This lifecycle uses Push delivery by implementing MessageExchangeListerner interface
*
* @author Guillaume Nodet
* @version $Revision$
* @since 3.0
*/
-public class BaseLifeCycle implements ComponentLifeCycle {
+public class BaseLifeCycle extends AsyncBaseLifeCycle implements MessageExchangeListener {
- protected final transient Log logger;
-
- protected BaseComponent component;
- protected ComponentContext context;
- protected ObjectName mbeanName;
- protected WorkManager workManager;
- protected AtomicBoolean running;
- protected DeliveryChannel channel;
- protected Thread poller;
- protected AtomicBoolean polling;
- protected TransactionManager transactionManager;
- protected boolean workManagerCreated;
- protected Map processors = new ConcurrentHashMap();
-
-
public BaseLifeCycle(BaseComponent component) {
- this.component = component;
- this.logger = component.logger;
- this.running = new AtomicBoolean(false);
- this.polling = new AtomicBoolean(false);
- this.processors = new ConcurrentHashMap();
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
- */
- public ObjectName getExtensionMBeanName() {
- return mbeanName;
- }
-
- protected Object getExtensionMBean() throws Exception {
- return null;
- }
-
- protected ObjectName createExtensionMBeanName() throws Exception {
- return this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
- }
-
- protected QName getEPRServiceName() {
- return null;
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
- */
- public void init(ComponentContext context) throws JBIException {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Initializing component");
- }
- this.context = context;
- this.channel = context.getDeliveryChannel();
- this.transactionManager = (TransactionManager) context.getTransactionManager();
- doInit();
- if (logger.isDebugEnabled()) {
- logger.debug("Component initialized");
- }
- } catch (JBIException e) {
- throw e;
- } catch (Exception e) {
- throw new JBIException("Error calling init", e);
- }
- }
-
- protected void doInit() throws Exception {
- // Register extension mbean
- Object mbean = getExtensionMBean();
- if (mbean != null) {
- MBeanServer server = this.context.getMBeanServer();
- if (server == null) {
- // TODO: log a warning ?
- //throw new JBIException("null mBeanServer");
- } else {
- this.mbeanName = createExtensionMBeanName();
- if (server.isRegistered(this.mbeanName)) {
- server.unregisterMBean(this.mbeanName);
- }
- server.registerMBean(mbean, this.mbeanName);
- }
- }
- // Obtain or create the work manager
- // When using the WorkManager from ServiceMix,
- // some class loader problems can appear when
- // trying to uninstall the components.
- // Some threads owned by the work manager have a
- // security context referencing the component class loader
- // so that every loaded classes are locked
- //this.workManager = findWorkManager();
- if (this.workManager == null) {
- this.workManagerCreated = true;
- this.workManager = createWorkManager();
- }
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#shutDown()
- */
- public void shutDown() throws JBIException {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Shutting down component");
- }
- doShutDown();
- this.context = null;
- if (logger.isDebugEnabled()) {
- logger.debug("Component shut down");
- }
- } catch (JBIException e) {
- throw e;
- } catch (Exception e) {
- throw new JBIException("Error calling shutdown", e);
- }
- }
-
- protected void doShutDown() throws Exception {
- // Unregister mbean
- if (this.mbeanName != null) {
- MBeanServer server = this.context.getMBeanServer();
- if (server == null) {
- throw new JBIException("null mBeanServer");
- }
- if (server.isRegistered(this.mbeanName)) {
- server.unregisterMBean(this.mbeanName);
- }
- }
- // Destroy work manager, if created
- if (this.workManagerCreated) {
- if (this.workManager instanceof BasicWorkManager) {
- ((BasicWorkManager) this.workManager).shutDown();
- }
- this.workManager = null;
- }
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#start()
- */
- public void start() throws JBIException {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Starting component");
- }
- if (this.running.compareAndSet(false, true)) {
- doStart();
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Component started");
- }
- } catch (JBIException e) {
- throw e;
- } catch (Exception e) {
- throw new JBIException("Error calling start", e);
- }
- }
-
- protected void doStart() throws Exception {
- synchronized (this.polling) {
- workManager.startWork(new Work() {
- public void release() { }
- public void run() {
- poller = Thread.currentThread();
- pollDeliveryChannel();
- }
- });
- polling.wait();
- }
+ super(component);
}
- protected void pollDeliveryChannel() {
- synchronized (polling) {
- polling.set(true);
- polling.notify();
- }
- while (running.get()) {
- try {
- final MessageExchange exchange = channel.accept(1000L);
- if (exchange != null) {
- final Transaction tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
- if (tx != null) {
- if (transactionManager == null) {
- throw new IllegalStateException("Exchange is enlisted in a transaction, but no transaction manager is available");
- }
- transactionManager.suspend();
- }
- workManager.scheduleWork(new Work() {
- public void release() {
- }
- public void run() {
- try {
- if (tx != null) {
- transactionManager.resume(tx);
- }
- processExchange(exchange);
- } catch (Throwable t) {
- logger.error("Error processing exchange " + exchange, t);
- // Set an error on message
- try {
- if (t instanceof Exception) {
- exchange.setError((Exception) t);
- } else {
- exchange.setError(new Exception("Throwable", t));
- }
- channel.send(exchange);
- } catch (Exception inner) {
- logger.error("Error setting exchange status to ERROR", inner);
- }
- } finally {
- // Check transaction status
- if (tx != null) {
- int status = Status.STATUS_NO_TRANSACTION;
- try {
- status = transactionManager.getStatus();
- } catch (Throwable t) {
- logger.error("Error checking transaction status.", t);
- }
- if (status != Status.STATUS_NO_TRANSACTION) {
- logger.error("Transaction is still active after exchange processing. Trying to rollback transaction.");
- try {
- transactionManager.rollback();
- } catch (Throwable t) {
- logger.error("Error trying to rollback transaction.", t);
- }
- }
- }
- }
- }
- });
- }
- } catch (Throwable t) {
- if (running.get() == false) {
- // Should have been interrupted, discard the throwable
- if (logger.isDebugEnabled()) {
- logger.debug("Polling thread will stop");
- }
- } else {
- logger.error("Error polling delivery channel", t);
- }
- }
- }
- synchronized (polling) {
- polling.set(false);
- polling.notify();
- }
- }
-
/* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#stop()
+ * @see org.apache.servicemix.common.AsyncBaseLifeCycle#onMessageExchange(javax.jbi.messaging.MessageExchange)
*/
- public void stop() throws JBIException {
+ public void onMessageExchange(MessageExchange exchange) {
try {
- if (logger.isDebugEnabled()) {
- logger.debug("Stopping component");
- }
- if (this.running.compareAndSet(true, false)) {
- doStop();
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Component stopped");
- }
- } catch (JBIException e) {
- throw e;
+ processExchange(exchange);
} catch (Exception e) {
- throw new JBIException("Error calling stop", e);
- }
- }
-
- protected void doStop() throws Exception {
- // Interrupt the polling thread and await termination
- try {
- synchronized (polling) {
- if (polling.get()) {
- poller.interrupt();
- polling.wait();
- }
- }
- } finally {
- poller = null;
- }
- }
-
- /**
- * @return Returns the context.
- */
- public ComponentContext getContext() {
- return context;
- }
-
- public WorkManager getWorkManager() {
- return workManager;
- }
-
- protected WorkManager createWorkManager() {
- // Create a very simple one
- return new BasicWorkManager();
- }
-
- protected WorkManager findWorkManager() {
- // If inside ServiceMix, retrieve its work manager
- try {
- Method getContainerMth = context.getClass().getMethod("getContainer", new Class[0]);
- Object container = getContainerMth.invoke(context, new Object[0]);
- Method getWorkManagerMth = container.getClass().getMethod("getWorkManager", new Class[0]);
- return (WorkManager) getWorkManagerMth.invoke(container, new Object[0]);
- } catch (Throwable t) {
- if (logger.isDebugEnabled()) {
- logger.debug("JBI container is not ServiceMix. Will create our own WorkManager", t);
- }
- }
- // TODO: should look in jndi for an existing work manager
- return null;
- }
-
- public void processExchange(MessageExchange exchange) throws Exception {
- if (logger.isDebugEnabled()) {
- logger.debug("Received exchange: status: " + exchange.getStatus() + ", role: " + exchange.getRole());
- }
- if (exchange.getRole() == Role.PROVIDER) {
- boolean dynamic = false;
- ServiceEndpoint endpoint = exchange.getEndpoint();
- String key = EndpointSupport.getKey(exchange.getEndpoint());
- Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
- if (ep == null) {
- if (endpoint.getServiceName().equals(getEPRServiceName())) {
- ep = getResolvedEPR(exchange.getEndpoint());
- dynamic = true;
- }
- if (ep == null) {
- throw new IllegalStateException("Endpoint not found: " + key);
- }
- }
- ExchangeProcessor processor = ep.getProcessor();
- if (processor == null) {
- throw new IllegalStateException("No processor found for endpoint: " + key);
- }
+ logger.error("Error processing exchange " + exchange, e);
try {
- processor.process(exchange);
- } finally {
- // If the endpoint is dynamic, deactivate it
- if (dynamic) {
- ep.deactivate();
+ // If we are transacted and this is a runtime exception
+ // try to mark transaction as rollback
+ if (transactionManager.getStatus() != Status.STATUS_ACTIVE && e instanceof RuntimeException) {
+ transactionManager.setRollbackOnly();
+ } else {
+ exchange.setError(e);
+ channel.send(exchange);
}
+ } catch (Exception inner) {
+ logger.error("Error setting exchange status to ERROR", inner);
}
- } else {
- ExchangeProcessor processor = null;
- if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
- String key = exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
- Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key);
- if (ep != null) {
- processor = ep.getProcessor();
- }
- } else {
- processor = (ExchangeProcessor) processors.remove(exchange.getExchangeId());
- }
- if (processor == null) {
- throw new IllegalStateException("No processor found for: " + exchange.getExchangeId());
- }
- processor.process(exchange);
- }
- }
-
- /**
- *
- * @param exchange
- * @param processor
- * @throws MessagingException
- * @deprecated use sendConsumerExchange(MessageExchange, Endpoint) instead
- */
- public void sendConsumerExchange(MessageExchange exchange, ExchangeProcessor processor) throws MessagingException {
- // If the exchange is not ACTIVE, no answer is expected
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- processors.put(exchange.getExchangeId(), processor);
}
- channel.send(exchange);
}
- /**
- * This method allows the component to keep no state in memory so that
- * components can be clustered and provide fail-over and load-balancing.
- * @param exchange
- * @param endpoint
- * @throws MessagingException
- */
- public void sendConsumerExchange(MessageExchange exchange, Endpoint endpoint) throws MessagingException {
- String key = EndpointSupport.getKey(endpoint);
- exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
- channel.send(exchange);
- }
-
- /**
- * Handle an exchange sent to an EPR resolved by this component
- * @param exchange
- * @return an endpoint to use for handling the exchange
- * @throws Exception
- */
- protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
- throw new UnsupportedOperationException("Component does not handle EPR exchanges");
- }
-
}
Added: incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java?rev=411549&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java Sun Jun 4 04:58:59 2006
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.common;
+
+import javax.jbi.component.Component;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.geronimo.transaction.context.GeronimoTransactionManager;
+import org.apache.geronimo.transaction.context.TransactionContextManager;
+import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.nmr.flow.Flow;
+import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
+import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow;
+
+public class TransactionsTest extends TestCase {
+
+ private JBIContainer jbi;
+ private BrokerService broker;
+ private TransactionManagerImpl exTransactionManager;
+ private TransactionContextManager transactionContextManager;
+ private TransactionManager txManager;
+ private Component component;
+ private ServiceMixClient client;
+ private boolean setRollbackBefore = false;
+ private boolean setRollbackAfter = false;
+ private boolean throwExBefore = false;
+ private boolean throwExAfter = false;
+
+ protected void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+
+ exTransactionManager = new TransactionManagerImpl(600, null, null);
+ transactionContextManager = new TransactionContextManager(exTransactionManager, exTransactionManager);
+ txManager = (TransactionManager) new GeronimoTransactionManager(transactionContextManager);
+
+ JCAFlow jcaFlow = new JCAFlow();
+ jcaFlow.setTransactionContextManager(transactionContextManager);
+
+ jbi = new JBIContainer();
+ jbi.setFlows(new Flow[] { new SedaFlow(), jcaFlow });
+ jbi.setEmbedded(true);
+ jbi.setUseMBeanServer(false);
+ jbi.setTransactionManager(txManager);
+ jbi.setAutoEnlistInTransaction(true);
+ jbi.init();
+ jbi.start();
+ component = new TestComponent();
+ jbi.activateComponent(component, "test");
+ client = new DefaultServiceMixClient(jbi);
+ }
+
+ protected void tearDown() throws Exception {
+ jbi.shutDown();
+ broker.stop();
+ }
+
+ public void testTxOkAsync() throws Exception {
+ setRollbackBefore = false;
+ setRollbackAfter = false;
+ throwExBefore = false;
+ throwExAfter = false;
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.send(me);
+ assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ txManager.commit();
+ me = (InOnly) client.receive(1000);
+ assertNotNull(me);
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ }
+
+ public void testTxOkSync() throws Exception {
+ setRollbackBefore = false;
+ setRollbackAfter = false;
+ throwExBefore = false;
+ throwExAfter = false;
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(me, 1000);
+ assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ txManager.commit();
+ }
+
+ public void testTxRollbackBeforeAsync() throws Exception {
+ setRollbackBefore = true;
+ setRollbackAfter = false;
+ throwExBefore = false;
+ throwExAfter = false;
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.send(me);
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+ txManager.rollback();
+ }
+
+ public void testTxRollbackBeforeSync() throws Exception {
+ setRollbackBefore = true;
+ setRollbackAfter = false;
+ throwExBefore = false;
+ throwExAfter = false;
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(me);
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+ txManager.rollback();
+ }
+
+ public void testTxThrowBefore() throws Exception {
+ setRollbackBefore = false;
+ setRollbackAfter = false;
+ throwExBefore = true;
+ throwExAfter = false;
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.send(me);
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+ txManager.rollback();
+ }
+
+ private class TestComponent extends BaseComponent {
+ public TestComponent() {
+ super();
+ }
+ protected BaseLifeCycle createLifeCycle() {
+ return new TestLifeCycle();
+ }
+
+ protected class TestLifeCycle extends BaseLifeCycle {
+ protected ServiceUnit su;
+ public TestLifeCycle() {
+ super(TestComponent.this);
+ }
+ protected void doInit() throws Exception {
+ super.doInit();
+ su = new ServiceUnit();
+ su.setComponent(component);
+ TestEndpoint ep = new TestEndpoint();
+ ep.setService(new QName("service"));
+ ep.setEndpoint("endpoint");
+ ep.setServiceUnit(su);
+ su.addEndpoint(ep);
+ getRegistry().registerServiceUnit(su);
+ }
+ protected void doStart() throws Exception {
+ super.doStart();
+ su.start();
+ }
+ protected void doStop() throws Exception {
+ super.doStop();
+ su.stop();
+ }
+ }
+
+ protected class TestEndpoint extends Endpoint implements ExchangeProcessor {
+ protected ServiceEndpoint activated;
+ public void activate() throws Exception {
+ ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext();
+ activated = ctx.activateEndpoint(service, endpoint);
+ }
+ public void deactivate() throws Exception {
+ ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext();
+ ctx.deactivateEndpoint(activated);
+ activated = null;
+ }
+ public ExchangeProcessor getProcessor() {
+ return this;
+ }
+ public Role getRole() {
+ return Role.PROVIDER;
+ }
+ public void process(MessageExchange exchange) throws Exception {
+ if (setRollbackBefore) {
+ txManager.setRollbackOnly();
+ return;
+ } else if (throwExBefore) {
+ throw new Exception("Error");
+ }
+ exchange.setStatus(ExchangeStatus.DONE);
+ getComponentContext().getDeliveryChannel().send(exchange);
+ if (setRollbackAfter) {
+ txManager.setRollbackOnly();
+ } else if (throwExAfter) {
+ throw new Exception("Error");
+ }
+ }
+ public void start() throws Exception {
+ }
+ public void stop() throws Exception {
+ }
+ }
+ }
+
+
+}