You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by rjsteixeira <ri...@ctrler.eu> on 2013/10/14 19:39:20 UTC

Camel gradually slowing down with large sql results

Hi.

I'm seeing a constant slowdown in Camel. It starts at 600 inserts/s and
never ends because the rate drops to 5 or 6 inserts a second...


Here is a graph with part of the slowdown (as seen from RabbitMQ):
http://i.imgur.com/QAS6nwg.png
In this graph you can see the end of one run (I killed it) and the start of
another, the difference is staggering: http://i.imgur.com/dhd9GG1.png

I'm consuming from a large database table with a simple query that outputs
about 400 000 lines that I then insert into a RabbitMQ queue.

The insert rate starts really high and then starts to fall gradually.
I've tried running camel without inserting in the queue and there is also a
slowdown in processing.

The CPU is always maxed out with java...

From the same machine I can consume from the queue with Storm at about 3000
lines/sec, without slowdowns and with unmarshalling with XStream (8000
lines/s without unmarshalling).

Can anyone point me in the right direction to solve this?

Thanks.

#########################################################
Here is my Spring file.

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://camel.apache.org/schema/spring 
       http://camel.apache.org/schema/spring/camel-spring.xsd">

	
	<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
		destroy-method="close">
		<property name="driverClassName" value="oracle.jdbc.OracleDriver" />
		<property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
		<property name="username" value="sys as sysdba" />
		<property name="password" value="xxxxx" />
	</bean>

	
	<bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
		<property name="dataSource" ref="dataSource" />
	</bean>

	
	<camelContext streamCache="true"
xmlns="http://camel.apache.org/schema/spring">
	
	
		
		<propertyPlaceholder id="placeholder" location="classpath:sql.properties"
/>

		
		<dataFormats>
			<xstream id="xstream-utf8" encoding="UTF-8" />
			<xstream id="xstream-default" />
		</dataFormats>
	
		
		<route id="databaseRoute">
			<from
uri="quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&amp;trigger.repeatCount=0"
/>
 			<to uri="sql:{{sql.selectOrderIdMax}}" />
			<split streaming="true" parallelProcessing="true">
				<simple>${body}</simple>
				<to uri="direct:sqlqueue" />
			</split>
		</route> 

 		<route id="processOrder-route" >
			<from uri="direct:sqlqueue" />
			<process ref="lineProcessor" />
			<marshal ref="xstream-utf8" />
			<to
uri="rabbitmq://xx.x.x.x:5672/myExchange?username=xxx&amp;password=xxx" />
		</route>
		

	</camelContext>

	
	<bean id="lineProcessor"
class="eu.rteixeira.dis.cameldbtoqueue.LineProcessor" />

</beans>

#########################################################
The SQL is:
sql.selectOrderIdMax=select * from sales where integration_id = (select
max(integration_id) as max_int from sales)

#########################################################
The LineProcessor just constructs an object for each line of the results
to then pass to the serializer. The slowdown happened without it.

/**
 * 
 */
package eu.rteixeira.dis.cameldbtoqueue;

import eu.rteixeira.dis.objmodel.LineBean;

import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/**
 * Camel processor that swaps the incoming message body for a
 * {@link LineBean} built from it.
 *
 * The incoming body is cast to a column-name/value map; presumably this is
 * one row of the SQL result after the split step (confirm against the route).
 */
public class LineProcessor implements Processor {

    /**
     * Replaces the "in" body of the exchange with a LineBean created from
     * the current body.
     *
     * @param exchange the exchange whose in-message body is converted
     * @throws Exception if the body is not a map or a column value cannot be converted
     */
    public void process(Exchange exchange) throws Exception {
        Object payload = exchange.getIn().getBody();
        LineBean bean = new LineBean((Map<String, ?>) payload);
        exchange.getIn().setBody(bean);
    }
}

#########################################################
The LineBean

package eu.rteixeira.dis.objmodel;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Map;

/**
 * Typed representation of one row of the {@code sales} query result.
 *
 * Every field corresponds to the column of the same (lower-case) name in the
 * row map handed to {@link #LineBean(Map)}. A column that is absent from the
 * map, or whose value is {@code null}, leaves the matching field {@code null}.
 *
 * NOTE(review): the field names are kept exactly as they were because a
 * reflection-based marshaller (the routes use XStream) presumably derives the
 * element names from them; confirm before renaming.
 */
