Posted to users@camel.apache.org by Christophe Pache <ch...@gmail.com> on 2015/04/01 13:52:14 UTC

parallelProcessing with multicast

Hello everyone

I have tried to test parallel processing with Camel. The split was easy
and worked out of the box. The multicast gave me some surprises, and I'm
wondering whether I correctly understand its expected behaviour. In the
code below, I send messages to two routes. The "even" route takes more
time than the "odd" one. I would like to receive the odd numbers I'm
sending before the even numbers. I was expecting the odd numbers to be
processed faster than the others, am I correct?

I'm currently getting the exchanges in the same order I sent them. Is that
the expected behaviour?

Thanks and have a nice day

Christophe

I'm using Camel 2.12:



      import org.apache.camel.{Exchange, Processor}

      val croute = new org.apache.camel.builder.RouteBuilder {
        override def configure(): Unit = {
          // Send a copy of every incoming message to both sub-routes in parallel.
          from("direct:input").multicast().parallelProcessing().
            to("direct:even", "direct:odd").end()

          // Fast route: only bodies equal to 1 pass the filter.
          from("direct:odd").filter(body.isEqualTo(1)).process(new Processor {
            override def process(exchange: Exchange): Unit = {
              println(exchange.getIn.getBody)
            }
          }).to("mock:output")

          // Slow route: only bodies equal to 0 pass the filter.
          from("direct:even").filter(body.isEqualTo(0)).process(new Processor {
            override def process(exchange: Exchange): Unit = {
              println(exchange.getIn.getBody)
              Thread.sleep(1000)
              // or, to avoid side effects on the thread:
              // (1 to 1000000).foreach(x => scala.util.Random.nextDouble())
            }
          }).to("mock:output")
        }
      }

      //.....
      val producer = camelContext.createProducerTemplate()
      List(0, 1, 0, 1, 0, 1, 0, 1, 0, 1).foreach(x =>
        producer.sendBody("direct:input", x))




--
View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: parallelProcessing with multicast

Posted by Christophe Pache <ch...@gmail.com>.
Thanks, Alexey


I added the end() and then it worked.
Claus was right as well: I now understand that parallel processing
only applies to one exchange within one EIP at a time.

My sample works, thanks guys!!!

have a nice day



Re: parallelProcessing with multicast

Posted by alexey-s <al...@mail.ru>.
Note that you need to call the .end() method:

from("...")
    .filter()
        .to("myprocess")
        .to("mock.out")
    .end();


Your code is interpreted as:

from("...")
    .filter()
        .to("myprocess")
    .end()
    .to("mock.out");





Re: parallelProcessing with multicast

Posted by Christophe Pache <ch...@gmail.com>.
Thanks, you are right.

Sorry, I should have pasted the code where I send the exchanges again.

List(0, 1, 0, 1, 0, 1, 0, 1, 0, 1).foreach(x => sendBody("direct:input", x))

I'm sending 10 exchanges. I expect to receive 10 in the end, and I'm not
expecting any aggregation. I've checked that each sub-route sends 5
exchanges to the final mock endpoint.
I do receive 10 exchanges; it is only their order that interests me in this
case.

From what I understand, it is the aggregation of results that I should
make parallel to get the odd results faster than the even ones. I'm fine with
that; I would just like to find a way to be sure that the processing (in the
sub-routes) is done in parallel.
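
One way to check that two branches really run at the same time is a shared latch. The following is a minimal sketch using plain JDK concurrency from Scala, not Camel: the object name, the `branchesOverlap` helper and its `threads` parameter are all hypothetical, and the two tasks only stand in for the sub-route processors. Each branch announces itself on the latch and then waits for the other one; the wait can only succeed if both branches are alive at once.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

object OverlapProbeSketch {
  // Returns true only if both branches were running at the same time.
  // `threads` is the size of the (hypothetical) worker pool.
  def branchesOverlap(threads: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(threads)
    val bothStarted = new CountDownLatch(2)

    def branch(): Callable[Boolean] = new Callable[Boolean] {
      override def call(): Boolean = {
        bothStarted.countDown()                        // announce that this branch is running
        bothStarted.await(500, TimeUnit.MILLISECONDS)  // true only if the other branch arrived in time
      }
    }

    try {
      val results = List(pool.submit(branch()), pool.submit(branch()))
      results.forall(_.get())
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(s"pool of 2 threads overlaps: ${branchesOverlap(2)}")
    println(s"pool of 1 thread overlaps:  ${branchesOverlap(1)}")
  }
}
```

With a pool of two threads both waits succeed; with a single thread the first branch times out before the second one starts. Note that such a probe can only report an overlap when both branches actually do work for the same message.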

I've had a look at MultiCastParallelAndStreamCachingTest.java
<https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java>
but have not found tests that validate it yet.

My latest attempt was to print repeatedly in both sub-routes to prove that the
prints may occur at the same time, but it did not succeed:

      val croute = new org.apache.camel.builder.RouteBuilder {
        override def configure(): Unit = {
          from("direct:input").multicast().parallelProcessing().streaming().
            to("direct:even", "direct:odd").end()

          // Prints "odd" ten times, 100 ms apart, for bodies equal to 1.
          from("direct:odd").filter(body.isEqualTo(1)).process(new Processor {
            override def process(exchange: Exchange): Unit = {
              (1 to 10).foreach(x => {
                Thread.sleep(100)
                println("odd")
              })
            }
          }).to("mock:output")

          // Prints "even" ten times, 100 ms apart, for bodies equal to 0.
          from("direct:even").filter(body.isEqualTo(0)).process(new Processor {
            override def process(exchange: Exchange): Unit = {
              (1 to 10).foreach(x => {
                Thread.sleep(100)
                println("even")
              })
            }
          }).to("mock:output")
        }
      }






Re: parallelProcessing with multicast

Posted by Claus Ibsen <cl...@gmail.com>.
You need to be aware that multicast is sending a copy of the same
incoming message. So the filter is either true or false for all the
messages as they are from the same copy.

So in one case they are all odd, and in the other case they are all
even. They are then processed at about the "same time", so the order may
appear unchanged.

If you want one of them to sleep for 1 sec and not the other you need
to change your code.
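
The point about copies can be pictured with a small model in plain Scala. This is only an illustration, not Camel code: the object name, the `branches` list and the `acceptedBy` helper are hypothetical stand-ins for the two sub-routes and their filters. Every branch is handed the same body, and for any given body only one of the two filters accepts it.

```scala
object MulticastFilterSketch {
  // Hypothetical stand-ins for the two sub-routes: a name plus its filter predicate.
  val branches: List[(String, Int => Boolean)] = List(
    "even" -> ((body: Int) => body == 0),
    "odd"  -> ((body: Int) => body == 1)
  )

  // Multicast hands the same body to every branch;
  // collect the names of the branches whose filter accepts it.
  def acceptedBy(body: Int): List[String] =
    branches.collect { case (name, accepts) if accepts(body) => name }

  def main(args: Array[String]): Unit =
    List(0, 1, 0, 1).foreach(body => println(s"body $body passes: ${acceptedBy(body)}"))
}
```

In this model a body of 0 only ever reaches the slow "even" work and a body of 1 only the fast "odd" work, so within a single message there is never a fast and a slow branch competing with each other.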




-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Re: parallelProcessing with multicast

Posted by Christophe Pache <ch...@gmail.com>.
Thanks Claus for your answer! I did test, but did not succeed in making it
work as I wanted (changing the order of the received messages). Following your
advice, I've just added the streaming configuration to the multicast EIP
as follows. Is that correct?

      val croute = new org.apache.camel.builder.RouteBuilder {
        override def configure(): Unit = {
          from("direct:input").multicast().parallelProcessing().streaming().
            to("direct:even", "direct:odd").end()

          // Fast route: bodies equal to 1 go straight to the mock endpoint.
          from("direct:odd").filter(body.isEqualTo(1)).to("mock:output")

          // Slow route: bodies equal to 0 are delayed by one second.
          from("direct:even").filter(body.isEqualTo(0)).process(new Processor {
            override def process(exchange: Exchange): Unit = {
              Thread.sleep(1000)
            }
          }).to("mock:output")
        }
      }




Re: parallelProcessing with multicast

Posted by Claus Ibsen <cl...@gmail.com>.
You need to enable streaming mode to have out of order aggregation -
even for multicast. The work is done in parallel, but in non streaming
mode the aggregation happens in fixed order.
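
The difference between fixed-order and streaming aggregation can be sketched with plain JDK executors. This is an analogy under stated assumptions, not Camel's implementation: the object name, the `branch` helper and the 300 ms delay are hypothetical. In both variants the two branches run in parallel; the only thing that changes is the order in which their results are collected.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

object AggregationOrderSketch {
  // Hypothetical branch: sleeps for delayMs, then returns its name.
  private def branch(name: String, delayMs: Long): Callable[String] =
    new Callable[String] {
      override def call(): String = { Thread.sleep(delayMs); name }
    }

  // Fixed order: both branches run in parallel,
  // but their results are read in submission order.
  def fixedOrder(): List[String] = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      val futures = List(pool.submit(branch("even", 300)), pool.submit(branch("odd", 0)))
      futures.map(_.get())
    } finally pool.shutdown()
  }

  // Streaming: results are read as each branch completes.
  def streaming(): List[String] = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      val completion = new ExecutorCompletionService[String](pool)
      completion.submit(branch("even", 300))
      completion.submit(branch("odd", 0))
      List.fill(2)(completion.take().get())
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(s"fixed order: ${fixedOrder()}")
    println(s"streaming:   ${streaming()}")
  }
}
```

Here `fixedOrder()` yields the slow "even" result first even though "odd" finished earlier, while `streaming()` yields "odd" first.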




