You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/06/12 18:51:54 UTC
svn commit: r784190 - in
/servicemix/components/engines/servicemix-camel/trunk/src:
main/java/org/apache/servicemix/camel/ test/java/org/apache/servicemix/camel/
Author: gertv
Date: Fri Jun 12 16:51:54 2009
New Revision: 784190
URL: http://svn.apache.org/viewvc?rev=784190&view=rev
Log:
SMXCOMP-567: ConcurrentModificationException when running servicemix-camel routes under high load
Added:
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java (with props)
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java (with props)
Modified:
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiMessage.java
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiMessage.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiMessage.java?rev=784190&r1=784189&r2=784190&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiMessage.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiMessage.java Fri Jun 12 16:51:54 2009
@@ -16,9 +16,6 @@
*/
package org.apache.servicemix.camel;
-import java.util.Iterator;
-import java.util.Map;
-
import javax.activation.DataHandler;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
@@ -45,7 +42,7 @@
@Override
public String toString() {
if (normalizedMessage != null) {
- return "JbiMessage: " + normalizedMessage;
+ return "JbiMessage: " + toString(normalizedMessage);
} else {
return "JbiMessage: " + getBody();
}
@@ -65,7 +62,7 @@
return normalizedMessage;
}
- public void setNormalizedMessage(NormalizedMessage normalizedMessage) {
+ protected void setNormalizedMessage(NormalizedMessage normalizedMessage) {
this.normalizedMessage = normalizedMessage;
}
@@ -85,9 +82,8 @@
public void setHeader(String name , Object value) {
if (normalizedMessage != null) {
normalizedMessage.setProperty(name, value);
- } else {
- super.setHeader(name, value);
}
+ super.setHeader(name, value);
}
@Override
@@ -127,30 +123,6 @@
return null;
}
- @Override
- protected void populateInitialHeaders(Map<String, Object> map) {
- if (normalizedMessage != null) {
- Iterator iter = normalizedMessage.getPropertyNames().iterator();
- while (iter.hasNext()) {
- String name = iter.next().toString();
- Object value = normalizedMessage.getProperty(name);
- map.put(name, value);
- }
- }
- }
-
- @Override
- protected void populateInitialAttachments(Map<String, DataHandler> map) {
- if (normalizedMessage != null) {
- Iterator iter = normalizedMessage.getAttachmentNames().iterator();
- while (iter.hasNext()) {
- String id = iter.next().toString();
- DataHandler content = normalizedMessage.getAttachment(id);
- map.put(id, content);
- }
- }
- }
-
// @Override
public void setBody(Object body) {
if (normalizedMessage != null) {
@@ -165,4 +137,13 @@
}
super.setBody(body);
}
+
+ /*
+ * Describe the message by its hash code and content only. NormalizedMessage.toString() is avoided
+ * because it may iterate over the NormalizedMessage headers after it has been sent through the NMR
+ */
+ private String toString(NormalizedMessage message) {
+ String hash = Integer.toHexString(message.hashCode());
+ return "NormalizedMessage@" + hash + "(" + message.getContent() + ")";
+ }
}
Added: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java?rev=784190&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java (added)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java Fri Jun 12 16:51:54 2009
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+import java.util.List;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+
+/**
+ * Checks that JBI message properties survive a Camel pipeline: the property set by the sender
+ * and the header added by an intermediate in-out step must both reach the final endpoint.
+ */
+public class JbiInOnlyPropertiesPipelineTest extends JbiTestSupport {
+
+    private static final String MESSAGE = "<just><a>test</a></just>";
+
+    private static final String HEADER_ORIGINAL = "original";
+    private static final String HEADER_TRANSFORMER = "transformer";
+
+    /**
+     * Sends an InOnly exchange carrying one property through the pipeline and verifies that the
+     * exchange received by mock:output has both the original and the transformer header.
+     */
+    public void testPipelinePreservesMessageHeaders() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:output");
+        mock.expectedBodiesReceived(MESSAGE);
+
+        ServiceMixClient smxClient = new DefaultServiceMixClient(jbiContainer);
+        InOnly inOnly = smxClient.createInOnlyExchange();
+        inOnly.setService(new QName("urn:test", "input"));
+        NormalizedMessage request = inOnly.getInMessage();
+        request.setContent(new StringSource(MESSAGE));
+        request.setProperty(HEADER_ORIGINAL, "my-original-header-value");
+        smxClient.send(inOnly);
+        smxClient.receive(1000);
+        assertEquals(ExchangeStatus.DONE, inOnly.getStatus());
+
+        mock.assertIsSatisfied();
+        JbiExchange received = (JbiExchange) mock.getExchanges().get(0);
+
+        NormalizedMessage delivered = received.getInMessage();
+        assertNotNull(delivered.getProperty(HEADER_ORIGINAL));
+        assertNotNull(delivered.getProperty(HEADER_TRANSFORMER));
+    }
+
+    @Override
+    protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
+        // nothing to register: this test needs no additional activation specs
+    }
+
+    @Override
+    protected RouteBuilder createRoutes() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("jbi:service:urn:test:input")
+                    .to("jbi:service:urn:test:transformer?mep=in-out")
+                    .to("jbi:service:urn:test:output");
+
+                from("jbi:service:urn:test:transformer").process(new Processor() {
+                    public void process(Exchange camelExchange) throws Exception {
+                        // start the out message as a full copy of the in message
+                        camelExchange.getOut().copyFrom(camelExchange.getIn());
+                        // the sender's header must be visible here; then add one of our own
+                        assertNotNull(camelExchange.getIn().getHeader(HEADER_ORIGINAL));
+                        camelExchange.getOut().setHeader(HEADER_TRANSFORMER, "my-transformer-header-value");
+                    }
+                });
+
+                from("jbi:service:urn:test:output").to("mock:output");
+            }
+        };
+    }
+}
Propchange: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java?rev=784190&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java (added)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java Fri Jun 12 16:51:54 2009
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.servicemix.jbi.helper.MessageExchangePattern;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.messaging.NormalizedMessageImpl;
+import org.apache.servicemix.tck.mock.MockMessageExchange;
+
+/**
+ * Test cases for making sure that JbiMessage behaves properly when accessed concurrently
+ */
+public class JbiMessageConcurrencyTest extends TestCase {
+
+    private static final String MESSAGE = "<just><a>test</a></just>";
+    private static final int COUNT = 500;
+
+    /**
+     * Sets headers on and calls toString() for the in and out messages of a single shared
+     * JbiExchange from many pooled threads, and fails if any task raises a Throwable.
+     *
+     * @throws Exception if the exchange cannot be set up or the wait for the workers is interrupted
+     */
+    public void testRunMessageCopyFromConcurrently() throws Exception {
+        MessageExchange exchange = new MockMessageExchange() {
+            @Override
+            public URI getPattern() {
+                return MessageExchangePattern.IN_OUT;
+            }
+            @Override
+            public NormalizedMessage createMessage() throws MessagingException {
+                // let's take a 'real' NormalizedMessageImpl to reproduce the toString() behavior
+                return new NormalizedMessageImpl();
+            }
+        };
+        exchange.setMessage(exchange.createMessage(), "in");
+        exchange.getMessage("in").setContent(new StringSource(MESSAGE));
+
+        final JbiExchange camelExchange = new JbiExchange(new DefaultCamelContext(), new JbiBinding(), exchange);
+        ExecutorService executor = Executors.newFixedThreadPool(50);
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        // the list is written to by the pool threads, so it has to be thread-safe itself
+        final List<Throwable> failures = Collections.synchronizedList(new LinkedList<Throwable>());
+        try {
+            for (int i = 0; i < COUNT; i++) {
+                final int count = i;
+                executor.execute(new Runnable() {
+                    public void run() {
+                        try {
+                            // let's set headers
+                            camelExchange.getIn().setHeader("@" + count, "ok");
+                            camelExchange.getOut().setHeader("@" + count, "ok");
+
+                            // and access them as well
+                            assertNotNull(camelExchange.getIn().toString());
+                            assertNotNull(camelExchange.getOut().toString());
+                        } catch (Throwable t) {
+                            // Throwable, not Exception: a failed JUnit assertion is reported as an Error
+                            failures.add(t);
+                        } finally {
+                            latch.countDown();
+                        }
+                    }
+                });
+            }
+
+            // give the threads some time, but do not report success if they never finished
+            assertTrue("Tasks should have finished within 3 seconds", latch.await(3, TimeUnit.SECONDS));
+        } finally {
+            // always release the pool threads, even when the test fails
+            executor.shutdownNow();
+        }
+
+        for (Throwable t : failures) {
+            t.printStackTrace();
+        }
+        assertEquals("Should not throw any exceptions due to concurrent access", 0, failures.size());
+    }
+}
Propchange: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java?rev=784190&r1=784189&r2=784190&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java Fri Jun 12 16:51:54 2009
@@ -35,6 +35,11 @@
private static final DataHandler ATTACHMENT = new DataHandler(new FileDataSource("attachment.png"));
private static final DataHandler ANOTHER_ATTACHMENT = new DataHandler(new FileDataSource("attachment.png"));
+ private static final String HEADER = "header";
+ private static final String ANOTHER_HEADER = "another-header";
+ private static final String VALUE = "header's value";
+ private static final String ANOTHER_VALUE = "another header's value";
+
public void testAttachmentsWithNormalizedMessage() throws Exception {
NormalizedMessage jbiMessage = new MockNormalizedMessage();
jbiMessage.addAttachment(ATTACHMENT_ID, ATTACHMENT);
@@ -52,6 +57,23 @@
assertSame(ANOTHER_ATTACHMENT, camelMessage.getAttachment(ANOTHER_ATTACHMENT_ID));
}
+ public void testHeadersWithNormalizedMessage() throws Exception {
+ NormalizedMessage normalized = new MockNormalizedMessage();
+ normalized.setProperty(HEADER, VALUE);
+
+ // a property already on the wrapped NormalizedMessage is readable as a Camel header
+ JbiMessage message = new JbiMessage(normalized);
+ assertSame(VALUE, message.getHeader(HEADER));
+
+ // a header set through the Camel Message is written to the wrapped NormalizedMessage
+ message.setHeader(ANOTHER_HEADER, ANOTHER_VALUE);
+ assertSame(ANOTHER_VALUE, normalized.getProperty(ANOTHER_HEADER));
+
+ // and it is still available from the Camel Message once the NormalizedMessage is detached
+ message.setNormalizedMessage(null);
+ assertSame(ANOTHER_VALUE, message.getHeader(ANOTHER_HEADER));
+ }
+
public void testCopyMessage() throws Exception {
JbiMessage message = new JbiMessage();
Message copy = message.copy();