You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/04 17:16:27 UTC

svn commit: r951440 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/model/config/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/util/ camel-c...

Author: davsclaus
Date: Fri Jun  4 15:16:25 2010
New Revision: 951440

URL: http://svn.apache.org/viewvc?rev=951440&view=rev
Log:
CAMEL-2537: Added option allowDuplicates on batch resequencer so you can resequence messages with duplicate correlated keys. Only in batch mode.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java
      - copied, changed from r951396, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java
      - copied, changed from r951396, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml
      - copied, changed from r951396, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=951440&r1=951439&r2=951440&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Fri Jun  4 15:16:25 2010
@@ -168,6 +168,18 @@ public class ResequenceDefinition extend
     }
 
     /**
+     * Enables duplicates for the batch resequencer mode
+     * @return the builder
+     */
+    public ResequenceDefinition allowDuplicates() {
+        if (batchConfig == null) {
+            throw new IllegalStateException("allowDuplicates() only supported for batch resequencer");
+        }
+        batchConfig.setAllowDuplicates(true);
+        return this;
+    }
+
+    /**
      * Sets the comparator to use for stream resequencer
      *
      * @param comparator  the comparator
@@ -252,7 +264,7 @@ public class ResequenceDefinition extend
             BatchResequencerConfig config) throws Exception {
 
         Processor processor = this.createChildProcessor(routeContext, true);
-        Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext));
+        Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext), config.getAllowDuplicates());
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
         return resequencer;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=951440&r1=951439&r2=951440&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Fri Jun  4 15:16:25 2010
@@ -19,6 +19,7 @@ package org.apache.camel.model.config;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlEnum;
 import javax.xml.bind.annotation.XmlRootElement;
 
 /**
@@ -61,6 +62,9 @@ public class BatchResequencerConfig {
     @XmlAttribute
     private Long batchTimeout; // optional XML attribute requires wrapper object
 
+    @XmlAttribute
+    private Boolean allowDuplicates = Boolean.FALSE;
+
     /**
      * Creates a new {@link BatchResequencerConfig} instance using default
      * values for <code>batchSize</code> (100) and <code>batchTimeout</code>
@@ -110,5 +114,12 @@ public class BatchResequencerConfig {
     public void setBatchTimeout(long batchTimeout) {
         this.batchTimeout = batchTimeout;
     }
-    
+
+    public Boolean getAllowDuplicates() {
+        return allowDuplicates;
+    }
+
+    public void setAllowDuplicates(Boolean allowDuplicates) {
+        this.allowDuplicates = allowDuplicates;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=951440&r1=951439&r2=951440&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Fri Jun  4 15:16:25 2010
@@ -39,11 +39,11 @@ public class Resequencer extends BatchPr
     // TODO: Rework to avoid using BatchProcessor
 
     public Resequencer(CamelContext camelContext, Processor processor, Expression expression) {
-        this(camelContext, processor, createSet(expression));
+        this(camelContext, processor, createSet(expression, false));
     }
 
-    public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions) {
-        this(camelContext, processor, createSet(expressions));
+    public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions, boolean allowDuplicates) {
+        this(camelContext, processor, createSet(expressions, allowDuplicates));
     }
 
     public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) {
@@ -62,18 +62,35 @@ public class Resequencer extends BatchPr
     // Implementation methods
     //-------------------------------------------------------------------------
 
-    protected static Set<Exchange> createSet(Expression expression) {
-        return createSet(new ExpressionComparator(expression));
+    protected static Set<Exchange> createSet(Expression expression, boolean allowDuplicates) {
+        return createSet(new ExpressionComparator(expression), allowDuplicates);
     }
 
-    protected static Set<Exchange> createSet(List<Expression> expressions) {
+    protected static Set<Exchange> createSet(List<Expression> expressions, boolean allowDuplicates) {
         if (expressions.size() == 1) {
-            return createSet(expressions.get(0));
+            return createSet(expressions.get(0), allowDuplicates);
         }
-        return createSet(new ExpressionListComparator(expressions));
+        return createSet(new ExpressionListComparator(expressions), allowDuplicates);
     }
 
-    protected static Set<Exchange> createSet(Comparator<? super Exchange> comparator) {
-        return new TreeSet<Exchange>(comparator);
+    protected static Set<Exchange> createSet(final Comparator<? super Exchange> comparator, boolean allowDuplicates) {
+        Comparator<? super Exchange> comp = comparator;
+
+        // if we allow duplicates then we need to cater for that in the comparator
+        if (allowDuplicates) {
+            comp = new Comparator<Exchange>() {
+                public int compare(Exchange o1, Exchange o2) {
+                    int answer = comparator.compare(o1, o2);
+                    if (answer == 0) {
+                        // they are equal but we should allow duplicates, so report o1 as higher
+                        // than o2 (positive result) so that both exchanges are kept in the set
+                        return 1;
+                    }
+                    return answer;
+                }
+            };
+        }
+        return new TreeSet<Exchange>(comp);
     }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java?rev=951440&r1=951439&r2=951440&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java Fri Jun  4 15:16:25 2010
@@ -38,7 +38,7 @@ public interface Ordered {
      * Gets the order.
      * <p/>
      * Use low numbers for higher priority. Normally the sorting will start from 0 and move upwards.
-     * So if you want to be last then use {@link Integer#MAX_VALUE}.
+     * So if you want to be last then use {@link Integer#MAX_VALUE} or e.g. {@link #LOWEST}.
      *
      * @return the order
      */

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java?rev=951440&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java Fri Jun  4 15:16:25 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class BatchResequencerAllowDuplicatesTest extends ContextTestSupport {
+
+    public void testBatchResequencerAllowDuplicate() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("1A", "1B", "2C", "2D", "2E", "2F", "3G", "4H");
+
+        template.sendBodyAndHeader("direct:start", "1A", "id", "1");
+        template.sendBodyAndHeader("direct:start", "2C", "id", "2");
+        template.sendBodyAndHeader("direct:start", "2D", "id", "2");
+        template.sendBodyAndHeader("direct:start", "4H", "id", "4");
+        template.sendBodyAndHeader("direct:start", "1B", "id", "1");
+        template.sendBodyAndHeader("direct:start", "2E", "id", "2");
+        template.sendBodyAndHeader("direct:start", "3G", "id", "3");
+        template.sendBodyAndHeader("direct:start", "2F", "id", "2");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    // allow duplicates, which means messages with the same id are retained
+                    .resequence(header("id")).allowDuplicates()
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java (from r951396, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java&r1=951396&r2=951440&rev=951440&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java Fri Jun  4 15:16:25 2010
@@ -16,70 +16,64 @@
  */
 package org.apache.camel.processor;
 
-import java.util.List;
-
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Route;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.management.JmxSystemPropertyKeys;
-import org.apache.camel.processor.interceptor.StreamCaching;
 
 /**
  * @version $Revision$
  */
-public class ResequencerTest extends ContextTestSupport {
-    protected Endpoint startEndpoint;
-    protected MockEndpoint resultEndpoint;
-
-    public void testSendMessagesInWrongOrderButReceiveThemInCorrectOrder() throws Exception {
-        resultEndpoint.expectedBodiesReceived("Guillaume", "Hiram", "James", "Rob");
-        sendBodies("direct:start", "Rob", "Hiram", "Guillaume", "James");
-        resultEndpoint.assertIsSatisfied();
-    }
+public class BatchResequencerWithDuplicateTest extends ContextTestSupport {
 
     @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        resultEndpoint = getMockEndpoint("mock:result");
+    public boolean isUseRouteBuilder() {
+        return false;
     }
 
-    @Override 
-    protected void tearDown() throws Exception {
-        super.tearDown();
-        System.clearProperty(JmxSystemPropertyKeys.DISABLED);
-    }
-    
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            public void configure() {
-                // START SNIPPET: example
-                from("direct:start").resequence(body()).to("mock:result");
-                // END SNIPPET: example
+    public void testBatchResequencerAllowDuplicate() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").resequence(header("id")).allowDuplicates().to("mock:result");
             }
-        };
-    }
+        });
+        context.start();
 
