You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/04/20 15:33:14 UTC
svn commit: r766688 - in
/servicemix/components/engines/servicemix-camel/trunk/src:
main/java/org/apache/servicemix/camel/ test/java/org/apache/servicemix/camel/
Author: gertv
Date: Mon Apr 20 13:33:14 2009
New Revision: 766688
URL: http://svn.apache.org/viewvc?rev=766688&view=rev
Log:
SMXCOMP-18: Camel component crashes when using JMS flow (NotSerializableException)
Added:
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiSerializableMessageExchangeTest.java (with props)
Modified:
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java?rev=766688&r1=766687&r2=766688&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java Mon Apr 20 13:33:14 2009
@@ -16,23 +16,25 @@
*/
package org.apache.servicemix.camel;
-import java.util.Set;
import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
-import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
import org.apache.servicemix.common.util.URIResolver;
import org.apache.servicemix.id.IdGenerator;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.AsyncCallback;
/**
* A consumer endpoint that will be used to send JBI exchanges
@@ -45,6 +47,9 @@
private JbiBinding binding;
private JbiEndpoint jbiEndpoint;
+
+ private Map<String, AsyncCallback> callbacks = new ConcurrentHashMap<String, AsyncCallback>();
+ private Map<String, Exchange> exchanges = new ConcurrentHashMap<String, Exchange>();
public CamelConsumerEndpoint(JbiBinding binding, JbiEndpoint jbiEndpoint) {
setService(SERVICE_NAME);
@@ -54,24 +59,23 @@
}
public void process(MessageExchange messageExchange) throws Exception {
- Exchange exchange = (Exchange) messageExchange.getProperty(Exchange.class.getName());
- AsyncCallback asyncCallback =(AsyncCallback) messageExchange.getProperty(AsyncCallback.class.getName());
- if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
- exchange.setException(messageExchange.getError());
- } else if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
- addHeaders(messageExchange, exchange);
- if (messageExchange.getFault() != null) {
- exchange.getFault().setBody(messageExchange.getFault().getContent());
- addHeaders(messageExchange.getFault(), exchange.getFault());
- addAttachments(messageExchange.getFault(), exchange.getFault());
- } else if (messageExchange.getMessage("out") != null) {
- exchange.getOut().setBody(messageExchange.getMessage("out").getContent());
- addHeaders(messageExchange.getMessage("out"), exchange.getOut());
- addAttachments(messageExchange.getMessage("out"), exchange.getOut());
+ Exchange exchange = exchanges.remove(messageExchange.getExchangeId());
+ if (exchange == null) {
+ String message = String.format("Unable to find Camel Exchange for JBI MessageExchange %s", messageExchange.getExchangeId());
+ logger.warn(message);
+ if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(messageExchange, new JbiException(message));
}
- done(messageExchange);
+ } else {
+ processReponse(messageExchange, exchange);
+ }
+
+ AsyncCallback asyncCallback = callbacks.remove(messageExchange.getExchangeId());
+ if (asyncCallback == null) {
+ logger.warn(String.format("Unable to find Camel AsyncCallback for JBI MessageExchange %s", messageExchange.getExchangeId()));
+ } else {
+ asyncCallback.done(false);
}
- asyncCallback.done(false);
}
public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
@@ -83,8 +87,8 @@
}
URIResolver.configureExchange(messageExchange, getContext(), jbiEndpoint.getDestinationUri());
- messageExchange.setProperty(Exchange.class.getName(), exchange);
- messageExchange.setProperty(AsyncCallback.class.getName(), asyncCallback);
+ exchanges.put(messageExchange.getExchangeId(), exchange);
+ callbacks.put(messageExchange.getExchangeId(), asyncCallback);
send(messageExchange);
return false;
@@ -107,21 +111,7 @@
sendSync(messageExchange);
- if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
- exchange.setException(messageExchange.getError());
- } else if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
- addHeaders(messageExchange, exchange);
- if (messageExchange.getFault() != null) {
- exchange.getFault().setBody(messageExchange.getFault().getContent());
- addHeaders(messageExchange.getFault(), exchange.getFault());
- addAttachments(messageExchange.getFault(), exchange.getFault());
- } else if (messageExchange.getMessage("out") != null) {
- exchange.getOut().setBody(messageExchange.getMessage("out").getContent());
- addHeaders(messageExchange.getMessage("out"), exchange.getOut());
- addAttachments(messageExchange.getMessage("out"), exchange.getOut());
- }
- done(messageExchange);
- }
+ processReponse(messageExchange, exchange);
} catch (MessagingException e) {
exchange.setException(e);
@@ -131,6 +121,24 @@
throw new JbiException(e);
}
}
+
+ private void processReponse(MessageExchange messageExchange, Exchange exchange) throws MessagingException {
+ if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
+ exchange.setException(messageExchange.getError());
+ } else if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+ addHeaders(messageExchange, exchange);
+ if (messageExchange.getFault() != null) {
+ exchange.getFault().setBody(messageExchange.getFault().getContent());
+ addHeaders(messageExchange.getFault(), exchange.getFault());
+ addAttachments(messageExchange.getFault(), exchange.getFault());
+ } else if (messageExchange.getMessage("out") != null) {
+ exchange.getOut().setBody(messageExchange.getMessage("out").getContent());
+ addHeaders(messageExchange.getMessage("out"), exchange.getOut());
+ addAttachments(messageExchange.getMessage("out"), exchange.getOut());
+ }
+ done(messageExchange);
+ }
+ }
@Override
public void validate() throws DeploymentException {
Added: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiSerializableMessageExchangeTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiSerializableMessageExchangeTest.java?rev=766688&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiSerializableMessageExchangeTest.java (added)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiSerializableMessageExchangeTest.java Mon Apr 20 13:33:14 2009
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
+
+import junit.framework.AssertionFailedError;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.jaxp.StringSource;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.management.InstrumentationLifecycleStrategy;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+/**
+ * Tests that JBI MessageExchanges sent by the Camel component are serializable (SMXCOMP-18)
+ */
+public class JbiSerializableMessageExchangeTest extends JbiTestSupport {
+
+ private static final String MESSAGE = "<just><a>test</a></just>";
+
+ public void testInOnlyExchangeConvertBody() throws Exception {
+ MockEndpoint done = getMockEndpoint("mock:done");
+ done.expectedBodiesReceived(MESSAGE);
+
+ client.sendBody("seda:in-only", MESSAGE);
+
+ done.assertIsSatisfied();
+
+
+ }
+
+
+ private void assertSerializable(Object object) throws IOException {
+ ObjectOutputStream stream = new ObjectOutputStream(new ByteArrayOutputStream());
+ stream.writeObject(object);
+ }
+
+ protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
+ // register a receiver that asserts every incoming MessageExchange is serializable
+ activationSpecList.add(createActivationSpec(new ReceiverComponent() {
+ @Override
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ try {
+ assertSerializable(exchange);
+ super.onMessageExchange(exchange);
+ } catch (IOException e) {
+ fail(exchange, e);
+ }
+ }
+ }, new QName("urn:test", "receiver")));
+ }
+
+ protected CamelContext createCamelContext() {
+ DefaultCamelContext context = new DefaultCamelContext();
+ context.setLifecycleStrategy(new InstrumentationLifecycleStrategy());
+ return context;
+ }
+
+ @Override
+ protected RouteBuilder createRoutes() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("log:error?showHeaders=true").maximumRedeliveries(0));
+ from("seda:in-only").to("jbi:service:urn:test:receiver").to("mock:done");
+ }
+
+ };
+ }
+}
Propchange: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiSerializableMessageExchangeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java?rev=766688&r1=766687&r2=766688&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java Mon Apr 20 13:33:14 2009
@@ -50,7 +50,7 @@
protected Exchange receivedExchange;
- protected CamelContext camelContext = new DefaultCamelContext();
+ protected CamelContext camelContext;
protected SpringJBIContainer jbiContainer = new SpringJBIContainer();
@@ -62,7 +62,7 @@
protected String startEndpointUri = "jbi:endpoint:serviceNamespace:serviceA:endpointA";
- protected ProducerTemplate<Exchange> client = camelContext.createProducerTemplate();
+ protected ProducerTemplate<Exchange> client;
protected ServiceMixClient servicemixClient;
@@ -115,6 +115,11 @@
@Override
protected void setUp() throws Exception {
+ if (camelContext == null) {
+ camelContext = createCamelContext();
+ client = camelContext.createProducerTemplate();
+ }
+
configureContainer(jbiContainer);
List<ActivationSpec> activationSpecList = new ArrayList<ActivationSpec>();
@@ -144,6 +149,10 @@
camelContext.start();
}
+ protected CamelContext createCamelContext() {
+ return new DefaultCamelContext();
+ }
+
protected void configureComponent(CamelJbiComponent component) throws Exception {
// add the ServiceMix Camel component to the CamelContext
camelContext.addComponent("jbi", new JbiComponent(component));