You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2012/07/16 21:12:41 UTC
Re: svn commit: r1362163 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/model/config/ camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/proces
Hi
It's better to use an @XmlAttribute for the new option like the other
options on the stream config
+ @XmlElement
+ private Boolean rejectOld;
Then the XML DSL is not as confusing and noisy, as it would just be an
attribute instead of an XML tag.
Also remember that the Scala DSL may be impacted and needs an update.
This may happen when you change the DSL.
If I remember, then I try to run a full test of camel-scala. But
it may slip my mind sometimes, and then later it bites you :)
On Mon, Jul 16, 2012 at 7:51 PM, <bo...@apache.org> wrote:
> Author: boday
> Date: Mon Jul 16 17:51:17 2012
> New Revision: 1362163
>
> URL: http://svn.apache.org/viewvc?rev=1362163&view=rev
> Log:
> CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an error if older messages are received after the last delivered message
>
> Added:
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
> Modified:
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Mon Jul 16 17:51:17 2012
> @@ -150,6 +150,18 @@ public class ResequenceDefinition extend
> }
>
> /**
> + * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed
> + * @return the builder
> + */
> + public ResequenceDefinition rejectOld() {
> + if (streamConfig == null) {
> + throw new IllegalStateException("rejectOld() only supported for stream resequencer");
> + }
> + streamConfig.setRejectOld(true);
> + return this;
> + }
> +
> + /**
> * Sets the in batch size for number of exchanges received
> * @param batchSize the batch size
> * @return the builder
> @@ -368,6 +380,7 @@ public class ResequenceDefinition extend
> StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator);
> resequencer.setTimeout(config.getTimeout());
> resequencer.setCapacity(config.getCapacity());
> + resequencer.setRejectOld(config.getRejectOld());
> if (config.getIgnoreInvalidExchanges() != null) {
> resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
> }
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Mon Jul 16 17:51:17 2012
> @@ -19,6 +19,7 @@ package org.apache.camel.model.config;
> import javax.xml.bind.annotation.XmlAccessType;
> import javax.xml.bind.annotation.XmlAccessorType;
> import javax.xml.bind.annotation.XmlAttribute;
> +import javax.xml.bind.annotation.XmlElement;
> import javax.xml.bind.annotation.XmlRootElement;
> import javax.xml.bind.annotation.XmlTransient;
>
> @@ -41,6 +42,8 @@ public class StreamResequencerConfig ext
> private Boolean ignoreInvalidExchanges;
> @XmlTransient
> private ExpressionResultComparator comparator;
> + @XmlElement
> + private Boolean rejectOld;
>
> /**
> * Creates a new {@link StreamResequencerConfig} instance using default
> @@ -123,5 +126,13 @@ public class StreamResequencerConfig ext
> public void setComparator(ExpressionResultComparator comparator) {
> this.comparator = comparator;
> }
> -
> +
> + public void setRejectOld(boolean value) {
> + this.rejectOld = value;
> + }
> +
> + public Boolean getRejectOld() {
> + return rejectOld;
> + }
> +
> }
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Mon Jul 16 17:51:17 2012
> @@ -141,6 +141,10 @@ public class StreamResequencer extends S
> return ignoreInvalidExchanges;
> }
>
> + public void setRejectOld(Boolean rejectOld) {
> + engine.setRejectOld(rejectOld);
> + }
> +
> /**
> * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer.
> * <p/>
>
> Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java (added)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,32 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor.resequencer;
> +
> +import org.apache.camel.RuntimeCamelException;
> +
> +/**
> + * An exception thrown if message is rejected by the resequencer
> + *
> + * @version
> + */
> +public class MessageRejectedException extends RuntimeCamelException {
> + private static final long serialVersionUID = 5755929795399134568L;
> +
> + public MessageRejectedException(String message) {
> + super(message);
> + }
> +}
> \ No newline at end of file
>
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Mon Jul 16 17:51:17 2012
> @@ -87,6 +87,11 @@ public class ResequencerEngine<E> {
> private SequenceSender<E> sequenceSender;
>
> /**
> + * Indicates whether an error should be thrown if message older (based on Comparator) than the last delivered message is received.
> + */
> + private Boolean rejectOld;
> +
> + /**
> * Creates a new resequencer instance with a default timeout of 2000
> * milliseconds.
> *
> @@ -136,6 +141,14 @@ public class ResequencerEngine<E> {
> this.timeout = timeout;
> }
>
> + public Boolean getRejectOld() {
> + return rejectOld;
> + }
> +
> + public void setRejectOld(Boolean rejectOld) {
> + this.rejectOld = rejectOld;
> + }
> +
> /**
> * Returns the sequence sender.
> *
> @@ -209,6 +222,9 @@ public class ResequencerEngine<E> {
> // nothing to schedule
> } else if (sequence.predecessor(element) != null) {
> // nothing to schedule
> + } else if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
> + throw new MessageRejectedException("rejecting message [" + element.getObject()
> + + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
> } else {
> element.schedule(defineTimeout());
> }
> @@ -283,6 +299,22 @@ public class ResequencerEngine<E> {
> }
>
> /**
> + * Retuns <code>true</code> if the given element is before the last delivered element.
> + *
> + * @param element an element.
> + * @return <code>true</code> if the given element is before the last delivered element.
> + */
> + private boolean beforeLastDelivered(Element<E> element) {
> + if (lastDelivered == null) {
> + return false;
> + }
> + if (sequence.comparator().compare(element, lastDelivered) < 0) {
> + return true;
> + }
> + return false;
> + }
> +
> + /**
> * Creates a timeout task based on the timeout setting of this resequencer.
> *
> * @return a new timeout task.
>
> Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java (added)
> +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,93 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.processor.resequencer.MessageRejectedException;
> +
> +/**
> + *
> + */
> +public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport {
> +
> + public void testInSequenceAfterTimeout() throws Exception {
> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", "E");
> + getMockEndpoint("mock:error").expectedMessageCount(0);
> +
> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> + Thread.sleep(1100);
> + template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
> +
> + assertMockEndpointsSatisfied();
> + }
> +
> + public void testDuplicateAfterTimeout() throws Exception {
> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C");
> + getMockEndpoint("mock:error").expectedMessageCount(0);
> +
> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> + Thread.sleep(1100);
> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +
> + assertMockEndpointsSatisfied();
> + }
> +
> + public void testOutOfSequenceAfterTimeout() throws Exception {
> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "C", "D");
> + getMockEndpoint("mock:error").expectedBodiesReceived("B");
> +
> + template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> + Thread.sleep(1100);
> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +
> + assertMockEndpointsSatisfied();
> + }
> +
> + public void testOutOfSequenceAfterTimeout2() throws Exception {
> + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
> + getMockEndpoint("mock:error").expectedBodiesReceived("A");
> +
> + template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> + Thread.sleep(1100);
> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +
> + assertMockEndpointsSatisfied();
> + }
> +
> + @Override
> + protected RouteBuilder createRouteBuilder() throws Exception {
> + return new RouteBuilder() {
> + @Override
> + public void configure() throws Exception {
> +
> + from("direct:start")
> + .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
> + .resequence(header("seqno")).stream().timeout(1000).rejectOld()
> + .to("mock:result");
> + }
> + };
> + }
> +}
>
> Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java (added)
> +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,32 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.spring.processor;
> +
> +import org.apache.camel.CamelContext;
> +import org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest;
> +import org.apache.camel.processor.ResequencerTest;
> +
> +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
> +
> +/**
> + * @version
> + */
> +public class SpringResequenceStreamRejectOldExchangesTest extends ResequenceStreamRejectOldExchangesTest {
> + protected CamelContext createCamelContext() throws Exception {
> + return createSpringCamelContext(this, "org/apache/camel/spring/processor/resequencerRejectOld.xml");
> + }
> +}
> \ No newline at end of file
>
> Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto
> ==============================================================================
> --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml (added)
> +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,42 @@
> +<?xml version="1.0" encoding="UTF-8"?>
> +<!--
> + Licensed to the Apache Software Foundation (ASF) under one or more
> + contributor license agreements. See the NOTICE file distributed with
> + this work for additional information regarding copyright ownership.
> + The ASF licenses this file to You under the Apache License, Version 2.0
> + (the "License"); you may not use this file except in compliance with
> + the License. You may obtain a copy of the License at
> +
> + http://www.apache.org/licenses/LICENSE-2.0
> +
> + Unless required by applicable law or agreed to in writing, software
> + distributed under the License is distributed on an "AS IS" BASIS,
> + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + See the License for the specific language governing permissions and
> + limitations under the License.
> +-->
> +<beans xmlns="http://www.springframework.org/schema/beans"
> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> + xsi:schemaLocation="
> + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
> + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
> +
> + <camelContext xmlns="http://camel.apache.org/schema/spring">
> + <route>
> + <from uri="direct:start"/>
> + <onException>
> + <exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception>
> + <handled><constant>true</constant></handled>
> + <to uri="mock:error"/>
> + </onException>
> + <resequence>
> + <stream-config capacity="100" timeout="1000">
> + <rejectOld>true</rejectOld>
> + </stream-config>
> + <header>seqno</header>
> + <to uri="mock:result"/>
> + </resequence>
> + </route>
> + </camelContext>
> +
> +</beans>
>
>
--
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Re: svn commit: r1362163 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/model/config/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/proces
Posted by boday <be...@initekconsulting.com>.
Funny, I had it that way to start, then changed it to an element thinking it
was less invasive... I see your point though, and just changed it back to use
@XmlAttribute instead.
Claus Ibsen-2 wrote
>
> Hi
>
>
> Its better to use an @XmlAttribute for the new option like the other
> options on stream config
>
> + @XmlElement
> + private Boolean rejectOld;
>
> Then the XML DSL is not as confusing and noisy, as it would just be an
> attribute instead of a xml tag.
>
> Also remember that the Scala DSL may be impacted and needs an update.
> This may happen when you change the DSL.
> If I remember myself then I try to run a full test of camel-scala. But
> it may slip my mind sometimes, and then later i bites you :)
>
>
>
> On Mon, Jul 16, 2012 at 7:51 PM, <boday@> wrote:
>> Author: boday
>> Date: Mon Jul 16 17:51:17 2012
>> New Revision: 1362163
>>
>> URL: http://svn.apache.org/viewvc?rev=1362163&view=rev
>> Log:
>> CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an
>> error if older messages are received after the last delivered message
>>
>> Added:
>>
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>>
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>>
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>>
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> Modified:
>>
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>>
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>>
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>>
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>> Mon Jul 16 17:51:17 2012
>> @@ -150,6 +150,18 @@ public class ResequenceDefinition extend
>> }
>>
>> /**
>> + * Sets the rejectOld flag to throw an error when a message older
>> than the last delivered message is processed
>> + * @return the builder
>> + */
>> + public ResequenceDefinition rejectOld() {
>> + if (streamConfig == null) {
>> + throw new IllegalStateException("rejectOld() only supported
>> for stream resequencer");
>> + }
>> + streamConfig.setRejectOld(true);
>> + return this;
>> + }
>> +
>> + /**
>> * Sets the in batch size for number of exchanges received
>> * @param batchSize the batch size
>> * @return the builder
>> @@ -368,6 +380,7 @@ public class ResequenceDefinition extend
>> StreamResequencer resequencer = new
>> StreamResequencer(routeContext.getCamelContext(), processor, comparator);
>> resequencer.setTimeout(config.getTimeout());
>> resequencer.setCapacity(config.getCapacity());
>> + resequencer.setRejectOld(config.getRejectOld());
>> if (config.getIgnoreInvalidExchanges() != null) {
>>
>> resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
>> }
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>> Mon Jul 16 17:51:17 2012
>> @@ -19,6 +19,7 @@ package org.apache.camel.model.config;
>> import javax.xml.bind.annotation.XmlAccessType;
>> import javax.xml.bind.annotation.XmlAccessorType;
>> import javax.xml.bind.annotation.XmlAttribute;
>> +import javax.xml.bind.annotation.XmlElement;
>> import javax.xml.bind.annotation.XmlRootElement;
>> import javax.xml.bind.annotation.XmlTransient;
>>
>> @@ -41,6 +42,8 @@ public class StreamResequencerConfig ext
>> private Boolean ignoreInvalidExchanges;
>> @XmlTransient
>> private ExpressionResultComparator comparator;
>> + @XmlElement
>> + private Boolean rejectOld;
>>
>> /**
>> * Creates a new {@link StreamResequencerConfig} instance using
>> default
>> @@ -123,5 +126,13 @@ public class StreamResequencerConfig ext
>> public void setComparator(ExpressionResultComparator comparator) {
>> this.comparator = comparator;
>> }
>> -
>> +
>> + public void setRejectOld(boolean value) {
>> + this.rejectOld = value;
>> + }
>> +
>> + public Boolean getRejectOld() {
>> + return rejectOld;
>> + }
>> +
>> }
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>> Mon Jul 16 17:51:17 2012
>> @@ -141,6 +141,10 @@ public class StreamResequencer extends S
>> return ignoreInvalidExchanges;
>> }
>>
>> + public void setRejectOld(Boolean rejectOld) {
>> + engine.setRejectOld(rejectOld);
>> + }
>> +
>> /**
>> * Sets whether to ignore invalid exchanges which cannot be used by
>> this stream resequencer.
>> * <p/>
>>
>> Added:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>> (added)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,32 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor.resequencer;
>> +
>> +import org.apache.camel.RuntimeCamelException;
>> +
>> +/**
>> + * An exception thrown if message is rejected by the resequencer
>> + *
>> + * @version
>> + */
>> +public class MessageRejectedException extends RuntimeCamelException {
>> + private static final long serialVersionUID = 5755929795399134568L;
>> +
>> + public MessageRejectedException(String message) {
>> + super(message);
>> + }
>> +}
>> \ No newline at end of file
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>> Mon Jul 16 17:51:17 2012
>> @@ -87,6 +87,11 @@ public class ResequencerEngine<E> {
>> private SequenceSender<E> sequenceSender;
>>
>> /**
>> + * Indicates whether an error should be thrown if message older
>> (based on Comparator) than the last delivered message is received.
>> + */
>> + private Boolean rejectOld;
>> +
>> + /**
>> * Creates a new resequencer instance with a default timeout of 2000
>> * milliseconds.
>> *
>> @@ -136,6 +141,14 @@ public class ResequencerEngine<E> {
>> this.timeout = timeout;
>> }
>>
>> + public Boolean getRejectOld() {
>> + return rejectOld;
>> + }
>> +
>> + public void setRejectOld(Boolean rejectOld) {
>> + this.rejectOld = rejectOld;
>> + }
>> +
>> /**
>> * Returns the sequence sender.
>> *
>> @@ -209,6 +222,9 @@ public class ResequencerEngine<E> {
>> // nothing to schedule
>> } else if (sequence.predecessor(element) != null) {
>> // nothing to schedule
>> + } else if (rejectOld != null && rejectOld.booleanValue() &&
>> beforeLastDelivered(element)) {
>> + throw new MessageRejectedException("rejecting message [" +
>> element.getObject()
>> + + "], it should have been sent before the last
>> delivered message [" + lastDelivered.getObject() + "]");
>> } else {
>> element.schedule(defineTimeout());
>> }
>> @@ -283,6 +299,22 @@ public class ResequencerEngine<E> {
>> }
>>
>> /**
>> + * Retuns <code>true</code> if the given element is before the last
>> delivered element.
>> + *
>> + * @param element an element.
>> + * @return <code>true</code> if the given element is before the last
>> delivered element.
>> + */
>> + private boolean beforeLastDelivered(Element<E> element) {
>> + if (lastDelivered == null) {
>> + return false;
>> + }
>> + if (sequence.comparator().compare(element, lastDelivered) < 0) {
>> + return true;
>> + }
>> + return false;
>> + }
>> +
>> + /**
>> * Creates a timeout task based on the timeout setting of this
>> resequencer.
>> *
>> * @return a new timeout task.
>>
>> Added:
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>> (added)
>> +++
>> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,93 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.processor.resequencer.MessageRejectedException;
>> +
>> +/**
>> + *
>> + */
>> +public class ResequenceStreamRejectOldExchangesTest extends
>> ContextTestSupport {
>> +
>> + public void testInSequenceAfterTimeout() throws Exception {
>> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B",
>> "C", "E");
>> + getMockEndpoint("mock:error").expectedMessageCount(0);
>> +
>> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> + Thread.sleep(1100);
>> + template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
>> +
>> + assertMockEndpointsSatisfied();
>> + }
>> +
>> + public void testDuplicateAfterTimeout() throws Exception {
>> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B",
>> "C");
>> + getMockEndpoint("mock:error").expectedMessageCount(0);
>> +
>> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> + Thread.sleep(1100);
>> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> +
>> + assertMockEndpointsSatisfied();
>> + }
>> +
>> + public void testOutOfSequenceAfterTimeout() throws Exception {
>> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "C",
>> "D");
>> + getMockEndpoint("mock:error").expectedBodiesReceived("B");
>> +
>> + template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
>> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> + Thread.sleep(1100);
>> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> +
>> + assertMockEndpointsSatisfied();
>> + }
>> +
>> + public void testOutOfSequenceAfterTimeout2() throws Exception {
>> + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C",
>> "D");
>> + getMockEndpoint("mock:error").expectedBodiesReceived("A");
>> +
>> + template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
>> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
>> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
>> + Thread.sleep(1100);
>> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
>> +
>> + assertMockEndpointsSatisfied();
>> + }
>> +
>> + @Override
>> + protected RouteBuilder createRouteBuilder() throws Exception {
>> + return new RouteBuilder() {
>> + @Override
>> + public void configure() throws Exception {
>> +
>> + from("direct:start")
>> +
>> .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
>> +
>> .resequence(header("seqno")).stream().timeout(1000).rejectOld()
>> + .to("mock:result");
>> + }
>> + };
>> + }
>> +}
>>
>> Added:
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>> (added)
>> +++
>> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,32 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.spring.processor;
>> +
>> +import org.apache.camel.CamelContext;
>> +import
>> org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest;
>> +import org.apache.camel.processor.ResequencerTest;
>> +
>> +import static
>> org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
>> +
>> +/**
>> + * @version
>> + */
>> +public class SpringResequenceStreamRejectOldExchangesTest extends
>> ResequenceStreamRejectOldExchangesTest {
>> + protected CamelContext createCamelContext() throws Exception {
>> + return createSpringCamelContext(this,
>> "org/apache/camel/spring/processor/resequencerRejectOld.xml");
>> + }
>> +}
>> \ No newline at end of file
>>
>> Added:
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto
>> ==============================================================================
>> ---
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> (added)
>> +++
>> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>> Mon Jul 16 17:51:17 2012
>> @@ -0,0 +1,42 @@
>> +<?xml version="1.0" encoding="UTF-8"?>
>> +
>> +<beans xmlns="http://www.springframework.org/schema/beans"
>> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> + xsi:schemaLocation="
>> + http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans.xsd
>> + http://camel.apache.org/schema/spring
>> http://camel.apache.org/schema/spring/camel-spring.xsd">
>> +
>> + <camelContext xmlns="http://camel.apache.org/schema/spring">
>> + <route>
>> + <from uri="direct:start"/>
>> + <onException>
>> +
>> <exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception>
>> + <handled><constant>true</constant></handled>
>> + <to uri="mock:error"/>
>> + </onException>
>> + <resequence>
>> + <stream-config capacity="100" timeout="1000">
>> + <rejectOld>true</rejectOld>
>> + </stream-config>
>> + <header>seqno</header>
>> + <to uri="mock:result"/>
>> + </resequence>
>> + </route>
>> + </camelContext>
>> +
>> +</beans>
>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
>
-----
Ben O'Day
IT Consultant - http://consulting-notes.com
--
View this message in context: http://camel.465427.n5.nabble.com/Re-svn-commit-r1362163-in-camel-trunk-camel-core-src-main-java-org-apache-camel-model-camel-core-srcs-tp5716120p5716211.html
Sent from the Camel Development mailing list archive at Nabble.com.