You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by tt...@apache.org on 2007/03/29 16:43:46 UTC
svn commit: r523729 -
/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/ComponentSupport.java
Author: tterm
Date: Thu Mar 29 07:43:43 2007
New Revision: 523729
URL: http://svn.apache.org/viewvc?view=rev&rev=523729
Log:
SM-906 extend ComponentSupport with methods to propagate the correlation id
Modified:
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/ComponentSupport.java
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/ComponentSupport.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/ComponentSupport.java?view=diff&rev=523729&r1=523728&r2=523729
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/ComponentSupport.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/ComponentSupport.java Thu Mar 29 07:43:43 2007
@@ -17,19 +17,14 @@
package org.apache.servicemix.components.util;
import org.apache.servicemix.jbi.NoInMessageAvailableException;
+import org.apache.servicemix.JbiConstants;
import org.w3c.dom.Document;
import org.w3c.dom.DocumentFragment;
import javax.jbi.component.Component;
import javax.jbi.component.ComponentLifeCycle;
import javax.jbi.component.ServiceUnitManager;
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.InOnly;
-import javax.jbi.messaging.InOut;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessageExchangeFactory;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.*;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
@@ -128,7 +123,6 @@
return true;
}
-
// Implementation methods
//-------------------------------------------------------------------------
@@ -188,11 +182,9 @@
MessageExchangeFactory factory = null;
if (service != null) {
factory = channel.createExchangeFactoryForService(service);
- }
- else if (interfaceName != null) {
+ } else if (interfaceName != null) {
factory = channel.createExchangeFactory(interfaceName);
- }
- else {
+ } else {
factory = getExchangeFactory();
}
InOnly outExchange = factory.createInOnlyExchange();
@@ -208,6 +200,11 @@
return outExchange;
}
+ public InOnly createInOnlyExchange(QName service, QName interfaceName, QName operation, MessageExchange beforeExchange) throws MessagingException {
+ InOnly inOnly = createInOnlyExchange(service, interfaceName, operation);
+ propagateCorrelationId(beforeExchange, inOnly);
+ return inOnly;
+ }
/**
* Creates a new InOut exchange for the given service, interface and/or operation (any of which can be null).
@@ -217,11 +214,9 @@
MessageExchangeFactory factory = null;
if (service != null) {
factory = channel.createExchangeFactoryForService(service);
- }
- else if (interfaceName != null) {
+ } else if (interfaceName != null) {
factory = channel.createExchangeFactory(interfaceName);
- }
- else {
+ } else {
factory = getExchangeFactory();
}
InOut outExchange = factory.createInOutExchange();
@@ -237,6 +232,93 @@
return outExchange;
}
+ public InOut creatInOutExchange(QName service, QName interfaceName, QName operation, MessageExchange srcExchange) throws MessagingException {
+ InOut inOut = createInOutExchange(service, interfaceName, operation);
+ propagateCorrelationId(srcExchange, inOut);
+ return inOut;
+ }
+
+    /**
+     * Builds an InOnly exchange using the factory returned by getExchangeFactory()
+     * and carries the correlation id of the given exchange over to it.
+     *
+     * @param srcExchange the existing exchange supplying the correlation id
+     * @return the newly created InOnly exchange
+     * @throws MessagingException declared by the factory lookup and exchange creation
+     */
+    public InOnly createInOnlyExchange(MessageExchange srcExchange) throws MessagingException {
+        InOnly created = getExchangeFactory().createInOnlyExchange();
+        propagateCorrelationId(srcExchange, created);
+        return created;
+    }
+
+    /**
+     * Builds an InOptionalOut exchange using the factory returned by getExchangeFactory()
+     * and carries the correlation id of the given exchange over to it.
+     *
+     * @param srcExchange the existing exchange supplying the correlation id
+     * @return the newly created InOptionalOut exchange
+     * @throws MessagingException declared by the factory lookup and exchange creation
+     */
+    public InOptionalOut createInOptionalOutExchange(MessageExchange srcExchange) throws MessagingException {
+        InOptionalOut created = getExchangeFactory().createInOptionalOutExchange();
+        propagateCorrelationId(srcExchange, created);
+        return created;
+    }
+
+    /**
+     * Builds an InOut exchange using the factory returned by getExchangeFactory()
+     * and carries the correlation id of the given exchange over to it.
+     *
+     * @param srcExchange the existing exchange supplying the correlation id
+     * @return the newly created InOut exchange
+     * @throws MessagingException declared by the factory lookup and exchange creation
+     */
+    public InOut createInOutExchange(MessageExchange srcExchange) throws MessagingException {
+        InOut created = getExchangeFactory().createInOutExchange();
+        propagateCorrelationId(srcExchange, created);
+        return created;
+    }
+
+    /**
+     * Builds a RobustInOnly exchange using the factory returned by getExchangeFactory()
+     * and carries the correlation id of the given exchange over to it.
+     *
+     * @param srcExchange the existing exchange supplying the correlation id
+     * @return the newly created RobustInOnly exchange
+     * @throws MessagingException declared by the factory lookup and exchange creation
+     */
+    public RobustInOnly createRobustInOnlyExchange(MessageExchange srcExchange) throws MessagingException {
+        RobustInOnly created = getExchangeFactory().createRobustInOnlyExchange();
+        propagateCorrelationId(srcExchange, created);
+        return created;
+    }
+
+    /**
+     * Copies the correlation id of an existing exchange onto a newly created exchange.
+     * <p>
+     * When the source exchange has no {@code JbiConstants.CORRELATION_ID} property, the
+     * source's own exchange id is stored as the correlation id of the destination instead.
+     * The call is a no-op if either argument is {@code null}.
+     *
+     * @param source exchange which already exists
+     * @param dest newly created exchange which should get the correlation id
+     */
+    public void propagateCorrelationId(MessageExchange source, MessageExchange dest) {
+        if (source != null && dest != null) {
+            String inherited = (String) source.getProperty(JbiConstants.CORRELATION_ID);
+            // Fall back to the source's exchange id when no correlation id has been set yet.
+            String correlationId = (inherited != null) ? inherited : source.getExchangeId();
+            dest.setProperty(JbiConstants.CORRELATION_ID, correlationId);
+        }
+    }
+
protected void forwardToExchange(MessageExchange exchange, InOnly outExchange, NormalizedMessage in, QName operationName) throws MessagingException {
if (operationName != null) {
exchange.setOperation(operationName);
@@ -250,5 +332,4 @@
getMessageTransformer().transform(exchange, in, out);
getDeliveryChannel().send(outExchange);
}
-
}