You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/08/06 10:48:23 UTC

svn commit: r801541 - in /camel/sandbox/tuning-experiment/camel-core/src: main/java/org/apache/camel/component/bean/ main/java/org/apache/camel/component/mock/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/ test/java/org/apache/...

Author: davsclaus
Date: Thu Aug  6 08:48:22 2009
New Revision: 801541

URL: http://svn.apache.org/viewvc?rev=801541&view=rev
Log:
Pipeline is no longer copying exchanges anynore.

Added:
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerMutateMessageTest.java   (with props)
Modified:
    camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/bean/BeanInfo.java
    camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/CopyOnWriteMessageFacade.java
    camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/AOPAfterFinallyTest.java
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
    camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java

Modified: camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/bean/BeanInfo.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/bean/BeanInfo.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/bean/BeanInfo.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/bean/BeanInfo.java Thu Aug  6 08:48:22 2009
@@ -39,7 +39,6 @@
 import org.apache.camel.OutHeaders;
 import org.apache.camel.Properties;
 import org.apache.camel.Property;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.language.LanguageAnnotation;
 import org.apache.camel.spi.Registry;

Modified: camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Thu Aug  6 08:48:22 2009
@@ -42,11 +42,13 @@
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ExpressionComparator;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -809,8 +811,19 @@
 
     @SuppressWarnings("unchecked")
     protected void performAssertions(Exchange exchange) throws Exception {
-        Message in = exchange.getIn();
-        Object actualBody = in.getBody();
+        // must use a copy of the in message
+        Exchange copy = new DefaultExchange(exchange);
+        copy.setExchangeId(exchange.getExchangeId());
+
+        if (exchange.hasProperties()) {
+            copy.getProperties().putAll(exchange.getProperties());
+        }
+        if (exchange.hasOut()) {
+            copy.getOut().copyFrom(exchange.getOut());
+        }
+        copy.getIn().copyFrom(exchange.getIn());
+
+        Message in = copy.getIn();
 
         if (headerName != null) {
             actualHeader = in.getHeader(headerName);
@@ -820,6 +833,7 @@
             actualProperty = exchange.getProperty(propertyName);
         }
 
+        Object actualBody = in.getBody();
         if (expectedBodyValues != null) {
             int index = actualBodyValues.size();
             if (expectedBodyValues.size() > index) {
@@ -833,10 +847,10 @@
 
         ++counter;
         if (LOG.isDebugEnabled()) {
-            LOG.debug(getEndpointUri() + " >>>> " + counter + " : " + exchange + " with body: " + actualBody);
+            LOG.debug(getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody);
         }
 
-        receivedExchanges.add(exchange);
+        receivedExchanges.add(copy);
 
         Processor processor = processors.get(getReceivedCounter()) != null
                 ? processors.get(getReceivedCounter()) : defaultProcessor;

Modified: camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/CopyOnWriteMessageFacade.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/CopyOnWriteMessageFacade.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/CopyOnWriteMessageFacade.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/CopyOnWriteMessageFacade.java Thu Aug  6 08:48:22 2009
@@ -45,6 +45,9 @@
     protected Message ensureCopied() {
         if (copiedMessage == null) {
             copiedMessage = readOnly.copy();
+            if (copiedMessage instanceof MessageSupport) {
+                ((MessageSupport) copiedMessage).setExchange(readOnly.getExchange());
+            }
         }
         return copiedMessage;
     }

Modified: camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Thu Aug  6 08:48:22 2009
@@ -30,6 +30,7 @@
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
@@ -168,6 +169,14 @@
                     LOG.debug(">>>> " + endpoint + " " + exchange);
                 }
                 producer.process(exchange);
+
+                // if we are out capable then ensure we set the result as out
+                if (!exchange.isFailed() && ExchangeHelper.isOutCapable(exchange) && !exchange.hasOut()) {
+                    // TODO: Should we set OUT even if it failed?
+                    if (exchange.getPattern() != ExchangePattern.InOptionalOut && exchange.getIn().getBody() != null) {
+                        exchange.setOut(exchange.getIn());
+                    }
+                }
                 return exchange;
             }
         });

Modified: camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Thu Aug  6 08:48:22 2009
@@ -124,10 +124,27 @@
      * <p/>
      * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
      *
-     * @param previousExchange the previous exchange
+     * @param exchange the previous exchange
      * @return a new exchange
      */
-    protected Exchange createNextExchange(Exchange previousExchange) {
+    protected Exchange createNextExchange(Exchange exchange) {
+        Message target = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+
+        CopyOnWriteMessageFacade facade;
+        if (target instanceof CopyOnWriteMessageFacade) {
+            // already a facade so just use it
+            facade = (CopyOnWriteMessageFacade) target;
+        } else {
+            // wrap message using a facade
+            facade = new CopyOnWriteMessageFacade(target);
+        }
+        exchange.setIn(facade);
+        exchange.setOut(null);
+
+        return exchange;
+    }
+
+    protected Exchange oldCreateNextExchange(Exchange previousExchange) {
         Exchange answer = new DefaultExchange(previousExchange);
         // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
         // before processing the next step in the pipeline, so we have a snapshot of the exchange

Modified: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java Thu Aug  6 08:48:22 2009
@@ -31,6 +31,11 @@
 
     private static final Log LOG = LogFactory.getLog(FileConcurrentAggregateBatchConsumerTest.class);
 
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
     public void testProcessFilesConcurrently() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
@@ -42,6 +47,7 @@
                     .aggregate(header("country"), new MyBusinessTotal()).batchSizeFromConsumer().batchTimeout(60000).to("mock:result");
             }
         });
+        context.start();
 
         long start = System.currentTimeMillis();
 