-    public void testBatchResequencerTypeWithJmx() throws Exception {
-        System.setProperty(JmxSystemPropertyKeys.DISABLED, "true");
-        testBatchResequencerTypeWithoutJmx();
-    }
-
-    public void testBatchResequencerTypeWithoutJmx() throws Exception {
-        List<Route> list = getRouteList(createRouteBuilder());
-        assertEquals("Number of routes created: " + list, 1, list.size());
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("1A", "1B", "2C", "2D", "2E", "2F", "3G", "4H");
 
-        Route route = list.get(0);
-        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+        template.sendBodyAndHeader("direct:start", "1A", "id", "1");
+        template.sendBodyAndHeader("direct:start", "2C", "id", "2");
+        template.sendBodyAndHeader("direct:start", "2D", "id", "2");
+        template.sendBodyAndHeader("direct:start", "4H", "id", "4");
+        template.sendBodyAndHeader("direct:start", "1B", "id", "1");
+        template.sendBodyAndHeader("direct:start", "2E", "id", "2");
+        template.sendBodyAndHeader("direct:start", "3G", "id", "3");
+        template.sendBodyAndHeader("direct:start", "2F", "id", "2");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBatchResequencerNoDuplicate() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").resequence(header("id")).to("mock:result");
+            }
+        });
+        context.start();
 
