You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2012/07/16 21:12:41 UTC

Re: svn commit: r1362163 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/model/config/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/proces

Hi


It's better to use an @XmlAttribute for the new option like the other
options on stream config

+    @XmlElement
+    private Boolean rejectOld;

Then the XML DSL is not as confusing and noisy, as it would just be an
attribute instead of an XML tag.

Also remember that the Scala DSL may be impacted and may need an update.
This can happen whenever you change the DSL.
When I remember, I try to run a full test of camel-scala. But
it may slip my mind sometimes, and then later it bites you :)



On Mon, Jul 16, 2012 at 7:51 PM,  <bo...@apache.org> wrote:
> Author: boday
> Date: Mon Jul 16 17:51:17 2012
> New Revision: 1362163
>
> URL: http://svn.apache.org/viewvc?rev=1362163&view=rev
> Log:
> CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an error if older messages are received after the last delivered message
>
> Added:
>     camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>     camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>     camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>     camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
> Modified:
>     camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>     camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>     camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>     camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Mon Jul 16 17:51:17 2012
> @@ -150,6 +150,18 @@ public class ResequenceDefinition extend
>      }
>
>      /**
> +     * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed
> +     * @return the builder
> +     */
> +    public ResequenceDefinition rejectOld() {
> +        if (streamConfig == null) {
> +            throw new IllegalStateException("rejectOld() only supported for stream resequencer");
> +        }
> +        streamConfig.setRejectOld(true);
> +        return this;
> +    }
> +
> +    /**
>       * Sets the in batch size for number of exchanges received
>       * @param batchSize  the batch size
>       * @return the builder
> @@ -368,6 +380,7 @@ public class ResequenceDefinition extend
>          StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator);
>          resequencer.setTimeout(config.getTimeout());
>          resequencer.setCapacity(config.getCapacity());
> +        resequencer.setRejectOld(config.getRejectOld());
>          if (config.getIgnoreInvalidExchanges() != null) {
>              resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
>          }
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Mon Jul 16 17:51:17 2012
> @@ -19,6 +19,7 @@ package org.apache.camel.model.config;
>  import javax.xml.bind.annotation.XmlAccessType;
>  import javax.xml.bind.annotation.XmlAccessorType;
>  import javax.xml.bind.annotation.XmlAttribute;
> +import javax.xml.bind.annotation.XmlElement;
>  import javax.xml.bind.annotation.XmlRootElement;
>  import javax.xml.bind.annotation.XmlTransient;
>
> @@ -41,6 +42,8 @@ public class StreamResequencerConfig ext
>      private Boolean ignoreInvalidExchanges;
>      @XmlTransient
>      private ExpressionResultComparator comparator;
> +    @XmlElement
> +    private Boolean rejectOld;
>
>      /**
>       * Creates a new {@link StreamResequencerConfig} instance using default
> @@ -123,5 +126,13 @@ public class StreamResequencerConfig ext
>      public void setComparator(ExpressionResultComparator comparator) {
>          this.comparator = comparator;
>      }
> -
> +
> +    public void setRejectOld(boolean value) {
> +        this.rejectOld = value;
> +    }
> +
> +    public Boolean getRejectOld() {
> +        return rejectOld;
> +    }
> +
>  }
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Mon Jul 16 17:51:17 2012
> @@ -141,6 +141,10 @@ public class StreamResequencer extends S
>          return ignoreInvalidExchanges;
>      }
>
> +    public void setRejectOld(Boolean rejectOld) {
> +        engine.setRejectOld(rejectOld);
> +    }
> +
>      /**
>       * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer.
>       * <p/>
>
> Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java (added)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,32 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor.resequencer;
> +
> +import org.apache.camel.RuntimeCamelException;
> +
> +/**
> + * An exception thrown if message is rejected by the resequencer
> + *
> + * @version
> + */
> +public class MessageRejectedException extends RuntimeCamelException {
> +    private static final long serialVersionUID = 5755929795399134568L;
> +
> +    public MessageRejectedException(String message) {
> +        super(message);
> +    }
> +}
> \ No newline at end of file
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Mon Jul 16 17:51:17 2012
> @@ -87,6 +87,11 @@ public class ResequencerEngine<E> {
>      private SequenceSender<E> sequenceSender;
>
>      /**
> +     * Indicates whether an error should be thrown if message older (based on Comparator) than the last delivered message is received.
> +     */
> +    private Boolean rejectOld;
> +
> +    /**
>       * Creates a new resequencer instance with a default timeout of 2000
>       * milliseconds.
>       *
> @@ -136,6 +141,14 @@ public class ResequencerEngine<E> {
>          this.timeout = timeout;
>      }
>
> +    public Boolean getRejectOld() {
> +        return rejectOld;
> +    }
> +
> +    public void setRejectOld(Boolean rejectOld) {
> +        this.rejectOld = rejectOld;
> +    }
> +
>      /**
>       * Returns the sequence sender.
>       *
> @@ -209,6 +222,9 @@ public class ResequencerEngine<E> {
>              // nothing to schedule
>          } else if (sequence.predecessor(element) != null) {
>              // nothing to schedule
> +        } else if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
> +            throw new MessageRejectedException("rejecting message [" + element.getObject()
> +                    + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
>          } else {
>              element.schedule(defineTimeout());
>          }
> @@ -283,6 +299,22 @@ public class ResequencerEngine<E> {
>      }
>
>      /**
> +     * Retuns <code>true</code> if the given element is before the last delivered element.
> +     *
> +     * @param element an element.
> +     * @return <code>true</code> if the given element is before the last delivered element.
> +     */
> +    private boolean beforeLastDelivered(Element<E> element) {
> +        if (lastDelivered == null) {
> +            return false;
> +        }
> +        if (sequence.comparator().compare(element, lastDelivered) < 0) {
> +            return true;
> +        }
> +        return false;
> +    }
> +
> +    /**
>       * Creates a timeout task based on the timeout setting of this resequencer.
>       *
>       * @return a new timeout task.
>
> Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java (added)
> +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,93 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.processor.resequencer.MessageRejectedException;
> +
> +/**
> + *
> + */
> +public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport {
> +
> +    public void testInSequenceAfterTimeout() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", "E");
> +        getMockEndpoint("mock:error").expectedMessageCount(0);
> +
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testDuplicateAfterTimeout() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C");
> +        getMockEndpoint("mock:error").expectedMessageCount(0);
> +
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testOutOfSequenceAfterTimeout() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "C", "D");
> +        getMockEndpoint("mock:error").expectedBodiesReceived("B");
> +
> +        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testOutOfSequenceAfterTimeout2() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
> +        getMockEndpoint("mock:error").expectedBodiesReceived("A");
> +
> +        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +
> +                from("direct:start")
> +                        .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
> +                        .resequence(header("seqno")).stream().timeout(1000).rejectOld()
> +                        .to("mock:result");
> +            }
> +        };
> +    }
> +}
>
> Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java (added)
> +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,32 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.spring.processor;
> +
> +import org.apache.camel.CamelContext;
> +import org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest;
> +import org.apache.camel.processor.ResequencerTest;
> +
> +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
> +
> +/**
> + * @version
> + */
> +public class SpringResequenceStreamRejectOldExchangesTest extends ResequenceStreamRejectOldExchangesTest {
> +    protected CamelContext createCamelContext() throws Exception {
> +        return createSpringCamelContext(this, "org/apache/camel/spring/processor/resequencerRejectOld.xml");
> +    }
> +}
> \ No newline at end of file
>
> Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml (added)
> +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,42 @@
> +<?xml version="1.0" encoding="UTF-8"?>
> +<!--
> +    Licensed to the Apache Software Foundation (ASF) under one or more
> +    contributor license agreements.  See the NOTICE file distributed with
> +    this work for additional information regarding copyright ownership.
> +    The ASF licenses this file to You under the Apache License, Version 2.0
> +    (the "License"); you may not use this file except in compliance with
> +    the License.  You may obtain a copy of the License at
> +
> +    http://www.apache.org/licenses/LICENSE-2.0
> +
> +    Unless required by applicable law or agreed to in writing, software
> +    distributed under the License is distributed on an "AS IS" BASIS,
> +    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +    See the License for the specific language governing permissions and
> +    limitations under the License.
> +-->
> +<beans xmlns="http://www.springframework.org/schema/beans"
> +       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> +       xsi:schemaLocation="
> +       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
> +       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
> +
> +    <camelContext xmlns="http://camel.apache.org/schema/spring">
> +        <route>
> +            <from uri="direct:start"/>
> +            <onException>
> +                <exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception>
> +                <handled><constant>true</constant></handled>
> +                <to uri="mock:error"/>
> +            </onException>
> +            <resequence>
> +                <stream-config capacity="100" timeout="1000">
> +                    <rejectOld>true</rejectOld>
> +                </stream-config>
> +                <header>seqno</header>
> +                <to uri="mock:result"/>
> +            </resequence>
> +        </route>
> +    </camelContext>
> +
> +</beans>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: svn commit: r1362163 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/model/config/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/proces

