Posted to users@camel.apache.org by Okello Nelson <cn...@gmail.com> on 2013/05/27 14:08:17 UTC

Exchanges Aggregation for batch loading to database

Hi Guys,

I have the following Java DSL:

from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100")
    .routeId(fileType + "_ValidQ-To-DB")
    .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy")
        .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end()
    .aggregate(header("CRBOriginalFileName"))
        .completionSize(5000)
        .parallelProcessing()
        .groupExchanges()
    .to("bean:keBouncedChequeLoader");

The destination is a bean. The bean is as shown below:

public void process(Exchange exchange) throws Exception {
    List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
    Session session = SessionFactoryUtils.getSession(sessionFactory, true);
    Transaction tx = session.beginTransaction();
    for (int i = 0; i < exchanges.size(); i++) {
        KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody();
        session.save(exchanges.get(i));
        FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"), bc.getClientNumber() + "\n", true);
        if (i % 20 == 0) {
            session.flush();
            session.clear();
        }
    }
    tx.commit();
    session.close();
}

The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the
error

"Unknown message type [org.apache.activemq.command.ActiveMQMessage]"...

I'm not sure what I'm doing wrong. Any assistance will be appreciated very
much.

Kind Regards,
Okello Nelson.

Re: Exchanges Aggregation for batch loading to database

Posted by Okello Nelson <cn...@gmail.com>.
Any time, Claus.


On Tue, May 28, 2013 at 10:38 AM, Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> Thanks for sharing your solution.



-- 
Kind Regards,
Okello Nelson
+254 722 137 826
cn.okello@gmail.com

Re: Exchanges Aggregation for batch loading to database

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Thanks for sharing your solution.

On Tue, May 28, 2013 at 9:30 AM, Okello Nelson <cn...@gmail.com> wrote:
> My Solution:
> ==========
>
> Guys, I managed to sort out my issue in the following manner:
>
> I had to copy my app jar into the ActiveMQ's lib directory. This was so
> that ActiveMQ can have access to my data transfer objects.
>
> Next, I had to create a custom Aggregator (look at my previous post). In my
> custom aggregator, all I was doing is append to  a list the body of an
> exchange that comes in. This body represented a business object  (model
> class).
>
> Thirdly, in my route definition, in addition to having completionSize, I
> also had to include completionTimeout. This was so that when the number of
> exchanges could not reach my completionSize (when all exchanges have been
> received and there are no more), then the only completion condition that
> could be fulfilled is the completionTimeout.
>
> Lastly, in my bean, the one that does batch inserts into a PostgreSQL
> database was just an ordinary processor, and the exchange contained a list
> of my objects. Nothing like Exchange.GROUPED_EXCHANGES. Just the
> exchange.getBody(List<MyModelClass.class>).
>
> Kind Regards,
> Okello Nelson.



-- 
Claus Ibsen
-----------------
www.camelone.org: The open source integration conference.

Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cibsen@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Exchanges Aggregation for batch loading to database

Posted by Okello Nelson <cn...@gmail.com>.
My Solution:
==========

Guys, I managed to sort out my issue in the following manner:

I had to copy my app jar into ActiveMQ's lib directory, so that ActiveMQ
has access to my data transfer objects.

Next, I had to create a custom aggregator (see my previous post). All it
does is append the body of each incoming exchange to a list. Each body is a
business object (model class).

Thirdly, in my route definition, in addition to completionSize, I also had
to set completionTimeout. Once all exchanges have been received and no more
are coming, the last group may never reach completionSize, so
completionTimeout is the only completion condition that can still be
fulfilled for it.
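
To illustrate why both completion conditions are needed, here is a minimal
plain-Java model of an aggregator that completes a batch either by size or
by inactivity. It is only a sketch and not Camel code: the class BatchModel
and its method names are made up for illustration, time is passed in
explicitly instead of using a timer, and the timeout is modeled as
inactivity since the last arrival, which is an assumption about how
completionTimeout behaves.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of size-or-timeout completion (not Camel's implementation). */
public class BatchModel {
    private final int completionSize;
    private final long completionTimeoutMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> completed = new ArrayList<>();
    private long lastArrivalMillis;

    public BatchModel(int completionSize, long completionTimeoutMillis) {
        this.completionSize = completionSize;
        this.completionTimeoutMillis = completionTimeoutMillis;
    }

    /** Adds one item and completes the batch as soon as it reaches completionSize. */
    public void add(String item, long nowMillis) {
        pending.add(item);
        lastArrivalMillis = nowMillis;
        if (pending.size() >= completionSize) {
            complete();
        }
    }

    /** Simulates a timer check: completes a partial batch that has been idle long enough. */
    public void tick(long nowMillis) {
        boolean idle = nowMillis - lastArrivalMillis >= completionTimeoutMillis;
        if (!pending.isEmpty() && idle) {
            complete();
        }
    }

    private void complete() {
        completed.add(new ArrayList<>(pending));
        pending.clear();
    }

    /** Returns the batches completed so far, in completion order. */
    public List<List<String>> completed() {
        return completed;
    }
}
```

With a completion size of 3 and seven incoming items, two full batches
complete by size and the seventh item stays pending until a tick arrives
after the timeout. Without the timeout condition that last item would never
be delivered.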

Lastly, my bean, the one that does batch inserts into a PostgreSQL
database, is now just an ordinary processor, and the exchange body contains
a list of my objects. There is no need for Exchange.GROUPED_EXCHANGE; I just
read the list from the exchange body, e.g.
exchange.getIn().getBody(List.class).
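
As a rough sketch of the batch-insert loop such a bean could run over the
list it receives, the following plain-Java example saves each model object
and flushes after every full batch, plus once more for a trailing partial
batch. The Sink interface is a hypothetical stand-in for a persistence
session (for example a Hibernate Session, where flush would typically be
followed by clear); the class and method names are assumptions, not part of
Camel or Hibernate.

```java
import java.util.List;

/** Hypothetical sketch of batched saving; Sink stands in for a persistence session. */
public class BatchFlushSketch {

    /** Minimal stand-in for the session operations the loop needs. */
    public interface Sink<T> {
        void save(T item);
        void flush();
    }

    /**
     * Saves every item and flushes after each full batch and after a trailing
     * partial batch. Returns the number of flushes performed.
     */
    public static <T> int saveInBatches(List<T> items, Sink<T> sink, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int flushes = 0;
        for (int i = 0; i < items.size(); i++) {
            // Save the model object itself, not the exchange that carried it.
            sink.save(items.get(i));
            // (i + 1) counts items saved so far, so the first flush happens
            // after batchSize items rather than after the very first one.
            if ((i + 1) % batchSize == 0) {
                sink.flush();
                flushes++;
            }
        }
        if (items.size() % batchSize != 0) {
            sink.flush();
            flushes++;
        }
        return flushes;
    }
}
```

In a real bean the list would come from the exchange body and the Sink
calls would map onto the session inside a single transaction.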

Kind Regards,
Okello Nelson.




On Mon, May 27, 2013 at 5:18 PM, Okello Nelson <cn...@gmail.com> wrote:

> Hi Guys,
>
> I've been trying to sort this issue. I've created a custom aggregation
> strategy instead of relying on the inbuilt "groupExchanges()". My
> aggregation strategy is shown below:
>
> @Service( value = "keAggregationStrategy" )
> public class KEAggregationStrategy implements IKEAggregationStrategy {
>
> public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>  Object newBody = newExchange.getIn().getBody();
> ArrayList<Object> list = null;
>  if(oldExchange == null) {
> list = new ArrayList<Object>();
> list.add(newBody);
>  newExchange.getIn().setBody(list);
> return newExchange;
>  } else {
> list = oldExchange.getIn().getBody(ArrayList.class);
> list.add(newBody);
>  return oldExchange;
> }
>  }
>
> }
>
> Now, when I run the app, the following exception is thrown by ActiveMQ:
> FYI: My environment is ActiveMQ 5.8, Camel 2.11.0. I'm not using embedded
> ActiveMQ.
>
> javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: com.package.models.bc.ke.MyEntityClass

Re: Exchanges Aggregation for batch loading to database

Posted by Okello Nelson <cn...@gmail.com>.
Hi Guys,

I've been trying to sort out this issue. I've created a custom aggregation
strategy instead of relying on the built-in "groupExchanges()". My
aggregation strategy is shown below:

@Service(value = "keAggregationStrategy")
public class KEAggregationStrategy implements IKEAggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        ArrayList<Object> list = null;
        if (oldExchange == null) {
            list = new ArrayList<Object>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list = oldExchange.getIn().getBody(ArrayList.class);
            list.add(newBody);
            return oldExchange;
        }
    }

}

FYI: my environment is ActiveMQ 5.8 and Camel 2.11.0, and I'm not using
embedded ActiveMQ. Now, when I run the app, ActiveMQ throws the following
exception:

javax.jms.JMSException: Failed to build body from content.
Serializable class not available to broker. Reason:
java.lang.ClassNotFoundException:
com.package.models.bc.ke.MyEntityClass




On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn...@gmail.com> wrote:

> Hi Guys,
>
> I have the following Java DSL:
>
> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100")
> .routeId(fileType + "_ValidQ-To-DB")
>  .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy")
> .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end()
>  .aggregate(header("CRBOriginalFileName"))
> .completionSize(5000)
> .parallelProcessing()
>  .groupExchanges()
> .to("bean:keBouncedChequeLoader");
>
> The destination is a bean. The bean is as shown below:
>
> public void process(Exchange exchange) throws Exception {
> List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE,
> List.class);
>  Session session = SessionFactoryUtils.getSession(sessionFactory, true);
> Transaction tx = session.beginTransaction();
>  for(int i = 0; i < exchanges.size(); i++) {
> KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody();
>  session.save(exchanges.get(i));
> FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"),
> bc.getClientNumber() + "\n", true);
>  if( i % 20 == 0) {
> session.flush();
> session.clear();
>  }
> }
>  tx.commit();
>  session.close();
>  }
>
> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the
> error
>
> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"...
>
> I'm not sure what I'm doing wrong. Any assistance will be appreciated very
> much.
>
> Kind Regards,
> Okello Nelson.
>


