You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/06/12 18:52:08 UTC

svn commit: r784191 - in /servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src: main/java/org/apache/servicemix/camel/ test/java/org/apache/servicemix/camel/

Author: gertv
Date: Fri Jun 12 16:52:08 2009
New Revision: 784191

URL: http://svn.apache.org/viewvc?rev=784191&view=rev
Log:
SMXCOMP-567: ConcurrentModificationException when running servicemix-camel routes under high load

Added:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java   (with props)
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java   (with props)
Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiMessage.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiMessage.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiMessage.java?rev=784191&r1=784190&r2=784191&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiMessage.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiMessage.java Fri Jun 12 16:52:08 2009
@@ -16,9 +16,6 @@
  */
 package org.apache.servicemix.camel;
 
-import java.util.Iterator;
-import java.util.Map;
-
 import javax.activation.DataHandler;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
@@ -45,7 +42,7 @@
     @Override
     public String toString() {
         if (normalizedMessage != null) {
-            return "JbiMessage: " + normalizedMessage;
+            return "JbiMessage: " + toString(normalizedMessage);
         } else {
             return "JbiMessage: " + getBody();
         }
@@ -64,7 +61,7 @@
         return normalizedMessage;
     }
 
-    public void setNormalizedMessage(NormalizedMessage normalizedMessage) {
+    protected void setNormalizedMessage(NormalizedMessage normalizedMessage) {
         this.normalizedMessage = normalizedMessage;
     }
 
@@ -84,9 +81,8 @@
     public void setHeader(String name , Object value) {
         if (normalizedMessage != null) {
             normalizedMessage.setProperty(name, value);
-        } else {
-            super.setHeader(name, value);
         }
+        super.setHeader(name, value);
     }
 
     @Override
@@ -129,30 +125,6 @@
         return null;
     }
 
-    @Override
-    protected void populateInitialHeaders(Map<String, Object> map) {
-        if (normalizedMessage != null) {
-            Iterator iter = normalizedMessage.getPropertyNames().iterator();
-            while (iter.hasNext()) {
-                String name = iter.next().toString();
-                Object value = normalizedMessage.getProperty(name);
-                map.put(name, value);
-            }
-        }
-    }
-
-    @Override
-    protected void populateInitialAttachments(Map<String, DataHandler> map) {
-        if (normalizedMessage != null) {
-            Iterator iter = normalizedMessage.getAttachmentNames().iterator();
-            while (iter.hasNext()) {
-                String id = iter.next().toString();
-                DataHandler content = normalizedMessage.getAttachment(id);
-                map.put(id, content);
-            }
-        }
-    }
-
 //    @Override
     public void setBody(Object body) {
         if (normalizedMessage != null) {
@@ -167,4 +139,13 @@
         }
         super.setBody(body);
     }
+    
+    /*
+     * Avoid use of normalizedMessage.toString() because it may iterate over the NormalizedMessage headers
+     * after it has been sent through the NMR
+     */
+    private String toString(NormalizedMessage message) {
+        return String.format("NormalizedMessage@%s(%s)", 
+                             Integer.toHexString(message.hashCode()), message.getContent());
+    }
 }

Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java?rev=784191&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java Fri Jun 12 16:52:08 2009
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+import java.util.List;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+
+/**
+ * Test to make sure that a Camel Pipeline is capable of preserving JBI headers 
+ */
+public class JbiInOnlyPropertiesPipelineTest extends JbiTestSupport {
+    
+    private static final String MESSAGE = "<just><a>test</a></just>";
+    
+    private static final String HEADER_ORIGINAL = "original";
+    private static final String HEADER_TRANSFORMER = "transformer";    
+
+    public void testPipelinePreservesMessageHeaders() throws Exception {
+        MockEndpoint output = getMockEndpoint("mock:output");
+        output.expectedBodiesReceived(MESSAGE);
+        
+        ServiceMixClient client = new DefaultServiceMixClient(jbiContainer);
+        InOnly exchange = client.createInOnlyExchange();
+        exchange.setService(new QName("urn:test", "input"));
+        exchange.getInMessage().setContent(new StringSource(MESSAGE));
+        exchange.getInMessage().setProperty(HEADER_ORIGINAL, "my-original-header-value");
+        client.send(exchange);
+        client.receive(1000);
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        
+        output.assertIsSatisfied();
+        JbiExchange result = (JbiExchange) output.getExchanges().get(0);
+        
+        NormalizedMessage normalizedMessage = result.getInMessage();
+        assertNotNull(normalizedMessage.getProperty(HEADER_ORIGINAL));
+        assertNotNull(normalizedMessage.getProperty(HEADER_TRANSFORMER));
+    }
+
+    @Override
+    protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
+        // no additional activation specs required
+    }
+
+    @Override
+    protected RouteBuilder createRoutes() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("jbi:service:urn:test:input")
+                    .to("jbi:service:urn:test:transformer?mep=in-out")
+                    .to("jbi:service:urn:test:output");
+                
+                from("jbi:service:urn:test:transformer").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        // let's copy everything
+                        exchange.getOut().copyFrom(exchange.getIn());
+                        // check the headers and add another one
+                        assertNotNull(exchange.getIn().getHeader(HEADER_ORIGINAL));
+                        exchange.getOut().setHeader(HEADER_TRANSFORMER, "my-transformer-header-value");
+                    }                    
+                });
+                
+                from("jbi:service:urn:test:output").to("mock:output");
+            }
+        };
+    }
+}

