Posted to users@camel.apache.org by "sebastien.petrucci" <se...@gmail.com> on 2010/12/02 14:31:10 UTC

Splitted exchange has incorrect correlation ID ?

Hello,

I'm an Apache Camel newbie, and while digging into it I ran into the
following issue with v2.5.0.

Say you have an exchange with exchange ID "A".
After it goes through a splitter, I would expect the child exchange(s)
produced by the split to have a correlation ID pointing to "A". That is
not the case. There is a correlation ID, but it points to some other
(unknown) ID. Note that no ExchangeCreatedEvent is fired for that ID.

I doubt this is the expected behaviour. Is it?

After looking at the source code, I noticed that the original exchange is
copied (Splitter.java, line 140). This copy gets a new exchange ID, which
is never announced to the EventNotifier, and it is this ID that the child
exchanges later reference in their correlation ID. Unfortunately, this
makes it impossible to trace the child exchanges back to their parent.
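
To make the mechanism above concrete, here is a small plain-Java model of it.
This is only a sketch and does not use Camel: SketchExchange,
SplitCorrelationSketch, splitViaCopy and splitExpected are hypothetical names
invented for illustration, and the model assumes (as described above) that
copying an exchange yields a new ID and that the children are correlated to
that copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for an exchange; NOT Camel's org.apache.camel.Exchange.
final class SketchExchange {
    final String id = UUID.randomUUID().toString(); // every instance gets a fresh ID
    final String body;
    String correlationId; // ID of the exchange this one was derived from

    SketchExchange(String body) {
        this.body = body;
    }

    // Copying creates a new instance and therefore a new ID (assumption of this model).
    SketchExchange copy() {
        return new SketchExchange(body);
    }
}

final class SplitCorrelationSketch {

    // Models the reported behaviour: children are correlated to an internal copy.
    static List<SketchExchange> splitViaCopy(SketchExchange parent) {
        SketchExchange copy = parent.copy();
        return split(copy.body, copy.id);
    }

    // Models the behaviour I expected: children are correlated to the parent itself.
    static List<SketchExchange> splitExpected(SketchExchange parent) {
        return split(parent.body, parent.id);
    }

    // Creates one child per comma-separated token and stamps the given correlation ID.
    private static List<SketchExchange> split(String body, String correlationId) {
        List<SketchExchange> children = new ArrayList<>();
        for (String part : body.split(",")) {
            SketchExchange child = new SketchExchange(part);
            child.correlationId = correlationId;
            children.add(child);
        }
        return children;
    }

    public static void main(String[] args) {
        SketchExchange parent = new SketchExchange("1,2,3");
        for (SketchExchange child : splitViaCopy(parent)) {
            System.out.println("via copy: traceable to parent = "
                    + parent.id.equals(child.correlationId));
        }
        for (SketchExchange child : splitExpected(parent)) {
            System.out.println("expected: traceable to parent = "
                    + parent.id.equals(child.correlationId));
        }
    }
}
```

In this model the children produced by splitViaCopy carry the copy's ID, which
no outside observer ever saw, while the children produced by splitExpected can
be traced straight back to the parent.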

Regards,
Sebastien.

Here is a test case showing that behaviour.

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.management.EventNotifierSupport;
import org.apache.camel.management.event.*;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

import java.util.EventObject;

public class SplitExchangeTestCase extends CamelTestSupport {

    @Test
    public void testSplitterExchangeBehaviour() throws Exception {

        // Setting logging event notifier -- DOES NOT SEEM TO WORK
        // this.context.getManagementStrategy().addEventNotifier(new MyEventNotifier());

        // Create initial exchange that will be split
        Exchange parentExchange = createExchangeWithBody("1,2,3");

        // Display ID of our initial exchange
        log.info(String.format("Sent Exchange [%s]", parentExchange.getExchangeId()));

        // setup expectations
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("1", "2", "3");
        // these expectations are NOT satisfied
        // mock.message(0).property(Exchange.CORRELATION_ID).isEqualTo(parentExchange.getExchangeId());
        // mock.message(1).property(Exchange.CORRELATION_ID).isEqualTo(parentExchange.getExchangeId());
        // mock.message(2).property(Exchange.CORRELATION_ID).isEqualTo(parentExchange.getExchangeId());

        // Send parent exchange
        template.send("direct:start", parentExchange);

        assertMockEndpointsSatisfied();

        // display the child exchange ID and correlation ID
        for (Exchange exc : mock.getExchanges()) {
            log.info(String.format("Received Exchange [%s] correlated to [%s]",
                    exc.getExchangeId(), exc.getProperty(Exchange.CORRELATION_ID)));
        }
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                        .split(body().tokenize(","))
                        .to("mock:split");
            }
        };
    }

    private static class MyEventNotifier extends EventNotifierSupport {
        @Override
        public void notify(EventObject event) throws Exception {
            if (event instanceof ExchangeCreatedEvent) {
                ExchangeCreatedEvent e = (ExchangeCreatedEvent) event;
                log.info(String.format("Exchange [%s] CREATED and correlated to [%s]",
                        e.getExchange().getExchangeId(),
                        e.getExchange().getProperty(Exchange.CORRELATION_ID)));
            } else if (event instanceof ExchangeSentEvent) {
                ExchangeSentEvent e = (ExchangeSentEvent) event;
                log.info(String.format("Exchange [%s] correlated to [%s] SENT to %s",
                        e.getExchange().getExchangeId(),
                        e.getExchange().getProperty(Exchange.CORRELATION_ID),
                        e.getEndpoint().getEndpointUri()));
            } else if (event instanceof ExchangeCompletedEvent) {
                ExchangeCompletedEvent e = (ExchangeCompletedEvent) event;
                log.info(String.format("Exchange [%s] correlated to [%s] was COMPLETED",
                        e.getExchange().getExchangeId(),
                        e.getExchange().getProperty(Exchange.CORRELATION_ID)));
            } else if (event instanceof ExchangeFailedEvent) {
                ExchangeFailedEvent e = (ExchangeFailedEvent) event;
                log.info(String.format("Exchange [%s] correlated to [%s] was FAILED",
                        e.getExchange().getExchangeId(),
                        e.getExchange().getProperty(Exchange.CORRELATION_ID)));
            }
        }

        @Override
        public boolean isEnabled(EventObject event) {
            return (event instanceof AbstractExchangeEvent);
        }

        @Override
        protected void doStart() throws Exception {
        }

        @Override
        protected void doStop() throws Exception {
        }
    }
}

-- 
View this message in context: http://camel.465427.n5.nabble.com/Splitted-exchange-has-incorrect-correlation-ID-tp3289354p3289354.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Splitted exchange has incorrect correlation ID ?

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Thanks for reporting this. I have created a ticket:
https://issues.apache.org/jira/browse/CAMEL-3395

I will commit a fix shortly (if it passes the tests).


-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/