You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Neil Thorne <mr...@hotmail.com> on 2007/09/08 00:23:24 UTC

Better Aggregator support

Hi,

I'm looking at the current aggregator support, which seems to be based around
throttling, but I need a full-blown stateful aggregator.

I implemented my own Aggregator, albeit in memory, as shown below. It uses the
messageId, since we can get multiple payment messages for the same orderId;
this relies on my Splitter. Please find my test case, with the expressions
that I am using, below.

My questions are

* How can I implement an aggregator whose batch size changes based on an
external event? (In my case I can route the original order to the aggregator
first, to give it a heads-up on how many payments to expect.) I need an
aggregator per message batch.
* Can we make this stateful easily (e.g. by using some generic JPA mechanism)
for system crash recovery?

thanks,

Neil

package examples;

import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.activemq.ActiveMQComponent;
import org.apache.camel.component.jms.JmsExchange;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.Aggregator;

import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;

import junit.framework.TestCase;

/**
 * @author Neil Thorne
 */
public class MyTest extends TestCase {

    private MockEndpoint resultEndpoint;

    public void testSplitAndAggregate() throws Exception {
        Order order1 = createOrder("1");
        Order order2 = createOrder("2");

        final CamelContext camelContext = new DefaultCamelContext();

        resultEndpoint = (MockEndpoint)
camelContext.getEndpoint("mock:result");
        resultEndpoint.expectedBodiesReceived(order1, order2);

        camelContext.addComponent("activemq",
ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=true"));
        camelContext.addRoutes(new RouteBuilder(){
            public void configure() throws Exception {
                Expression splitterExpression = new Expression(){
                    public Object evaluate(Exchange exchange) {
                        Order order = (Order) exchange.getIn().getBody();
                        JmsExchange jmsExchange = (JmsExchange) exchange;
                        javax.jms.Message jmsMessage =
jmsExchange.getIn().getJmsMessage();
                        try {
                            String jmsMessageID =
jmsMessage.getJMSMessageID();
                            jmsMessage.setJMSCorrelationID(jmsMessageID);
                        } catch (JMSException e) {
                            throw new RuntimeException(e);
                        }
                        return order.getPayments();
                    }
                };
                Expression aggregatorExpression = new Expression(){
                    public Object evaluate(Exchange exchange) {
                        Payment payment = (Payment)
exchange.getIn().getBody();
                        JmsMessage in = (JmsMessage) exchange.getIn();
                        String jmsCorrelationID = null;
                        try {
                            jmsCorrelationID =
in.getJmsMessage().getJMSCorrelationID();
                        } catch (JMSException e) {
                            throw new RuntimeException(e);
                        }
                        return jmsCorrelationID + payment.getOrderId();
                    }
                };
                AggregationStrategy aggregationStrategy = new
AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange
newExchange) {
                        Object oldBody = oldExchange.getIn().getBody();
                        if (oldBody instanceof Payment) {
                            Payment oldPayment = (Payment) oldBody;
                            Order order = new
Order(oldPayment.getOrderId());
                            order.getPayments().add(oldPayment);
                            Object newBody = newExchange.getIn().getBody();
                            if (newBody instanceof Payment) {
                                Payment newPayment = (Payment) newBody;
                                order.getPayments().add(newPayment);
                            }
                            newExchange.getIn().setBody(order);
                            return newExchange;
                        }
                        else if(oldBody instanceof Order){
                            Order order = (Order) oldBody;
                            Object newBody = newExchange.getIn().getBody();
                            if (newBody instanceof Payment) {
                                Payment newPayment = (Payment) newBody;
                                order.getPayments().add(newPayment);
                                newExchange.getIn().setBody(order);
                            }
                            return newExchange;
                        }
                        throw new IllegalStateException("OldBody should be
type Payment or Order but was " + oldBody.getClass());
                    }
                };
               
from("activemq:queue:bus.in").splitter(splitterExpression).to("activemq:queue:splitThings");
               
from("activemq:queue:splitThings").aggregator(aggregatorExpression,
aggregationStrategy).to("mock:result");
            }

        });
        camelContext.start();
        List<Route> routeList = camelContext.getRoutes();
        for (Iterator<Route> iterator = routeList.iterator();
iterator.hasNext();) {
            Route route = iterator.next();
            List<Service> servicesForRoute = route.getServicesForRoute();
            for (Iterator<Service> iterator1 = servicesForRoute.iterator();
iterator1.hasNext();) {
                Service service = iterator1.next();
                if(service instanceof Aggregator){
                    Aggregator aggregator = (Aggregator) service;
                    aggregator.setBatchSize(4);
                    aggregator.setBatchTimeout(10000);
                }
            }
        }
        CamelTemplate camelTemplate = new CamelTemplate(camelContext);
        camelTemplate.send("activemq:queue:bus.in", new Processor(){
            public void process(Exchange exchange) throws Exception {
                Order order = createOrder("1");
                exchange.getIn().setBody(order);
            }
        });
        camelTemplate.send("activemq:queue:bus.in", new Processor(){
            public void process(Exchange exchange) throws Exception {
                Order order = createOrder("2");
                exchange.getIn().setBody(order);
            }
        });
        Thread.sleep(1000);

        resultEndpoint.assertIsSatisfied();
        camelContext.stop();
    }