@@ -64,6 +70,7 @@
                     .aggregate(header("country"), new MyBusinessTotal()).batchSizeFromConsumer().batchTimeout(60000).to("mock:result");
             }
         });
+        context.start();
 
         long start = System.currentTimeMillis();
 

Modified: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java Thu Aug  6 08:48:22 2009
@@ -44,6 +44,11 @@
     private static final Log LOG = LogFactory.getLog(FileConcurrentTest.class);
 
     @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry jndi = super.createRegistry();
         jndi.bind("business", new MyBusinessBean());
@@ -71,6 +76,7 @@
                     .aggregate(header("country"), new MyBusinessTotal()).batchSize(10).batchTimeout(60000).to("mock:result");
             }
         });
+        context.start();
 
         long start = System.currentTimeMillis();
 
@@ -93,6 +99,7 @@
                     .aggregate(header("country"), new MyBusinessTotal()).batchSize(10).batchTimeout(60000).to("mock:result");
             }
         });
+        context.start();
 
         long start = System.currentTimeMillis();
 

Modified: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Thu Aug  6 08:48:22 2009
@@ -56,7 +56,7 @@
         Exchange result = future.get();
 
         long delta = System.currentTimeMillis() - start;
-        assertEquals("Hello World", result.getOut().getBody());
+        assertEquals("Hello World", result.getIn().getBody());
         assertTrue("Should take longer than: " + delta, delta > 250);
 
         assertMockEndpointsSatisfied();
@@ -77,7 +77,7 @@
         Exchange result = future.get();
 
         long delta = System.currentTimeMillis() - start;
-        assertEquals("Hello World", result.getOut().getBody());
+        assertEquals("Hello World", result.getIn().getBody());
         assertTrue("Should take longer than: " + delta, delta > 250);
     }
 

Modified: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/AOPAfterFinallyTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/AOPAfterFinallyTest.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/AOPAfterFinallyTest.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/AOPAfterFinallyTest.java Thu Aug  6 08:48:22 2009
@@ -27,7 +27,7 @@
 public class AOPAfterFinallyTest extends ContextTestSupport {
 
     public void testAOPAfterFinally() throws Exception {
-        getMockEndpoint("mock:after").message(0).outBody().isEqualTo("Bye World");
+        getMockEndpoint("mock:after").message(0).body().isEqualTo("Bye World");
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Bye World");
 
@@ -38,7 +38,7 @@
     }
 
     public void testAOPAfterFinallyWithException() throws Exception {
-        getMockEndpoint("mock:after").message(0).outBody().isEqualTo("Kabom the World");
+        getMockEndpoint("mock:after").message(0).body().isEqualTo("Kabom the World");
 
         try {
             template.requestBody("direct:start", "Kabom", String.class);

Added: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerMutateMessageTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerMutateMessageTest.java?rev=801541&view=auto
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerMutateMessageTest.java (added)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerMutateMessageTest.java Thu Aug  6 08:48:22 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultErrorHandlerMutateMessageTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testMessageIsMutatedWhenRedelivering() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hellox");
+
+        template.sendBody("direct:start", "Hello");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliverDelay(0));
+
+                from("direct:start")
+                        .to("mock:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                String body = exchange.getIn().getBody(String.class);
+                                exchange.getOut().setBody(body + "x");
+                                if (counter++ < 2) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        }).to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerMutateMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerMutateMessageTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java Thu Aug  6 08:48:22 2009
@@ -69,12 +69,21 @@
             public void configure() {
                 ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
                 // START SNIPPET: example
+
                 // The message will be sent parallelly to the endpoints
                 from("direct:parallel")
                     .multicast(new BodyOutAggregatingStrategy(), true).executorService(tpExecutor)
-                        .to("direct:x", "direct:y", "direct:z");
+                        .to("direct:x", "direct:y", "direct:z")
+                    .end()
+                    .to("mock:result");
+
                 // Multicast the message in a sequential way
-                from("direct:sequential").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");
+                from("direct:sequential")
+                    .multicast(new BodyOutAggregatingStrategy())
+                        .to("direct:x", "direct:y", "direct:z")
+                    .end()
+                    .to("mock:result");
+
 
                 from("direct:x").process(new AppendingProcessor("x")).to("direct:aggregator");
                 from("direct:y").process(new AppendingProcessor("y")).to("direct:aggregator");

Modified: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java Thu Aug  6 08:48:22 2009
@@ -71,8 +71,9 @@
         assertIsInstanceOf(RollbackExchangeException.class, out.getException());
         assertEquals("Should be marked as rollback", true, out.isRollbackOnly());
         // should not try to redeliver if exchange was marked as rollback only
-        assertEquals(0, out.getOut().getHeader(Exchange.REDELIVERY_COUNTER));
-        assertEquals(false, out.getOut().getHeader(Exchange.REDELIVERED));
+        // TODO: ProducerCache should we set OUT anyway in case of failure?
+        // assertEquals(0, out.getOut().getHeader(Exchange.REDELIVERY_COUNTER));
+        // assertEquals(false, out.getOut().getHeader(Exchange.REDELIVERED));
     }
 
     @Override

Modified: camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java
URL: http://svn.apache.org/viewvc/camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java?rev=801541&r1=801540&r2=801541&view=diff
==============================================================================
--- camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java (original)
+++ camel/sandbox/tuning-experiment/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java Thu Aug  6 08:48:22 2009
@@ -94,8 +94,7 @@
             }
         });
         assertEquals("bar", exchange.getIn().getHeader("foo"));
-        assertEquals("test:blah", exchange.getIn().getBody());
-        assertFalse(exchange.hasOut());
+        assertEquals("test:blah", exchange.getOut().getBody());
         assertNull(exchange.getException());
     }