You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/10/06 13:49:52 UTC
svn commit: r822227 - in
/servicemix/components/engines/servicemix-camel/trunk/src:
main/java/org/apache/servicemix/camel/ test/java/org/apache/servicemix/camel/
Author: gertv
Date: Tue Oct 6 11:49:52 2009
New Revision: 822227
URL: http://svn.apache.org/viewvc?rev=822227&view=rev
Log:
SMXCOMP-639: Add support for header filtering strategy to servicemix-camel component
Modified:
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiBinding.java
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiComponent.java
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiBindingTest.java
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOutPipelineTest.java
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java Tue Oct 6 11:49:52 2009
@@ -16,17 +16,13 @@
*/
package org.apache.servicemix.camel;
-import java.util.Set;
-
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
import org.apache.servicemix.common.util.URIResolver;
import org.apache.servicemix.id.IdGenerator;
@@ -41,8 +37,8 @@
public static final QName SERVICE_NAME = new QName("http://camel.apache.org/schema/jbi", "provider");
private JbiBinding binding;
-
- private JbiEndpoint jbiEndpoint;
+
+ private JbiEndpoint jbiEndpoint;
public CamelConsumerEndpoint(JbiBinding binding, JbiEndpoint jbiEndpoint) {
setService(SERVICE_NAME);
@@ -80,17 +76,16 @@
if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
exchange.setException(messageExchange.getError());
} else if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
- addHeaders(messageExchange, exchange);
+ // first copy the exchange headers
+ binding.copyHeadersFromJbiToCamel(messageExchange, exchange);
+ // then copy the out/fault message
if (messageExchange.getFault() != null) {
+ binding.copyFromJbiToCamel(messageExchange.getMessage("fault"), exchange.getOut());
exchange.getOut().setBody(new FaultException("Fault occured for " + exchange.getPattern() + " exchange",
messageExchange, messageExchange.getFault()));
exchange.getOut().setFault(true);
- addHeaders(messageExchange.getFault(), exchange.getOut());
- addAttachments(messageExchange.getFault(), exchange.getOut());
} else if (messageExchange.getMessage("out") != null) {
- exchange.getOut().setBody(messageExchange.getMessage("out").getContent());
- addHeaders(messageExchange.getMessage("out"), exchange.getOut());
- addAttachments(messageExchange.getMessage("out"), exchange.getOut());
+ binding.copyFromJbiToCamel(messageExchange.getMessage("out"), exchange.getOut());
}
done(messageExchange);
}
@@ -100,32 +95,4 @@
public void validate() throws DeploymentException {
// No validation required
}
-
- @SuppressWarnings("unchecked")
- private void addHeaders(MessageExchange messageExchange, Exchange camelExchange) {
- Set entries = messageExchange.getPropertyNames();
- for (Object o : entries) {
- String key = o.toString();
- camelExchange.setProperty(key, messageExchange.getProperty(key));
- }
- }
-
- @SuppressWarnings("unchecked")
- private void addHeaders(NormalizedMessage normalizedMessage, Message camelMessage) {
- Set entries = normalizedMessage.getPropertyNames();
- for (Object o : entries) {
- String key = o.toString();
- camelMessage.setHeader(key, normalizedMessage.getProperty(key));
- }
- }
-
- @SuppressWarnings("unchecked")
- private void addAttachments(NormalizedMessage normalizedMessage, Message camelMessage) {
- Set entries = normalizedMessage.getAttachmentNames();
- for (Object o : entries) {
- String id = o.toString();
- camelMessage.addAttachment(id, normalizedMessage.getAttachment(id));
- }
- }
-
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java Tue Oct 6 11:49:52 2009
@@ -117,7 +117,7 @@
String camelUri = uri.getSchemeSpecificPart();
Endpoint camelEndpoint = jbiComponent.getCamelContext().getEndpoint(camelUri);
Processor processor = jbiComponent.createCamelProcessor(camelEndpoint);
- CamelProviderEndpoint endpoint = new CamelProviderEndpoint(getServiceUnit(), camelEndpoint, jbiComponent.getBinding(), processor);
+ CamelProviderEndpoint endpoint = new CamelProviderEndpoint(getServiceUnit(), camelEndpoint, jbiComponent.createBinding(), processor);
IntrospectionSupport.setProperties(endpoint, map);
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java Tue Oct 6 11:49:52 2009
@@ -28,7 +28,10 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Synchronization;
+
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
@@ -39,14 +42,14 @@
*
* @version $Revision: 426415 $
*/
-public class CamelProviderEndpoint extends ProviderEndpoint implements Synchronization {
+public class CamelProviderEndpoint extends ProviderEndpoint implements Synchronization, HeaderFilterStrategyAware {
public static final QName SERVICE_NAME = new QName("http://camel.apache.org/schema/jbi", "provider");
- private JbiBinding binding;
+ private final JbiBinding binding;
private Processor camelProcessor;
-
+
public CamelProviderEndpoint(ServiceUnit serviceUnit, QName service, String endpoint, JbiBinding binding, Processor camelProcessor) {
super(serviceUnit, service, endpoint);
this.camelProcessor = camelProcessor;
@@ -160,4 +163,13 @@
logger.warn("Unable to send JBI MessageExchange after successful Camel route invocation: " + me, e);
}
}
+
+
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return binding.getHeaderFilterStrategy();
+ }
+
+ public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+ binding.setHeaderFilterStrategy(strategy);
+ }
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiBinding.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiBinding.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiBinding.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiBinding.java Tue Oct 6 11:49:52 2009
@@ -38,6 +38,9 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +49,7 @@
*
* @version $Revision: 563665 $
*/
-public class JbiBinding {
+public class JbiBinding implements HeaderFilterStrategyAware {
public static final String MESSAGE_EXCHANGE = "JbiMessageExchange";
public static final String OPERATION = "JbiOperation";
@@ -55,6 +58,7 @@
private static final Log LOG = LogFactory.getLog(JbiBinding.class);
private final CamelContext context;
+ private HeaderFilterStrategy strategy;
public JbiBinding(CamelContext context) {
super();
@@ -137,6 +141,22 @@
return ExchangePattern.InOnly;
}
}
+
+ /**
+ * Copies headers from the JBI MessageExchange to the Camel Exchange, taking into account the
+ * {@link HeaderFilterStrategy} that has been configured on this binding.
+ *
+ * @param from the JBI MessageExchange
+ * @param to the Camel Exchange
+ */
+ public void copyHeadersFromJbiToCamel(MessageExchange from, Exchange to) {
+ for (Object object : from.getPropertyNames()) {
+ String key = object.toString();
+ if (!getHeaderFilterStrategy().applyFilterToCamelHeaders(key, from.getProperty(key), null)) {
+ to.setProperty(key, from.getProperty(key));
+ }
+ }
+ }
/**
* Copies content, headers, security subject and attachments from the JBI NormalizedMessage to the Camel Message.
@@ -149,8 +169,11 @@
if (from.getSecuritySubject() != null) {
to.setHeader(SECURITY_SUBJECT, from.getSecuritySubject());
}
- for (Object key : from.getPropertyNames()) {
- to.setHeader(key.toString(), from.getProperty(key.toString()));
+ for (Object object : from.getPropertyNames()) {
+ String key = object.toString();
+ if (!strategy.applyFilterToCamelHeaders(key, from.getProperty(key), to.getExchange())) {
+ to.setHeader(key, from.getProperty(key));
+ }
}
for (Object id : from.getAttachmentNames()) {
to.addAttachment(id.toString(), from.getAttachment(id.toString()));
@@ -172,7 +195,7 @@
for (String key : message.getHeaders().keySet()) {
Object value = message.getHeader(key);
- if (isSerializable(value)) {
+ if (isSerializable(value) && !getHeaderFilterStrategy().applyFilterToCamelHeaders(key, value, message.getExchange())) {
normalizedMessage.setProperty(key, value);
}
}
@@ -246,4 +269,15 @@
protected boolean isSerializable(Object object) {
return (object instanceof Serializable) && !(object instanceof Map) && !(object instanceof Collection);
}
+
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ if (strategy == null) {
+ strategy = new DefaultHeaderFilterStrategy();
+ }
+ return strategy;
+ }
+
+ public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+ this.strategy = strategy;
+ }
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiComponent.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiComponent.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiComponent.java Tue Oct 6 11:49:52 2009
@@ -30,7 +30,6 @@
public class JbiComponent implements Component {
private CamelComponent camelJbiComponent;
- private JbiBinding binding;
private CamelContext camelContext;
private IdGenerator idGenerator;
private String suName;
@@ -68,22 +67,8 @@
return suName;
}
- /**
- * @return the binding
- */
- public JbiBinding getBinding() {
- if (binding == null) {
- binding = new JbiBinding(camelContext);
- }
- return binding;
- }
-
- /**
- * @param binding
- * the binding to set
- */
- public void setBinding(JbiBinding binding) {
- this.binding = binding;
+ public JbiBinding createBinding() {
+ return new JbiBinding(camelContext);
}
// Resolve Camel Endpoints
@@ -146,10 +131,11 @@
+ endpointUri);
}
jbiEndpoint = new CamelProviderEndpoint(getCamelJbiComponent()
- .getServiceUnit(), service, endpoint, getBinding(), processor);
+ .getServiceUnit(), service, endpoint, createBinding(), processor);
+ jbiEndpoint.setHeaderFilterStrategy(((JbiEndpoint) camelEndpoint).getHeaderFilterStrategy());
} else {
jbiEndpoint = new CamelProviderEndpoint(getCamelJbiComponent()
- .getServiceUnit(), camelEndpoint, getBinding(), processor);
+ .getServiceUnit(), camelEndpoint, createBinding(), processor);
}
return jbiEndpoint;
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java Tue Oct 6 11:49:52 2009
@@ -29,6 +29,9 @@
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.Registry;
import org.apache.camel.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,7 +42,7 @@
*
* @version $Revision: 563665 $
*/
-public class JbiEndpoint extends DefaultEndpoint {
+public class JbiEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
private String destinationUri;
@@ -49,11 +52,16 @@
private JbiProducer producer;
+ private HeaderFilterStrategy headerFilterStrategy;
+
private final JbiComponent jbiComponent;
+ private final JbiBinding binding;
+
public JbiEndpoint(JbiComponent jbiComponent, String uri) {
super(uri, jbiComponent);
this.jbiComponent = jbiComponent;
+ this.binding = jbiComponent.createBinding();
parseUri(uri);
}
@@ -76,7 +84,7 @@
@Override
public void start() throws Exception {
- consumer = new CamelConsumerEndpoint(jbiComponent.getBinding(), JbiEndpoint.this);
+ consumer = new CamelConsumerEndpoint(binding, JbiEndpoint.this);
jbiComponent.getCamelJbiComponent().addEndpoint(consumer);
super.start();
}
@@ -120,6 +128,23 @@
operation = QName.valueOf(oper);
}
this.destinationUri = destinationUri.substring(0, idx);
+
+ String filter = (String) params.get("headerFilterStrategy");
+ if (StringUtils.hasLength(filter)) {
+ Registry registry = jbiComponent.getCamelContext().getRegistry();
+ if((idx = filter.indexOf('#')) != -1) {
+ filter = filter.substring(1);
+ }
+ Object object = registry.lookup(filter);
+ if (object instanceof HeaderFilterStrategy) {
+ headerFilterStrategy = (HeaderFilterStrategy)object;
+ binding.setHeaderFilterStrategy(headerFilterStrategy);
+ }
+ params.remove("headerFilterStrategy");
+ String endpointUri = this.destinationUri + URISupport.createQueryString(params);
+ this.setEndpointUri(endpointUri);
+
+ }
}
} catch (URISyntaxException e) {
throw new JbiException(e);
@@ -174,4 +199,12 @@
public boolean isSingleton() {
return true;
}
+
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return binding.getHeaderFilterStrategy();
+ }
+
+ public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+ binding.setHeaderFilterStrategy(strategy);
+ }
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiBindingTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiBindingTest.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiBindingTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiBindingTest.java Tue Oct 6 11:49:52 2009
@@ -35,6 +35,8 @@
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.servicemix.camel.JbiInOutPipelineTest.MyHeaderFilterStrategy;
import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.tck.mock.MockExchangeFactory;
import org.apache.servicemix.tck.mock.MockMessageExchange;
@@ -45,7 +47,9 @@
private static final QName OPERATION = new QName("urn:test", "operation");
private static final Source CONTENT = new StringSource("<my>content</my>");
private static final String KEY = "key";
+ private static final String FILTERED_KEY = "filtered.key";
private static final Object VALUE = "value";
+ private static final Object FILTERED_VALUE = "filtered.value";
private static final DataHandler DATA = new DataHandler(new Object(), "application/dummy");
private static final String ID = "id";
private static final Subject SUBJECT = new Subject();
@@ -57,6 +61,7 @@
protected void setUp() throws Exception {
factory = new MockExchangeFactory();
binding = new JbiBinding(new DefaultCamelContext());
+ binding.setHeaderFilterStrategy(new MyHeaderFilterStrategy());
}
public void testCreateExchangeWithOperation() throws Exception {
@@ -103,6 +108,28 @@
ExchangePattern.InOptionalOut, exchange.getPattern());
}
+ public void testCreateExchangeWithInContentAndHeaderFilterStrategy() throws Exception {
+ MessageExchange me = factory.createInOptionalOutExchange();
+ MockNormalizedMessage nm = new MockNormalizedMessage();
+ nm.setContent(CONTENT);
+ nm.setProperty(KEY, VALUE);
+ nm.setProperty(FILTERED_KEY, FILTERED_VALUE);
+ me.setMessage(nm, "in");
+
+ Exchange exchange = binding.createExchange(me);
+ assertNotNull(exchange);
+ assertSame("JBI MessageExchange is available as a property",
+ me, exchange.getProperty(JbiBinding.MESSAGE_EXCHANGE));
+ assertEquals("JBI NormalizedMessage content is available in the Camel Message",
+ CONTENT, exchange.getIn().getBody());
+ assertEquals("JBI NormalizedMessage headers are available in the Camel Message",
+ VALUE, exchange.getIn().getHeader(KEY));
+ assertFalse("JBI NormalizedMessage headers have been filtered by the strategy",
+ exchange.getIn().getHeaders().containsKey(FILTERED_KEY));
+ assertEquals("Camel Exchange uses the same MEP",
+ ExchangePattern.InOptionalOut, exchange.getPattern());
+ }
+
public void testCreateExchangeWithInContentAndAttachment() throws Exception {
MessageExchange me = factory.createInOutExchange();
MockNormalizedMessage nm = new MockNormalizedMessage();
@@ -176,10 +203,36 @@
"another-value", me.getProperty("another-key"));
}
+ public void testCopyHeadersFromJbiToCamel() throws Exception {
+ MessageExchange me = new MockMessageExchange();
+ me.setProperty(KEY, VALUE);
+ me.setProperty(FILTERED_KEY, FILTERED_VALUE);
+
+ Exchange exchange = new DefaultExchange(new DefaultCamelContext());
+ binding.copyHeadersFromJbiToCamel(me, exchange);
+
+ assertEquals("Should copy header properties into the Camel Exchange",
+ VALUE, exchange.getProperty(KEY));
+ assertNull("Filtered headers should not have been copied",
+ exchange.getProperty(FILTERED_KEY));
+ }
+
public void testIsSerializable() throws Exception {
assertTrue("A String is serializable", binding.isSerializable("test"));
assertFalse("JbiBinding is not serializable", binding.isSerializable(binding));
assertFalse("Maps can contain non-serializable data", binding.isSerializable(new HashMap()));
assertFalse("Collections can contain non-serializable data", binding.isSerializable(new ArrayList()));
}
+
+ private class MyHeaderFilterStrategy implements HeaderFilterStrategy {
+
+ public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, Exchange exchange) {
+ return headerName.equals(FILTERED_KEY);
+ }
+
+ public boolean applyFilterToExternalHeaders(String headerName, Object headerValue, Exchange exchange) {
+ return false;
+ }
+
+ }
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOutPipelineTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOutPipelineTest.java?rev=822227&r1=822226&r2=822227&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOutPipelineTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOutPipelineTest.java Tue Oct 6 11:49:52 2009
@@ -17,6 +17,7 @@
package org.apache.servicemix.camel;
import java.util.List;
+import javax.naming.Context;
import javax.activation.DataHandler;
import javax.activation.FileDataSource;
@@ -25,20 +26,28 @@
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.util.jndi.JndiContext;
+
import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.client.ServiceMixClient;
import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.jaxp.StringSource;
+
/**
*
*/
public class JbiInOutPipelineTest extends JbiTestSupport {
private static final String MESSAGE = "<just><a>test</a></just>";
+ private MyHeaderFilterStrategy myFilterStrategy = new MyHeaderFilterStrategy();
private static final String HEADER_ORIGINAL = "original";
private static final String HEADER_TRANSFORMER = "transformer";
@@ -64,6 +73,40 @@
assertNotNull(normalizedMessage.getProperty(HEADER_TRANSFORMER));
Thread.sleep(1000);
}
+
+ public void testPipelineWithMessageProviderHeaderFiltering() throws Exception {
+ ServiceMixClient client = new DefaultServiceMixClient(jbiContainer);
+ InOut exchange = client.createInOutExchange();
+
+ // Test providerEndpoint filterStrategy
+ exchange.setService(new QName("urn:test", "filterProvider"));
+ exchange.getInMessage().setContent(new StringSource(MESSAGE));
+ exchange.getInMessage().setProperty(HEADER_ORIGINAL, "my-original-header-value");
+ client.send(exchange);
+ assertNotNull("Expecting to receive a DONE/ERROR MessageExchange", client.receive(10000));
+ client.done(exchange);
+ assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+ assertNull(exchange.getOutMessage().getProperty(HEADER_TRANSFORMER));
+ Thread.sleep(1000);
+ }
+
+ public void testPipelineWithMessageConsumerHeaderFiltering() throws Exception {
+
+ ServiceMixClient client = new DefaultServiceMixClient(jbiContainer);
+ InOut exchange = client.createInOutExchange();
+
+ // Test consumerEndpoint filterStrategy
+ exchange.setService(new QName("urn:test", "filterConsumer"));
+ exchange.getInMessage().setContent(new StringSource(MESSAGE));
+ exchange.getInMessage().setProperty(HEADER_ORIGINAL, "my-original-header-value");
+ client.send(exchange);
+ assertNotNull("Expecting to receive a DONE/ERROR MessageExchange", client.receive(180000));
+ client.done(exchange);
+ assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+ assertNull(exchange.getOutMessage().getProperty(HEADER_TRANSFORMER));
+ Thread.sleep(1000);
+ }
+
@Override
protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
@@ -111,7 +154,52 @@
}
});
+
+ from("jbi:service:urn:test:filterProvider?headerFilterStrategy=#myFilterStrategy")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ // do nothing here, just work around the issue of CAMEL-1955
+ }
+ })
+ .to("jbi:service:urn:test:addAttachments?mep=in-out")
+ .to("jbi:service:urn:test:transformer?mep=in-out");
+
+
+ from("jbi:service:urn:test:filterConsumer")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ // do nothing here, just work around the issue of CAMEL-1955
+ }
+ })
+ .to("jbi:service:urn:test:addAttachments?mep=in-out")
+ .to("jbi:service:urn:test:transformer?mep=in-out&headerFilterStrategy=#myFilterStrategy");
+
}
};
}
-}
\ No newline at end of file
+
+ @Override
+ protected CamelContext createCamelContext() {
+ try {
+ JndiContext context = new JndiContext();
+ context.bind("myFilterStrategy", myFilterStrategy);
+ JndiRegistry registry = new JndiRegistry(context);
+ return new DefaultCamelContext(registry);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ return null;
+ }
+
+ public class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+
+ public MyHeaderFilterStrategy() {
+ initialize();
+ }
+
+ protected void initialize() {
+ getOutFilter().add(HEADER_TRANSFORMER);
+ }
+ }
+
+}