Propchange: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOnlyPropertiesPipelineTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java?rev=784191&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java Fri Jun 12 16:52:08 2009
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
+import org.apache.servicemix.jbi.messaging.NormalizedMessageImpl;
+import org.apache.servicemix.tck.mock.MockMessageExchange;
+
+/**
+ * Test cases for making sure that JbiMessage behaves properly when accessed concurrently 
+ */
+public class JbiMessageConcurrencyTest extends TestCase {
+
+    private static final String MESSAGE = "<just><a>test</a></just>";
+    private static final int COUNT = 500;
+    
+    public void testRunMessageCopyFromConcurrently() throws Exception {
+        MessageExchange exchange = new MockMessageExchange() {
+            @Override
+            public URI getPattern() {
+                return MessageExchangeSupport.IN_OUT;
+            }
+            @Override
+            public NormalizedMessage createMessage() throws MessagingException {
+                // let's take a 'real' NormalizedMessageImpl to reproduce the toString() behavior
+                return new NormalizedMessageImpl();
+            }
+        };
+        exchange.setMessage(exchange.createMessage(), "in");
+        exchange.getMessage("in").setContent(new StringSource(MESSAGE));
+        
+        final JbiExchange camelExchange = new JbiExchange(new DefaultCamelContext(), new JbiBinding(), exchange);
+        ExecutorService executor = Executors.newFixedThreadPool(50);
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        final List<Exception> exceptions = new LinkedList<Exception>();
+        for (int i = 0; i < COUNT; i++) {
+            final int count = i;
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        // let's set headers
+                        camelExchange.getIn().setHeader("@" + count, "ok");
+                        camelExchange.getOut().setHeader("@" + count, "ok");
+                        
+                        // and access them as well
+                        assertNotNull(camelExchange.getIn().toString());
+                        assertNotNull(camelExchange.getOut().toString());
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+        
+        // give the threads some time
+        latch.await(3, TimeUnit.SECONDS);
+        for (Exception e : exceptions) {
+            e.printStackTrace();
+        }
+        assertEquals("Should not thrown any exceptions due to concurrent access", 0, exceptions.size());
+    }
+}

Propchange: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageConcurrencyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java?rev=784191&r1=784190&r2=784191&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiMessageTest.java Fri Jun 12 16:52:08 2009
@@ -35,6 +35,11 @@
     private static final DataHandler ATTACHMENT = new DataHandler(new FileDataSource("attachment.png"));
     private static final DataHandler ANOTHER_ATTACHMENT = new DataHandler(new FileDataSource("attachment.png"));
     
+    private static final String HEADER = "header";
+    private static final String ANOTHER_HEADER = "another-header";
+    private static final String VALUE = "header's value";
+    private static final String ANOTHER_VALUE = "another header's value";
+    
     public void testAttachmentsWithNormalizedMessage() throws Exception {
         NormalizedMessage jbiMessage = new MockNormalizedMessage();
         jbiMessage.addAttachment(ATTACHMENT_ID, ATTACHMENT);
@@ -52,6 +57,23 @@
         assertSame(ANOTHER_ATTACHMENT, camelMessage.getAttachment(ANOTHER_ATTACHMENT_ID));
     }
     
+    public void testHeadersWithNormalizedMessage() throws Exception {
+        NormalizedMessage jbiMessage = new MockNormalizedMessage();
+        jbiMessage.setProperty(HEADER, VALUE);
+        
+        // make sure the Camel Message also has the header
+        JbiMessage camelMessage = new JbiMessage(jbiMessage);
+        assertSame(VALUE, camelMessage.getHeader(HEADER));
+        
+        // and ensure the headers are propagated back to the underlying NormalizedMessage
+        camelMessage.setHeader(ANOTHER_HEADER, ANOTHER_VALUE);
+        assertSame(ANOTHER_VALUE, jbiMessage.getProperty(ANOTHER_HEADER));
+        
+        // and they should also be on the Camel Message itself
+        camelMessage.setNormalizedMessage(null);
+        assertSame(ANOTHER_VALUE, camelMessage.getHeader(ANOTHER_HEADER));
+    }    
+    
     public void testCopyMessage() throws Exception {
         JbiMessage message = new JbiMessage();
         Message copy = message.copy();