Posted by boday <be...@initekconsulting.com>.
Funny, I had it that way to start with, then changed it to an element thinking it
was less invasive... I see your point though, and just changed it back to use
@XmlAttribute instead.


Claus Ibsen-2 wrote
> 
> Hi
> 
> 
> Its better to use an @XmlAttribute for the new option like the other
> options on stream config
> 
> +    @XmlElement
> +    private Boolean rejectOld;
> 
> Then the XML DSL is not as confusing and noisy, as it would just be an
> attribute instead of a xml tag.
> 
> Also remember that the Scala DSL may be impacted and needs an update.
> This may happen when you change the DSL.
> If I remember myself then I try to run a full test of camel-scala. But
> it may slip my mind sometimes, and then later i bites you :)
> 
> 
> 
> On Mon, Jul 16, 2012 at 7:51 PM,  <boday@> wrote:
>> Author: boday
>> Date: Mon Jul 16 17:51:17 2012
>> New Revision: 1362163
>>
>> URL: http://svn.apache.org/viewvc?rev=1362163&view=rev
>> Log:
>> CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an
>> error if older messages are received after the last delivered message
>>
>> Added:
>>    
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>>    
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>>    
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>>    
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> Modified:
>>    
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>>    
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>>    
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>>    
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>> Mon Jul 16 17:51:17 2012
>> @@ -150,6 +150,18 @@ public class ResequenceDefinition extend
>>      }
>>
>>      /**
>> +     * Sets the rejectOld flag to throw an error when a message older
>> than the last delivered message is processed
>> +     * @return the builder
>> +     */
>> +    public ResequenceDefinition rejectOld() {
>> +        if (streamConfig == null) {
>> +            throw new IllegalStateException("rejectOld() only supported
>> for stream resequencer");
>> +        }
>> +        streamConfig.setRejectOld(true);
>> +        return this;
>> +    }
>> +
>> +    /**
>>       * Sets the in batch size for number of exchanges received
>>       * @param batchSize  the batch size
>>       * @return the builder
>> @@ -368,6 +380,7 @@ public class ResequenceDefinition extend
>>          StreamResequencer resequencer = new
>> StreamResequencer(routeContext.getCamelContext(), processor, comparator);
>>          resequencer.setTimeout(config.getTimeout());
>>          resequencer.setCapacity(config.getCapacity());
>> +        resequencer.setRejectOld(config.getRejectOld());
>>          if (config.getIgnoreInvalidExchanges() != null) {
>>             
>> resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
>>          }
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>> Mon Jul 16 17:51:17 2012
>> @@ -19,6 +19,7 @@ package org.apache.camel.model.config;
>>  import javax.xml.bind.annotation.XmlAccessType;
>>  import javax.xml.bind.annotation.XmlAccessorType;
>>  import javax.xml.bind.annotation.XmlAttribute;
>> +import javax.xml.bind.annotation.XmlElement;
>>  import javax.xml.bind.annotation.XmlRootElement;
>>  import javax.xml.bind.annotation.XmlTransient;
>>
>> @@ -41,6 +42,8 @@ public class StreamResequencerConfig ext
>>      private Boolean ignoreInvalidExchanges;
>>      @XmlTransient
>>      private ExpressionResultComparator comparator;
>> +    @XmlElement
>> +    private Boolean rejectOld;
>>
>>      /**
>>       * Creates a new {@link StreamResequencerConfig} instance using
>> default
>> @@ -123,5 +126,13 @@ public class StreamResequencerConfig ext
>>      public void setComparator(ExpressionResultComparator comparator) {
>>          this.comparator = comparator;
>>      }
>> -
>> +
>> +    public void setRejectOld(boolean value) {
>> +        this.rejectOld = value;
>> +    }
>> +
>> +    public Boolean getRejectOld() {
>> +        return rejectOld;
>> +    }
>> +
>>  }
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>> Mon Jul 16 17:51:17 2012
>> @@ -141,6 +141,10 @@ public class StreamResequencer extends S
>>          return ignoreInvalidExchanges;
>>      }
>>
>> +    public void setRejectOld(Boolean rejectOld) {
>> +        engine.setRejectOld(rejectOld);
>> +    }
>> +
>>      /**
>>       * Sets whether to ignore invalid exchanges which cannot be used by
>> this stream resequencer.
>>       * <p/>
>>
>> Added:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>> (added)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,32 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor.resequencer;
>> +
>> +import org.apache.camel.RuntimeCamelException;
>> +
>> +/**
>> + * An exception thrown if message is rejected by the resequencer
>> + *
>> + * @version
>> + */
>> +public class MessageRejectedException extends RuntimeCamelException {
>> +    private static final long serialVersionUID = 5755929795399134568L;
>> +
>> +    public MessageRejectedException(String message) {
>> +        super(message);
>> +    }
>> +}
>> \ No newline at end of file
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>> Mon Jul 16 17:51:17 2012
>> @@ -87,6 +87,11 @@ public class ResequencerEngine<E> {
>>      private SequenceSender<E> sequenceSender;
>>
>>      /**
>> +     * Indicates whether an error should be thrown if message older
>> (based on Comparator) than the last delivered message is received.
>> +     */
>> +    private Boolean rejectOld;
>> +
>> +    /**
>>       * Creates a new resequencer instance with a default timeout of 2000
>>       * milliseconds.
>>       *
>> @@ -136,6 +141,14 @@ public class ResequencerEngine<E> {
>>          this.timeout = timeout;
>>      }
>>
>> +    public Boolean getRejectOld() {
>> +        return rejectOld;
>> +    }
>> +
>> +    public void setRejectOld(Boolean rejectOld) {
>> +        this.rejectOld = rejectOld;
>> +    }
>> +
>>      /**
>>       * Returns the sequence sender.
>>       *
>> @@ -209,6 +222,9 @@ public class ResequencerEngine<E> {
>>              // nothing to schedule
>>          } else if (sequence.predecessor(element) != null) {
>>              // nothing to schedule
>> +        } else if (rejectOld != null && rejectOld.booleanValue() &&
>> beforeLastDelivered(element)) {
>> +            throw new MessageRejectedException("rejecting message [" +
>> element.getObject()
>> +                    + "], it should have been sent before the last
>> delivered message [" + lastDelivered.getObject() + "]");
>>          } else {
>>              element.schedule(defineTimeout());
>>          }
>> @@ -283,6 +299,22 @@ public class ResequencerEngine<E> {
>>      }
>>
>>      /**
>> +     * Returns <code>true</code> if the given element is before the last
>> delivered element.
>> +     *
>> +     * @param element an element.
>> +     * @return <code>true</code> if the given element is before the last
>> delivered element.
>> +     */
>> +    private boolean beforeLastDelivered(Element<E> element) {
>> +        if (lastDelivered == null) {
>> +            return false;
>> +        }
>> +        if (sequence.comparator().compare(element, lastDelivered) < 0) {
>> +            return true;
>> +        }
>> +        return false;
>> +    }
>> +
>> +    /**
>>       * Creates a timeout task based on the timeout setting of this
>> resequencer.
>>       *
>>       * @return a new timeout task.
>>
>> Added:
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>> (added)
>> +++
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,93 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.processor.resequencer.MessageRejectedException;
>> +
>> +/**
>> + *
>> + */
>> +public class ResequenceStreamRejectOldExchangesTest extends
>> ContextTestSupport {
>> +
>> +    public void testInSequenceAfterTimeout() throws Exception {
>> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B",
>> "C", "E");
>> +        getMockEndpoint("mock:error").expectedMessageCount(0);
>> +
>> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> +        Thread.sleep(1100);
>> +        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    public void testDuplicateAfterTimeout() throws Exception {
>> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B",
>> "C");
>> +        getMockEndpoint("mock:error").expectedMessageCount(0);
>> +
>> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> +        Thread.sleep(1100);
>> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    public void testOutOfSequenceAfterTimeout() throws Exception {
>> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "C",
>> "D");
>> +        getMockEndpoint("mock:error").expectedBodiesReceived("B");
>> +
>> +        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
>> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> +        Thread.sleep(1100);
>> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    public void testOutOfSequenceAfterTimeout2() throws Exception {
>> +        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C",
>> "D");
>> +        getMockEndpoint("mock:error").expectedBodiesReceived("A");
>> +
>> +        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
>> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> +        Thread.sleep(1100);
>> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() throws Exception {
>> +
>> +                from("direct:start")
>> +                       
>> .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
>> +                       
>> .resequence(header("seqno")).stream().timeout(1000).rejectOld()
>> +                        .to("mock:result");
>> +            }
>> +        };
>> +    }
>> +}
>>
>> Added:
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>> (added)
>> +++
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,32 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.spring.processor;
>> +
>> +import org.apache.camel.CamelContext;
>> +import
>> org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest;
>> +import org.apache.camel.processor.ResequencerTest;
>> +
>> +import static
>> org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
>> +
>> +/**
>> + * @version
>> + */
>> +public class SpringResequenceStreamRejectOldExchangesTest extends
>> ResequenceStreamRejectOldExchangesTest {
>> +    protected CamelContext createCamelContext() throws Exception {
>> +        return createSpringCamelContext(this,
>> "org/apache/camel/spring/processor/resequencerRejectOld.xml");
>> +    }
>> +}
>> \ No newline at end of file
>>
>> Added:
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> (added)
>> +++
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,42 @@
>> +<?xml version="1.0" encoding="UTF-8"?>
>> +
>> +<beans xmlns="http://www.springframework.org/schema/beans"
>> +       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> +       xsi:schemaLocation="
>> +       http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans.xsd
>> +       http://camel.apache.org/schema/spring
>> http://camel.apache.org/schema/spring/camel-spring.xsd">
>> +
>> +    <camelContext xmlns="http://camel.apache.org/schema/spring">
>> +        <route>
>> +            <from uri="direct:start"/>
>> +            <onException>
>> +               
>> <exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception>
>> +                <handled><constant>true</constant></handled>
>> +                <to uri="mock:error"/>
>> +            </onException>
>> +            <resequence>
>> +                <stream-config capacity="100" timeout="1000">
>> +                    <rejectOld>true</rejectOld>
>> +                </stream-config>
>> +                <header>seqno</header>
>> +                <to uri="mock:result"/>
>> +            </resequence>
>> +        </route>
>> +    </camelContext>
>> +
>> +</beans>
>>
>>
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> 


-----
Ben O'Day
IT Consultant -http://consulting-notes.com

--
View this message in context: http://camel.465427.n5.nabble.com/Re-svn-commit-r1362163-in-camel-trunk-camel-core-src-main-java-org-apache-camel-model-camel-core-srcs-tp5716120p5716211.html
Sent from the Camel Development mailing list archive at Nabble.com.