public class LineBean {

	Integer store;
	Integer sku;
	String ean;
	BigDecimal qty;
	BigDecimal pvp;
	Timestamp sales_date;
	Timestamp vdate;
	Integer physical_store;
	Integer integration_id;
	Integer group_no;
	String tran_type;
	BigDecimal sales_value;
	BigDecimal discount_qty;
	BigDecimal discount_value;
	BigDecimal discount_voucher_qty;
	BigDecimal discount_voucher_value;
	BigDecimal base_value;

	/** Creates an empty bean with every field {@code null}. */
	public LineBean() {
	}

	/**
	 * Creates a bean from a column-name/value map.
	 *
	 * Each value is converted through its {@code toString()} form. The same
	 * key is now used for the null check and for the read: the previous code
	 * checked {@code "store"} but read {@code "STORE"}, which failed with a
	 * NullPointerException on any map whose keys are case-sensitive.
	 *
	 * @param row column values keyed by lower-case column name
	 * @throws NumberFormatException if an integer or decimal column is not numeric text
	 * @throws IllegalArgumentException if a timestamp column is not in
	 *         {@code yyyy-[m]m-[d]d hh:mm:ss[.f...]} form
	 */
	public LineBean(Map<String, ?> row) {
		store = integerOf(row, "store");
		sku = integerOf(row, "sku");
		ean = textOf(row, "ean");
		qty = decimalOf(row, "qty");
		pvp = decimalOf(row, "pvp");
		sales_date = timestampOf(row, "sales_date");
		vdate = timestampOf(row, "vdate");
		physical_store = integerOf(row, "physical_store");
		integration_id = integerOf(row, "integration_id");
		group_no = integerOf(row, "group_no");
		tran_type = textOf(row, "tran_type");
		sales_value = decimalOf(row, "sales_value");
		discount_qty = decimalOf(row, "discount_qty");
		discount_value = decimalOf(row, "discount_value");
		discount_voucher_qty = decimalOf(row, "discount_voucher_qty");
		discount_voucher_value = decimalOf(row, "discount_voucher_value");
		base_value = decimalOf(row, "base_value");
	}

	/** Returns the column's string form, or {@code null} when the column is missing or null. */
	private static String textOf(Map<String, ?> row, String column) {
		Object value = row.get(column);
		return (value == null) ? null : value.toString();
	}

	/** Parses the column as an integer, or returns {@code null} when it is missing or null. */
	private static Integer integerOf(Map<String, ?> row, String column) {
		String text = textOf(row, column);
		return (text == null) ? null : Integer.valueOf(text);
	}

	/** Parses the column as a decimal, or returns {@code null} when it is missing or null. */
	private static BigDecimal decimalOf(Map<String, ?> row, String column) {
		String text = textOf(row, column);
		return (text == null) ? null : new BigDecimal(text);
	}

	/** Parses the column as a JDBC timestamp, or returns {@code null} when it is missing or null. */
	private static Timestamp timestampOf(Map<String, ?> row, String column) {
		String text = textOf(row, column);
		return (text == null) ? null : Timestamp.valueOf(text);
	}
}



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by rjsteixeira <ri...@ctrler.eu>.
Sorry.

I'm using camel 2.12.1.



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741685.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by rjsteixeira <ri...@ctrler.eu>.
I'm using camel 
I ended up using seda. 
I also stopped converting to a bean and using XStream as the marshaller, and
did the marshalling by hand directly from the SQL output.

Results are fast enough. See code below.

http://i.imgur.com/jtBuMB8.png

Thank you for all the help.

@Component
public class ReadDB extends SpringRouteBuilder{
	
    @Override
    public void configure() throws Exception {
    	
       
from("quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&trigger.repeatCount=0")
        .to("sql:select * from sales where integration_id = (select
max(integration_id) as max_int from sales)")
        .split(simple("${body}")).streaming()
            	.to("myLogger")
            	.to("seda:sqlqueue?concurrentConsumers=8");
    }

}


/**
 * Route that drains {@code seda:sqlqueue} and publishes each message to the
 * RabbitMQ exchange after passing it through {@code lineProcessor}.
 */
@Component
public class WriteToQueue extends SpringRouteBuilder {

    /**
     * Builds the route: SEDA queue, {@code lineProcessor}, RabbitMQ.
     *
     * @throws Exception if the route definition cannot be created
     */
    @Override
    public void configure() throws Exception {
        final String source = "seda:sqlqueue?concurrentConsumers=8";
        final String broker = "rabbitmq://xx.xxx.x.xx:5672/myExchange?username=xx&password=xx";

        from(source)
            .to("lineProcessor")
            .to(broker);
    }

}





--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741643.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by Claus Ibsen <cl...@gmail.com>.
Btw what Camel version do you use?

And if you turn off parallel on your current Camel version does it
work better? (maybe slower but no leaks)

And as usual you can always upgrade Camel if you are not on latest release.

On Tue, Oct 15, 2013 at 12:12 PM, rjsteixeira <ri...@ctrler.eu> wrote:
> Hi.
>
> This is what happens when I run it without sending it to rabbit:
>
> http://i.imgur.com/CkbMJLK.png
>
> http://i.imgur.com/RfQ9Yw6.png
>
> The speed is improved. But I still think rabbitmq is not the issue. Maybe
> rabbitmq just aggravates it?
>
> I'm thinking that the problem is trying to do the splitting with
> parallelProcessing. What about all those objects in memory?
>
> Thanks.
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741590.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Camel gradually slowing down with large sql results

Posted by rjsteixeira <ri...@ctrler.eu>.
This is strange.

Upping the max memory to 3GB and changing from direct to seda with 4 parallel
consumers brought significant improvements (5x as fast), but it seems that
some messages got stuck and are sent only after a while:

First: http://i.imgur.com/oSmtZMP.png
And after a while:  http://i.imgur.com/hGh6HR2.png

That dip is really strange...

The VisualVM output: http://i.imgur.com/PsQlo1j.png

Any thoughts or help as to get this running more smoothly?









Code:
##############################################

@Component
public class ReadDB extends SpringRouteBuilder{
	
    @Override
    public void configure() throws Exception {
    	
       
from("quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&trigger.repeatCount=0")
        .to("sql:select * from sales where integration_id = (select
max(integration_id) as max_int from sales)")
        .split(simple("${body}")).streaming()
            	.to("myLogger")
            	.to("seda:sqlqueue?concurrentConsumers=4");
    }

}


/**
 * Route that drains {@code seda:sqlqueue}, converts each message with
 * {@code lineProcessor}, marshals it with the {@code xstream-utf8} data
 * format and publishes it to the RabbitMQ exchange.
 */
@Component
public class WriteToQueue extends SpringRouteBuilder {

    /**
     * Builds the route: SEDA queue, {@code lineProcessor}, XStream marshal,
     * RabbitMQ. An explicit {@code threads(4)} step was tried here and is
     * disabled.
     *
     * @throws Exception if the route definition cannot be created
     */
    @Override
    public void configure() throws Exception {
        final String source = "seda:sqlqueue?concurrentConsumers=4";
        final String broker = "rabbitmq://xx.xx.x.x.:5672/myExchange?username=xxx&password=xxx";

        from(source)
            //.threads(4)
            .to("lineProcessor")
            .marshal("xstream-utf8")
            .to(broker);
    }

}





--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741595.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by rjsteixeira <ri...@ctrler.eu>.
Got it running better by setting the split operation not parallel and by
threading on the split results.

Here is the output from VisualVM: http://i.imgur.com/m4jFQdK.png

Here is the output from Camel: http://i.imgur.com/1BZWl4b.png.
The test starts at 10:55. The previous one is from the other test, with no
.threads().

As you can see, it is much faster but it still drops. I don't know if a
larger result set won't stop it because that slope tends to zero...

I think the problem is that I'm creating a whole lot of objects just to
create the XML. Maybe I should create the XML directly by string
concatenation without creating the objects first.

I ran it with Maven (camel:run) using four threads.

BTW, I changed from SpringDSL to JavaDSL.


@Component
public class ReadDB extends SpringRouteBuilder{
	
    @Override
    public void configure() throws Exception {
    	
       
from("quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&trigger.repeatCount=0")
        .to("sql:select * from sales where integration_id = (select
max(integration_id) as max_int from sales)")
        .split(simple("${body}")).streaming()
            	.to("myLogger")
            	.to("direct:sqlqueue");
    }

}

