Posted to users@camel.apache.org by cristisor <cr...@yahoo.com> on 2013/07/11 13:42:12 UTC

Exceptions when aggregating messages

Hello,

I'm trying to aggregate a number of messages, each with a String body
(the In message of each exchange that reaches the aggregator), so that
I can send a single exchange containing the list of strings to the
next route.

If I set the body as a List<org.apache.camel.Message> the send operation
fails with the following:
NotSerializableException: org.apache.camel.impl.DefaultMessage

This is understandable since Message is not serializable, but then I tried
aggregating a List<MessageBean>, where MessageBean implements the
Serializable interface and contains the body (String), the
headers (CaseInsensitiveMap) and the properties (CaseInsensitiveMap). This
time the exception is different:
NotSerializableException: org.apache.camel.component.file.GenericFile

Can someone spot the problem? I could aggregate the complete exchanges and
send them, but isn't the overhead too big?



--
View this message in context: http://camel.465427.n5.nabble.com/Exceptions-when-aggregating-messages-tp5735523.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Exceptions when aggregating messages

Posted by cristisor <cr...@yahoo.com>.
Thanks for making things more clear. I read the documentation and I
understand your point.



--
View this message in context: http://camel.465427.n5.nabble.com/Exceptions-when-aggregating-messages-tp5735523p5735830.html

Re: Exceptions when aggregating messages

Posted by Christian Müller <ch...@gmail.com>.
Check out the Camel JMS page. Look for the transferExchange option.

Best,
Christian
On 17.07.2013 07:19, "cristisor" <cr...@yahoo.com> wrote:

> I'm afraid I don't understand which option you are talking about. And
> how can I "care about the property value which could not be
> serialized"? Please explain in more detail if possible.
>
> Thanks.

Re: Exceptions when aggregating messages

Posted by cristisor <cr...@yahoo.com>.
I'm afraid I don't understand which option you are talking about. And
how can I "care about the property value which could not be serialized"?
Please explain in more detail if possible.

Thanks.



--
View this message in context: http://camel.465427.n5.nabble.com/Exceptions-when-aggregating-messages-tp5735523p5735765.html

Re: Exceptions when aggregating messages

Posted by Willem jiang <wi...@gmail.com>.
Did you enable the option to let camel-jms serialize the exchange?
If that option is turned on, you need to take care of any property value that cannot be serialized.
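
One way to take care of such values is to drop them before the data is sent. The following is only a minimal sketch in plain Java and does not use any Camel API; the class name SerializableFilter and the method keepSerializable are hypothetical names chosen for illustration. Note that the check is shallow: a Serializable collection that holds non-serializable elements would still pass the filter and fail later.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class SerializableFilter {

    /**
     * Returns a copy of the given map that keeps only the entries whose
     * value is null or implements Serializable (shallow check only).
     */
    public static Map<String, Object> keepSerializable(Map<String, Object> source) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            Object value = entry.getValue();
            if (value == null || value instanceof Serializable) {
                result.put(entry.getKey(), value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("fileName", "filename.txt");
        // Plain Object is a stand-in for a non-serializable value such as GenericFile.
        properties.put("CamelFileExchangeFile", new Object());
        System.out.println(keepSerializable(properties).keySet());
    }
}
```

In an aggregation strategy, a filter like this could be applied to the header and property maps before they are copied into a bean, assuming the dropped entries are not needed on the receiving route.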


--  
Willem Jiang

Red Hat, Inc.
FuseSource is now part of Red Hat
Web: http://www.fusesource.com | http://www.redhat.com
Blog: http://willemjiang.blogspot.com (http://willemjiang.blogspot.com/) (English)
          http://jnn.iteye.com (http://jnn.javaeye.com/) (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem





On Tuesday, July 16, 2013 at 10:07 PM, cristisor wrote:

> It seems that I was trying to serialize the exchange properties, which
> makes no sense because the exchange that arrives on the new route will
> contain its own properties. The failure was probably caused by trying to
> serialize the following property:
> *CamelFileExchangeFile=GenericFile[path\filename.txt]*
>  
> I don't understand why this was happening, but I will send only the
> information that I need instead of sending everything.




Re: Exceptions when aggregating messages

Posted by cristisor <cr...@yahoo.com>.
It seems that I was trying to serialize the exchange properties, which
makes no sense because the exchange that arrives on the new route will
contain its own properties. The failure was probably caused by trying to
serialize the following property:
*CamelFileExchangeFile=GenericFile[path\filename.txt]*

I don't understand why this was happening, but I will send only the
information that I need instead of sending everything.
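
The failure described above can be reproduced outside Camel. The sketch below uses plain JDK serialization only; NotSerializableFile is a hypothetical stand-in for GenericFile and Bean is a simplified stand-in for MessageBean, so neither is Camel code. It suggests why a Serializable wrapper is not enough: Java serialization also walks into the values held by the map.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class SerializationDemo {

    /** Hypothetical stand-in for a non-serializable value such as GenericFile. */
    public static class NotSerializableFile {
        public final String path;

        public NotSerializableFile(String path) {
            this.path = path;
        }
    }

    /** Simplified Serializable wrapper, similar in spirit to MessageBean. */
    public static class Bean implements Serializable {
        private static final long serialVersionUID = 1L;
        public String body;
        public Map<String, Object> properties = new HashMap<>();
    }

    /** Returns true if JDK serialization succeeds, false if it fails with an IOException. */
    public static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        Bean bodyOnly = new Bean();
        bodyOnly.body = "line 1";

        Bean withFile = new Bean();
        withFile.body = "line 1";
        withFile.properties.put("CamelFileExchangeFile",
                new NotSerializableFile("path/filename.txt"));

        System.out.println("body only:     " + canSerialize(bodyOnly));
        System.out.println("with file ref: " + canSerialize(withFile));
    }
}
```

Sending only the String bodies, as planned above, avoids the problem because no non-serializable value ends up in the object graph.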



--
View this message in context: http://camel.465427.n5.nabble.com/Exceptions-when-aggregating-messages-tp5735523p5735728.html

Re: Exceptions when aggregating messages

Posted by cristisor <cr...@yahoo.com>.
Hello and thank you for your reply.


This is the code for aggregating the whole camel message:
    class ObjectsAggregationStrategy implements AggregationStrategy {

        final Logger logger = Logger.getLogger(ObjectsAggregationStrategy.class);

        @SuppressWarnings("unchecked")
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            List<Message> items;
            if (oldExchange == null) {
                // First exchange of the group: start the list with its In message.
                items = new ArrayList<Message>();
                items.add(newExchange.getIn());
                Exchange copyExchange = newExchange.copy();
                copyExchange.getIn().setBody(items);
                return copyExchange;
            }

            // Later exchanges: append the In message to the list built so far.
            items = oldExchange.getIn().getBody(List.class);
            Message newItem = newExchange.getIn();
            items.add(newItem);
            oldExchange.getIn().setBody(items);

            // Both helpers update oldExchange in place and return it.
            Exchange finalExchange = updateInMessageHeaders(oldExchange, newExchange);
            finalExchange = updateExchangeProperties(oldExchange, newExchange);

            return finalExchange;
        }

        // Copies every property of the new exchange onto the aggregated one.
        private Exchange updateExchangeProperties(Exchange oldExchange, Exchange newExchange) {
            CaseInsensitiveMap properties =
                    new CaseInsensitiveMap(newExchange.getProperties());

            for (Entry<String, Object> entry : properties.entrySet()) {
                oldExchange.setProperty(entry.getKey(), entry.getValue());
            }

            return oldExchange;
        }

        // Copies every In header of the new exchange onto the aggregated one.
        private Exchange updateInMessageHeaders(Exchange oldExchange, Exchange newExchange) {
            CaseInsensitiveMap headers =
                    new CaseInsensitiveMap(newExchange.getIn().getHeaders());

            for (Entry<String, Object> entry : headers.entrySet()) {
                // Boolean.TRUE.equals(...) also copes with a null header value.
                if (entry.getKey().equals(MessageConstant.ERES_FILE_COMPLETE)
                        && Boolean.TRUE.equals(entry.getValue())) {
                    logger.debug("Received and set the ERES_FILE_COMPLETE header from routeId["
                            + oldExchange.getFromRouteId() + "].");
                }
                oldExchange.getIn().setHeader(entry.getKey(), entry.getValue());
            }

            return oldExchange;
        }
    }


And this is the code for aggregating a serializable custom bean:
public class MessageAggregationStrategy<T> implements AggregationStrategy {
    final Logger logger = Logger.getLogger(MessageAggregationStrategy.class);

    @SuppressWarnings("unchecked")
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        MessageBean<T> bean = setExchangeData(newExchange);
        List<MessageBean<T>> items;
        if (oldExchange == null) {
            // First exchange of the group: start the list with its bean.
            items = new ArrayList<MessageBean<T>>();
            items.add(bean);
            Exchange copyExchange = newExchange.copy();
            copyExchange.getIn().setBody(items);
            return copyExchange;
        }

        // Later exchanges: append the bean to the list built so far.
        items = oldExchange.getIn().getBody(List.class);
        items.add(bean);
        oldExchange.getIn().setBody(items);

        return oldExchange;
    }

    // Copies the body, the In headers and the exchange properties into a bean.
    @SuppressWarnings("unchecked")
    private MessageBean<T> setExchangeData(Exchange exchange) {
        MessageBean<T> bean = new MessageBean<T>();
        bean.setBody((T) exchange.getIn().getBody());
        bean.setHeaders(new CaseInsensitiveMap(exchange.getIn().getHeaders()));
        bean.setProperties(new CaseInsensitiveMap(exchange.getProperties()));
        return bean;
    }
}


where MessageBean is:
public class MessageBean<T> implements Serializable {
    private static final long serialVersionUID = -987592973477513095L;
    private T body;
    private CaseInsensitiveMap properties;
    private CaseInsensitiveMap headers;

    public T getBody() {
        return body;
    }

    public CaseInsensitiveMap getProperties() {
        return properties;
    }

    public CaseInsensitiveMap getHeaders() {
        return headers;
    }

    public void setBody(T body) {
        this.body = body;
    }

    public void setProperties(CaseInsensitiveMap properties) {
        this.properties = properties;
    }

    public void setHeaders(CaseInsensitiveMap headers) {
        this.headers = headers;
    }
}




--
View this message in context: http://camel.465427.n5.nabble.com/Exceptions-when-aggregating-messages-tp5735523p5735560.html

Re: Exceptions when aggregating messages

Posted by Christian Posta <ch...@gmail.com>.
Post your code if you can, and I'll take a look.


On Thu, Jul 11, 2013 at 7:42 AM, cristisor <cr...@yahoo.com> wrote:

> Hello,
>
> I'm trying to aggregate a number of messages, each with a String body
> (the In message of each exchange that reaches the aggregator), so that
> I can send a single exchange containing the list of strings to the
> next route.
>
> If I set the body as a List<org.apache.camel.Message> the send operation
> fails with the following:
> NotSerializableException: org.apache.camel.impl.DefaultMessage
>
> This is understandable since Message is not serializable, but then I tried
> aggregating a List<MessageBean>, where MessageBean implements the
> Serializable interface and contains the body (String), the
> headers (CaseInsensitiveMap) and the properties (CaseInsensitiveMap). This
> time the exception is different:
> NotSerializableException: org.apache.camel.component.file.GenericFile
>
> Can someone spot the problem? I could aggregate the complete exchanges and
> send them, but isn't the overhead too big?



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta