Posted to users@camel.apache.org by codentravel <sh...@gmail.com> on 2017/05/04 06:11:20 UTC

Sometimes the data in body is null..

I have the following routing strategy (Java DSL):

restConfiguration().component("restlet")
                    .componentProperty("maxQueued", "-1")
                    .componentProperty("persistingConnections", "true")
                    .componentProperty("pipeliningConnections", "true")
                    .host("localhost").port(Constants.ROUTINGPORT).bindingMode(RestBindingMode.off);
            // ... tell Camel what api to respond to and where to deliver the incoming messages, ...
            rest("api/v1").
                    post("/dc").
                    consumes("application/octet-stream").
                    route().routeId("DC Inlet").
                    to("seda:aggregate?waitForTaskToComplete=Never");
            // ... aggregate the messages into batches of 'completionSize', ...
            from("seda:aggregate").
                    routeId("Message Aggregation").
                    setHeader("id", constant("n/a")).
                    aggregate(header("id"), new MessageAggregation()).
                    completionSize(Constants.BATCHSIZE).
                    to("seda:process?waitForTaskToComplete=Never");

            // ... enrich the resulting message with meta data ...
            from("seda:process").
                    routeId("Data Enricher").
                    process(new Enricher("1234")).
                    to("seda:server?waitForTaskToComplete=Always");

            // ... and put it on messaging bus--asynchronous bus.
            from("seda:server").
                    routeId("Messenger").
                    process(new Messenger(m_communicator)); 


*In the MessageAggregation class:*

        if (oldExchange == null) {
            byte[] newData = newExchange.getIn().getBody(byte[].class);
            newExchange.getIn().setBody(newData, byte[].class);
            if (newData.length == 0) droppedCounter++;
            return newExchange;
        }

        byte[] oldData = oldExchange.getIn().getBody(byte[].class);
        byte[] newData = newExchange.getIn().getBody(byte[].class);
        if (newData.length == 0) {
            droppedCounter++;
            logger.error("-----------------Message drop detected -----------------");
        }

        // create new aggregated data
        byte[] aggregatedData = join(oldData, newData);
        oldExchange.getIn().setBody(aggregatedData, byte[].class);

        return oldExchange;
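
The join helper called in the aggregation code is not shown. Note also that getBody(byte[].class) can return null (for example when no body exists), in which case newData.length throws a NullPointerException rather than counting a drop. A minimal sketch of a null-safe helper, assuming plain concatenation is what is intended; ByteJoiner and nullToEmpty are hypothetical names, not part of Camel or of the class above:

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not a Camel API. */
final class ByteJoiner {
    private ByteJoiner() {}

    /** Treats a null array as empty so callers never dereference null. */
    static byte[] nullToEmpty(byte[] data) {
        return data == null ? new byte[0] : data;
    }

    /** Concatenates two arrays; null inputs count as empty. */
    static byte[] join(byte[] first, byte[] second) {
        byte[] a = nullToEmpty(first);
        byte[] b = nullToEmpty(second);
        // copyOf allocates the result and copies 'a' into its front.
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] joined = join(new byte[] {1, 2}, null);
        System.out.println(Arrays.toString(joined));
    }
}
```

With a helper like this, the aggregation code could test for a null or zero-length body before joining, so that a missing body is logged as a drop instead of failing the exchange.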

*Context Initialization code:*
            CamelContext context = new DefaultCamelContext();
            CamelContextNameStrategy name = new ExplicitCamelContextNameStrategy("DCContext");
            context.setNameStrategy(name);
            context.disableJMX();
            context.setAllowUseOriginalMessage(false);
            context.setStreamCaching(false);
            context.addRoutes(new DataRoute());

            // I FEEL THESE ARE NOT REQUIRED
            ExecutorServiceManager exec = context.getExecutorServiceManager();
            ThreadPoolProfile thread = exec.getDefaultThreadPoolProfile();
            thread.setMaxPoolSize(200);
            thread.setPoolSize(100);
            thread.setMaxQueueSize(9000);
            ExecutorServiceManager exec1 = context.getExecutorServiceManager();
            ThreadPoolProfile thread1 = exec1.getDefaultThreadPoolProfile();

            
            context.start();



I sometimes see the "Message drop detected" log entry during a long run (for
example, 50000+ messages in an hour). What can cause this, and how do I debug
where the problem is?
Also, can you suggest whether something is wrong with my routing strategy?




--
View this message in context: http://camel.465427.n5.nabble.com/Sometimes-the-data-in-body-is-null-tp5798624.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Sometimes the data in body is null..

Posted by codentravel <sh...@gmail.com>.
Thanks for the suggestion. Yes, I am currently trying to log all the details.
I suspect the network connection.

I need to handle 10 messages (~600 bytes each) per second received on the REST
endpoint, aggregate them in batches of 100, add some metadata, and publish the
result on a message bus. This has to run 24/7.
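
As a back-of-the-envelope check of those figures, here is a small sketch of the arithmetic. LoadEstimate and its method names are hypothetical, and the inputs are just the numbers stated above:

```java
/** Hypothetical calculator for illustration; inputs are the figures from this thread. */
final class LoadEstimate {
    private LoadEstimate() {}

    /** Seconds needed to fill one batch at a steady arrival rate. */
    static double secondsPerBatch(int batchSize, int messagesPerSecond) {
        return (double) batchSize / messagesPerSecond;
    }

    /** Payload size of one aggregated batch, ignoring any metadata. */
    static long bytesPerBatch(int batchSize, int bytesPerMessage) {
        return (long) batchSize * bytesPerMessage;
    }

    /** Messages received in 24 hours at a steady arrival rate. */
    static long messagesPerDay(int messagesPerSecond) {
        return messagesPerSecond * 86_400L;
    }

    public static void main(String[] args) {
        System.out.println(secondsPerBatch(100, 10)); // one batch every 10 seconds
        System.out.println(bytesPerBatch(100, 600));  // about 60 KB per batch
        System.out.println(messagesPerDay(10));       // 864000 messages per day
    }
}
```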

This module will run as a plugin and will be loaded and initialized by the
main application.
It should work on basic hardware running Linux (something like a
BeagleBone/Pi).


Currently the CPU usage (<40% at peak) and memory look fine, so I am OK with
the default thread pool. There was some lag earlier, but disabling stream
caching helped.
But I am not too sure what changes can be made there.






Re: Sometimes the data in body is null..

Posted by souciance <so...@gmail.com>.
I guess it could be network errors or something else that takes time? Maybe
add more logging, such as the headers, properties, timestamp, or the old
exchange, to give you a clue why this exchange body was empty. Maybe some
other business logic caused it?
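
A rough sketch of what such a diagnostic log line might look like, written as plain Java so it stands alone. DropDiagnostics and describe are hypothetical names, not Camel APIs, and the header values in main are made-up examples:

```java
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration; not a Camel API. */
final class DropDiagnostics {
    private DropDiagnostics() {}

    /** Builds one log line describing a suspicious message. */
    static String describe(Instant seenAt, Map<String, Object> headers, byte[] body) {
        // Distinguish a missing body from an empty one; they point to different causes.
        String bodyState = body == null ? "null" : body.length + " bytes";
        // TreeMap sorts header names so log lines are easy to compare.
        return "seenAt=" + seenAt + " body=" + bodyState + " headers=" + new TreeMap<>(headers);
    }

    public static void main(String[] args) {
        // Example values only; real headers depend on the client and the component.
        Map<String, Object> headers = Map.of("Content-Length", "0", "CamelHttpMethod", "POST");
        System.out.println(describe(Instant.now(), headers, null));
    }
}
```

Inside the aggregation strategy the header map would presumably come from newExchange.getIn().getHeaders(), and the line would be written when the body is null or zero-length.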

What is it you want to accomplish as part of your routing strategy?