/**
 * Route that consumes {@code direct:sqlqueue}, continues on a {@code threads(4)}
 * step, converts each message with {@code lineProcessor}, marshals it
 * with the {@code xstream-utf8} data format and publishes it to the RabbitMQ
 * exchange.
 */
public class WriteToQueue extends SpringRouteBuilder {

    /**
     * Builds the route: direct endpoint, {@code threads(4)},
     * {@code lineProcessor}, XStream marshal, RabbitMQ.
     *
     * @throws Exception if the route definition cannot be created
     */
    @Override
    public void configure() throws Exception {
        final String source = "direct:sqlqueue";
        final String broker = "rabbitmq://xx.xx.xx.xx:5672/myExchange?username=xxxx&password=xxxx";

        from(source)
            .threads(4)
            .to("lineProcessor")
            .marshal("xstream-utf8")
            .to(broker);
    }

}




--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741594.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by rjsteixeira <ri...@ctrler.eu>.
Hi.

This is what happens when I run it without sending it to rabbit:

http://i.imgur.com/CkbMJLK.png

http://i.imgur.com/RfQ9Yw6.png

The speed is improved. But I still think rabbitmq is not the issue. Maybe
rabbitmq just aggravates it?

I'm thinking that the problem is trying to do the splitting with
parallelProcessing. What about all those objects in memory?

Thanks.



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741590.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Have you checked whether the JVM runs out of memory? If you connect with
jconsole / jvisualvm it can show the memory stats / history over time.

Also you can maybe try to just send the message to "log:rabbit" or to
not send to rabbit, just to see if that runs steady. And run for a
while to see if the performance / memory is stable.

Then we would know if it's the camel-rabbit component that is likely a problem.