    private Order createOrder(String orderId) {
        final Order order = new Order(orderId);
        List orderPayments = new ArrayList();
        orderPayments.add(new Payment(orderId, orderId + "1"));
        orderPayments.add(new Payment(orderId, orderId + "2"));
        orderPayments.add(new Payment(orderId, orderId + "3"));
        orderPayments.add(new Payment(orderId, orderId + "4"));
        order.setPayments(orderPayments);
        return order;
    }

}


package examples;

import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;

/**
 * @author Neil Thorne
 */
public class Order implements Serializable {

    private String orderId;
    private List payments = new ArrayList();

    public Order(String orderId) {
        this.orderId = orderId;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public List getPayments() {
        return payments;
    }

    public void setPayments(List payments) {
        this.payments = payments;
    }

    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Order)) return false;

        final Order order = (Order) o;

        if (orderId != null ? !orderId.equals(order.orderId) : order.orderId
!= null) return false;
        if (payments != null ? !payments.equals(order.payments) :
order.payments != null) return false;

        return true;
    }

    public int hashCode() {
        int result;
        result = (orderId != null ? orderId.hashCode() : 0);
        result = 29 * result + (payments != null ? payments.hashCode() : 0);
        return result;
    }

    public String toString() {
        return "Order{orderId=" + orderId + ",payments=" + payments + "}";
    }

}

package examples;

import java.io.Serializable;
import java.util.List;

/**
 * Serializable value object identifying a single payment by the id of the order
 * it belongs to and its own payment id. Equality, hash code and string form are
 * derived from both fields; either field may be {@code null}.
 *
 * @author Neil Thorne
 */
public class Payment implements Serializable {

    private String orderId;
    private String paymentId;

    /** Creates a payment with the given order id and payment id. */
    public Payment(String orderId, String paymentId) {
        this.orderId = orderId;
        this.paymentId = paymentId;
    }

    /** @return the id of the owning order, possibly {@code null} */
    public String getOrderId() {
        return this.orderId;
    }

    /** @param orderId the new order id */
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    /** @return the payment id, possibly {@code null} */
    public String getPaymentId() {
        return this.paymentId;
    }

    /** @param paymentId the new payment id */
    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    /** Two payments are equal when both the order id and the payment id match. */
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof Payment)) {
            return false;
        }
        final Payment other = (Payment) o;

        final boolean sameOrder = (orderId == null)
                ? other.orderId == null
                : orderId.equals(other.orderId);
        if (!sameOrder) {
            return false;
        }
        return (paymentId == null)
                ? other.paymentId == null
                : paymentId.equals(other.paymentId);
    }

    /** Combines the hashes of both ids; a {@code null} id contributes 0. */
    public int hashCode() {
        final int orderHash = (orderId == null) ? 0 : orderId.hashCode();
        final int paymentHash = (paymentId == null) ? 0 : paymentId.hashCode();
        return 29 * orderHash + paymentHash;
    }

    /** @return a string of the form {@code Payment{orderId=..,paymentId=..}} */
    public String toString() {
        final StringBuilder text = new StringBuilder("Payment{orderId=");
        text.append(orderId);
        text.append(",paymentId=").append(paymentId);
        text.append("}");
        return text.toString();
    }

}

-- 
View this message in context: http://www.nabble.com/Better-Aggregator-support-tf4404085s22882.html#a12564277
Sent from the Camel - Users mailing list archive at Nabble.com.