-        DefaultChannel channel = assertIsInstanceOf(DefaultChannel.class, unwrapChannel(consumerRoute.getProcessor()));
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A", "C", "E", "F");
 
-        assertIsInstanceOf(DefaultErrorHandler.class, channel.getErrorHandler());
-        assertFalse("Should not have stream caching", channel.hasInterceptorStrategy(StreamCaching.class));
+        template.sendBodyAndHeader("direct:start", "A", "id", "1");
+        template.sendBodyAndHeader("direct:start", "C", "id", "2");
+        template.sendBodyAndHeader("direct:start", "D", "id", "2");
+        template.sendBodyAndHeader("direct:start", "F", "id", "4");
+        template.sendBodyAndHeader("direct:start", "B", "id", "1");
+        template.sendBodyAndHeader("direct:start", "E", "id", "3");
 
-        assertIsInstanceOf(Resequencer.class, channel.getNextProcessor());
+        assertMockEndpointsSatisfied();
     }
 
-}
+}
\ No newline at end of file

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java (from r951396, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java&r1=951396&r2=951440&rev=951440&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java Fri Jun  4 15:16:25 2010
@@ -17,15 +17,16 @@
 package org.apache.camel.spring.processor;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.AOPAfterTest;
+import org.apache.camel.processor.BatchResequencerAllowDuplicatesTest;
+
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 /**
  * @version $Revision$
  */
-public class SpringAOPAfterTest extends AOPAfterTest {
+public class SpringBatchResequencerAllowDuplicatesTest extends BatchResequencerAllowDuplicatesTest {
 
     protected CamelContext createCamelContext() throws Exception {
-        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aopafter.xml");
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml");
     }
 }
\ No newline at end of file

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml (from r951396, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml&r1=951396&r2=951440&rev=951440&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml Fri Jun  4 15:16:25 2010
@@ -26,10 +26,11 @@
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:start"/>
-            <aop afterUri="mock:after">
-                <transform><constant>Bye World</constant></transform>
+            <resequence>
+                <header>id</header>
                 <to uri="mock:result"/>
-            </aop>
+                <batch-config allowDuplicates="true"/>
+            </resequence>
         </route>
     </camelContext>
     <!-- END SNIPPET: e1 -->