On Mon, Oct 14, 2013 at 7:39 PM, rjsteixeira <ri...@ctrler.eu> wrote:
> Hi.
>
> I'm seeing a constant slowdown in Camel. It starts at 600 inserts/s and
> never ends because the rate drops to 5 or 6 inserts a second...
>
>
> Here is a graph with part of the slowdown (as seen from RabbitMQ):
> http://i.imgur.com/QAS6nwg.png
> In this graph you can see the end of one run (I killed it) and the start of
> another, the difference is staggering: http://i.imgur.com/dhd9GG1.png
>
> I'm consuming from large database table with a simple query that outputs
> about 400 000 lines that I then insert into a RabbitMQ queue.
>
> The insert rate starts really high and then start to fall gradually.
> I've tried running camel without inserting in the queue and there is also a
> slowdown in processing.
>
> The CPU is always maxed out with java...
>
> From the same machine can consume from the queue with storm at about 3000
> lines/sec. Without slowdowns and with unmarshaling with XStream (8000
> lines/s without unmarshaling).
>
> Can anyone point me in the right direction to solve this?
>
> Thanks.
>
> #########################################################
> Here is my Spring file.
>
> <?xml version="1.0" encoding="UTF-8"?>
>
> <beans xmlns="http://www.springframework.org/schema/beans"
>         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>         xsi:schemaLocation="
>        http://www.springframework.org/schema/beans
>        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
>        http://camel.apache.org/schema/spring
>        http://camel.apache.org/schema/spring/camel-spring.xsd">
>
>
>         <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
>                 destroy-method="close">
>                 <property name="driverClassName" value="oracle.jdbc.OracleDriver" />
>                 <property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
>                 <property name="username" value="sys as sysdba" />
>                 <property name="password" value="xxxxx" />
>         </bean>
>
>
>         <bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
>                 <property name="dataSource" ref="dataSource" />
>         </bean>
>
>
>         <camelContext streamCache="true"
> xmlns="http://camel.apache.org/schema/spring">
>
>
>
>                 <propertyPlaceholder id="placeholder" location="classpath:sql.properties"
> />
>
>
>                 <dataFormats>
>                         <xstream id="xstream-utf8" encoding="UTF-8" />
>                         <xstream id="xstream-default" />
>                 </dataFormats>
>
>
>                 <route id="databaseRoute">
>                         <from
> uri="quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&amp;trigger.repeatCount=0"
> />
>                         <to uri="sql:{{sql.selectOrderIdMax}}" />
>                         <split streaming="true" parallelProcessing="true">
>                                 <simple>${body}</simple>
>                                 <to uri="direct:sqlqueue" />
>                         </split>
>                 </route>
>
>                 <route id="processOrder-route" >
>                         <from uri="direct:sqlqueue" />
>                         <process ref="lineProcessor" />
>                         <marshal ref="xstream-utf8" />
>                         <to
> uri="rabbitmq://xx.x.x.x:5672/myExchange?username=xxx&amp;password=xxx" />
>                 </route>
>
>
>         </camelContext>
>
>
>         <bean id="lineProcessor"
> class="eu.rteixeira.dis.cameldbtoqueue.LineProcessor" />
>
> </beans>
>
> #########################################################
> The SQL is:
> sql.selectOrderIdMax=select * from sales where integration_id = (select
> max(integration_id) as max_int from sales)
>
> #########################################################
> The LineProcessor is just constructs an object for each line of the results
> to the pass to the serializer. The slowdown happened without it.
>
> /**
>  *
>  */
> package eu.rteixeira.dis.cameldbtoqueue;
>
> import eu.rteixeira.dis.objmodel.LineBean;
>
> import java.util.Map;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
>
> public class LineProcessor implements Processor {
>
>         public void process(Exchange exchange) throws Exception {
>
>         Map<String, ?> row = (Map<String,?>) exchange.getIn().getBody();
>
>         LineBean lb = new LineBean(row);
>
>
>         exchange.getIn().setBody(lb);
>     }
> }
>
> #########################################################
> The LineBean
>
> package eu.rteixeira.dis.objmodel;
>
> import java.math.BigDecimal;
> import java.sql.Timestamp;
> import java.util.Map;
>
> public class LineBean {
>
>         Integer store;
>         Integer sku;
>         String ean;
>         BigDecimal qty;
>         BigDecimal pvp;
>         Timestamp sales_date;
>         Timestamp vdate;
>         Integer physical_store;
>         Integer integration_id;
>         Integer group_no;
>         String tran_type;
>         BigDecimal sales_value;
>         BigDecimal discount_qty;
>         BigDecimal discount_value;
>         BigDecimal discount_voucher_qty;
>         BigDecimal discount_voucher_value;
>         BigDecimal base_value;
>
>
>
>         public LineBean() {
>                 super();
>         }
>
>         public LineBean(Map<String, ?> row){
>
>                 store = ((row.get("store") == null) ? null :
> Integer.parseInt(row.get("STORE").toString()));
>                 sku = ((row.get("sku") == null) ? null :
> Integer.parseInt(row.get("sku").toString()));
>                 ean = ((row.get("ean") == null) ? null : row.get("ean").toString());
>                 qty = ((row.get("qty")==null) ? null : new
> BigDecimal(row.get("qty").toString()));
>                 pvp = ((row.get("pvp") == null) ? null : new
> BigDecimal(row.get("pvp").toString()));
>                 sales_date = ((row.get("sales_date") == null) ? null :
> Timestamp.valueOf(row.get("sales_date").toString()));
>                 vdate = ((row.get("vdate") == null) ? null :
> Timestamp.valueOf(row.get("vdate").toString()));
>                 physical_store = ((row.get("physical_store") == null) ? null :
> Integer.parseInt(row.get("physical_store").toString()));
>                 integration_id = ((row.get("integration_id") == null) ? null :
> Integer.parseInt(row.get("integration_id").toString()));
>                 group_no = ((row.get("group_no") == null) ? null :
> Integer.parseInt(row.get("group_no").toString()));
>                 tran_type = ((row.get("tran_type") == null) ? null :
> row.get("tran_type").toString());
>                 sales_value = ((row.get("sales_value") == null) ? null : new
> BigDecimal(row.get("sales_value").toString()));
>                 discount_qty = ((row.get("discount_qty")==null) ? null : new
> BigDecimal(row.get("discount_qty").toString()));
>                 discount_value = ((row.get("discount_value")==null) ? null : new
> BigDecimal(row.get("discount_value").toString()));
>                 discount_voucher_qty = ((row.get("discount_voucher_qty")==null) ? null :
> new BigDecimal(row.get("discount_voucher_qty").toString()));
>                 discount_voucher_value = ((row.get("discount_voucher_value")==null) ? null
> : new BigDecimal(row.get("discount_voucher_value").toString()));
>                 base_value = ((row.get("base_value")==null) ? null : new
> BigDecimal(row.get("base_value").toString()));
>
>         }
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Camel gradually slowing down with large sql results

Posted by Taariq Levack <ta...@gmail.com>.
Ah yes,  I see that.
I use JProfiler but it's not free. I've also found Eclipse Memory Analyzer very useful, so you might try JVM monitor.
VisualVM will also be enough, for now you're just trying to see what's getting the most CPU time since that's maxed out when things slow down.

What happens when you give the heap more memory, does it take longer to slow down?


> On 14 Oct 2013, at 22:12, rjsteixeira <ri...@ctrler.eu> wrote:
> 
> I'm not using the stateful option right now because the timer is set to run
> only once (for testing reasons).
> 
> Any recomendations for a profiler?
> 
> I'm at home right now but I'm thinking of using VisualVM tomorrow.
> 
> Thanks.
> 
> 
> 
> --
> View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741565.html
> Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by rjsteixeira <ri...@ctrler.eu>.
I'm not using the stateful option right now because the timer is set to run
only once (for testing reasons).

Any recommendations for a profiler?

I'm at home right now but I'm thinking of using VisualVM tomorrow.

Thanks.



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560p5741565.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel gradually slowing down with large sql results

Posted by Taariq Levack <ta...@gmail.com>.
Hi

Try using the stateful option for your timer, which will disable concurrent executions. I think you're running that process every 20 seconds, which could keep the garbage collector very busy.

Just a guess after glancing at it, but give it a try.
If it's not that, run it through a profiler and let us know what the cpu's doing.

Taariq

> On 14 Oct 2013, at 19:39, rjsteixeira <ri...@ctrler.eu> wrote:
> 
> Hi.
> 
> I'm seeing a constant slowdown in Camel. It starts at 600 inserts/s and
> never ends because the rate drops to 5 or 6 inserts a second...
> 
> 
> Here is a graph with part of the slowdown (as seen from RabbitMQ):
> http://i.imgur.com/QAS6nwg.png
> In this graph you can see the end of one run (I killed it) and the start of
> another, the difference is staggering: http://i.imgur.com/dhd9GG1.png
> 
> I'm consuming from large database table with a simple query that outputs
> about 400 000 lines that I then insert into a RabbitMQ queue.
> 
> The insert rate starts really high and then start to fall gradually.
> I've tried running camel without inserting in the queue and there is also a
> slowdown in processing.
> 
> The CPU is always maxed out with java...
> 
> From the same machine can consume from the queue with storm at about 3000
> lines/sec. Without slowdowns and with unmarshaling with XStream (8000
> lines/s without unmarshaling).
> 
> Can anyone point me in the right direction to solve this?
> 
> Thanks.
> 
> #########################################################
> Here is my Spring file.
> 
> <?xml version="1.0" encoding="UTF-8"?>
> 
> <beans xmlns="http://www.springframework.org/schema/beans"
>    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>    xsi:schemaLocation="
>       http://www.springframework.org/schema/beans 
>       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
>       http://camel.apache.org/schema/spring 
>       http://camel.apache.org/schema/spring/camel-spring.xsd">
> 
>    
>    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
>        destroy-method="close">
>        <property name="driverClassName" value="oracle.jdbc.OracleDriver" />
>        <property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
>        <property name="username" value="sys as sysdba" />
>        <property name="password" value="xxxxx" />
>    </bean>
> 
>    
>    <bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
>        <property name="dataSource" ref="dataSource" />
>    </bean>
> 
>    
>    <camelContext streamCache="true"
> xmlns="http://camel.apache.org/schema/spring">
>    
>    
>        
>        <propertyPlaceholder id="placeholder" location="classpath:sql.properties"
> />
> 
>        
>        <dataFormats>
>            <xstream id="xstream-utf8" encoding="UTF-8" />
>            <xstream id="xstream-default" />
>        </dataFormats>
>    
>        
>        <route id="databaseRoute">
>            <from
> uri="quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&amp;trigger.repeatCount=0"
> />
>            <to uri="sql:{{sql.selectOrderIdMax}}" />
>            <split streaming="true" parallelProcessing="true">
>                <simple>${body}</simple>
>                <to uri="direct:sqlqueue" />
>            </split>
>        </route> 
> 
>        <route id="processOrder-route" >
>            <from uri="direct:sqlqueue" />
>            <process ref="lineProcessor" />
>            <marshal ref="xstream-utf8" />
>            <to
> uri="rabbitmq://xx.x.x.x:5672/myExchange?username=xxx&amp;password=xxx" />
>        </route>
>        
> 
>    </camelContext>
> 
>    
>    <bean id="lineProcessor"
> class="eu.rteixeira.dis.cameldbtoqueue.LineProcessor" />
> 
> </beans>
> 
> #########################################################
> The SQL is:
> sql.selectOrderIdMax=select * from sales where integration_id = (select
> max(integration_id) as max_int from sales)
> 
> #########################################################
> The LineProcessor is just constructs an object for each line of the results
> to the pass to the serializer. The slowdown happened without it.
> 
> /**
> * 
> */
> package eu.rteixeira.dis.cameldbtoqueue;
> 
> import eu.rteixeira.dis.objmodel.LineBean;
> 
> import java.util.Map;
> 
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> 
> public class LineProcessor implements Processor {
> 
>    public void process(Exchange exchange) throws Exception {
>        
>        Map<String, ?> row = (Map<String,?>) exchange.getIn().getBody();
>        
>        LineBean lb = new LineBean(row);
>        
>        
>        exchange.getIn().setBody(lb);
>    }
> }
> 
> #########################################################
> The LineBean
> 
> package eu.rteixeira.dis.objmodel;
> 
> import java.math.BigDecimal;
> import java.sql.Timestamp;
> import java.util.Map;
> 
> public class LineBean {
>    
>    Integer store;
>    Integer sku;
>    String ean;
>    BigDecimal qty;
>    BigDecimal pvp; 
>    Timestamp sales_date; 
>    Timestamp vdate;
>    Integer physical_store;
>    Integer integration_id;
>    Integer group_no;
>    String tran_type;
>    BigDecimal sales_value;
>    BigDecimal discount_qty;
>    BigDecimal discount_value;
>    BigDecimal discount_voucher_qty;
>    BigDecimal discount_voucher_value;
>    BigDecimal base_value;
>    
>    
>    
>    public LineBean() {
>        super();
>    }
> 
>    public LineBean(Map<String, ?> row){
>        
>        store = ((row.get("store") == null) ? null :
> Integer.parseInt(row.get("STORE").toString()));
>        sku = ((row.get("sku") == null) ? null :
> Integer.parseInt(row.get("sku").toString()));
>        ean = ((row.get("ean") == null) ? null : row.get("ean").toString());
>        qty = ((row.get("qty")==null) ? null : new
> BigDecimal(row.get("qty").toString()));
>        pvp = ((row.get("pvp") == null) ? null : new
> BigDecimal(row.get("pvp").toString()));
>        sales_date = ((row.get("sales_date") == null) ? null :
> Timestamp.valueOf(row.get("sales_date").toString()));
>        vdate = ((row.get("vdate") == null) ? null :
> Timestamp.valueOf(row.get("vdate").toString()));
>        physical_store = ((row.get("physical_store") == null) ? null :
> Integer.parseInt(row.get("physical_store").toString()));
>        integration_id = ((row.get("integration_id") == null) ? null :
> Integer.parseInt(row.get("integration_id").toString()));
>        group_no = ((row.get("group_no") == null) ? null :
> Integer.parseInt(row.get("group_no").toString()));
>        tran_type = ((row.get("tran_type") == null) ? null :
> row.get("tran_type").toString());
>        sales_value = ((row.get("sales_value") == null) ? null : new
> BigDecimal(row.get("sales_value").toString()));
>        discount_qty = ((row.get("discount_qty")==null) ? null : new
> BigDecimal(row.get("discount_qty").toString()));
>        discount_value = ((row.get("discount_value")==null) ? null : new
> BigDecimal(row.get("discount_value").toString()));
>        discount_voucher_qty = ((row.get("discount_voucher_qty")==null) ? null :
> new BigDecimal(row.get("discount_voucher_qty").toString()));
>        discount_voucher_value = ((row.get("discount_voucher_value")==null) ? null
> : new BigDecimal(row.get("discount_voucher_value").toString()));
>        base_value = ((row.get("base_value")==null) ? null : new
> BigDecimal(row.get("base_value").toString()));
>    
>    }
> 
> 
> 
> --
> View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560.html
> Sent from the Camel - Users mailing list archive at Nabble.com.