You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by blau <be...@laurant.fr> on 2010/04/13 15:45:17 UTC

a Flow Relgulator Component


Hello, this is a small contribution to the Camel project.

Sometimes you need to regulate (slow down) the flow of a stream.
For example, imagine a market-data flow where the body of each message is a
Map<String, Object>. The update rate is far too high for your needs
(sometimes more than 20 msg/sec), so you want to regulate it to 1 msg/sec.

Because messages are aggregated rather than simply delayed, this flow regulator
needs a MessageAggregator strategy (the implementation is supplied by a factory).

This implementation is based on the DelayQueue from java.util.concurrent.

ex: from("timer://test?period=1000").to("flowregulator://test?period=5000");

/**
 * Camel component that regulates (slows down) the flow of a route.
 *
 * <p>Every endpoint created by this component has a {@code period} in
 * milliseconds, taken from the {@code period} URI parameter or, when absent,
 * from the component default (1000 ms unless changed).
 *
 * <p>The first incoming message is sent to the output immediately. Messages
 * arriving before the period has elapsed are aggregated, and their endpoint is
 * placed in a <em>time pipeline</em> (a {@link DelayQueue}) until the period
 * is over. A message arriving after the period has elapsed is sent
 * immediately.
 *
 * <p>Because messages are aggregated rather than delayed one by one, the
 * component needs a {@code MessageAggregatorFactory}.
 *
 * <p>Example:
 * <pre>
 * camelContext.addComponent("flowregulator",
 *     new FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
 * from("timer://test?period=1000").to("flowregulator://test?period=5000");
 * from("flowregulator://test?period=5000").to...
 * </pre>
 *
 * @author bernard LAURANT
 */
public class FlowRelgulatorComponent extends DefaultComponent {

	/**
	 * The time pipeline: endpoints holding aggregated messages that wait for
	 * their period to elapse.
	 */
	private final DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints =
			new DelayQueue<FlowRelgulatorEndPoint>();

	/**
	 * Default period, in milliseconds (1s).
	 */
	private long defaultPeriod = 1000;

	private MessageAggregatorFactory messageAggregatorFactory;

	/**
	 * Thread draining the time pipeline; {@code null} while the component is
	 * not started.
	 */
	private Thread pipeOut;

	/**
	 * Cleared by {@link #stop()} so that the drain loop terminates.
	 */
	private volatile boolean running;

	public FlowRelgulatorComponent() {
		super();
	}

	public FlowRelgulatorComponent(CamelContext camelContext) {
		super(camelContext);
	}

	public FlowRelgulatorComponent(MessageAggregatorFactory messageAggregatorFactory) {
		super();
		this.messageAggregatorFactory = messageAggregatorFactory;
	}

	public FlowRelgulatorComponent(CamelContext camelContext,
			MessageAggregatorFactory messageAggregatorFactory) {
		super(camelContext);
		this.messageAggregatorFactory = messageAggregatorFactory;
	}

	/**
	 * Creates a regulator endpoint for the given URI.
	 *
	 * @param uri        the full endpoint URI
	 * @param remaining  the part of the URI after the scheme
	 * @param parameters the URI parameters; {@code period} is consumed here
	 * @return a new endpoint configured with its period and its own aggregator
	 * @throws IllegalStateException if no aggregator factory has been set
	 */
	@SuppressWarnings("unchecked")
	@Override
	protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
		if (messageAggregatorFactory == null) {
			// Fail with a meaningful message instead of a bare NullPointerException.
			throw new IllegalStateException(
					"No MessageAggregatorFactory configured for endpoint " + uri);
		}
		Long period = (Long) getAndRemoveParameter(parameters, "period", Long.class);
		if (period == null) {
			period = defaultPeriod;
		}
		FlowRelgulatorEndPoint flowRelgulatorEndPoint =
				new FlowRelgulatorEndPoint(uri, this, remaining);
		flowRelgulatorEndPoint.setPeriod(period);
		flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator());
		return flowRelgulatorEndPoint;
	}

	public void setDefaultPeriod(long defaultPeriod) {
		this.defaultPeriod = defaultPeriod;
	}

	/**
	 * Registers an endpoint in the time pipeline unless it is already waiting
	 * there.
	 *
	 * @param flowRelgulatorEndPoint the endpoint holding aggregated messages
	 */
	public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint) {
		// The lock makes the check-then-insert atomic between producer threads.
		synchronized (flowRelgulatorEndPoints) {
			if (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) {
				flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
			}
		}
	}

	/**
	 * Starts the component and the thread that drains the time pipeline.
	 * Calling it again while the drain thread is alive does not spawn a
	 * second thread.
	 */
	@Override
	public synchronized void start() throws Exception {
		super.start();
		if (pipeOut != null && pipeOut.isAlive()) {
			return;
		}
		running = true;
		pipeOut = new Thread(new Runnable() {
			@Override
			public void run() {
				drainPipeline();
			}
		}, "FlowRegulator-pipeOut");
		pipeOut.start();
	}

	/**
	 * Stops the drain thread, then stops the component.
	 */
	@Override
	public synchronized void stop() throws Exception {
		running = false;
		if (pipeOut != null) {
			// Wake the thread up if it is blocked in poll().
			pipeOut.interrupt();
			pipeOut = null;
		}
		super.stop();
	}

	/**
	 * Flushes every endpoint whose delay has expired, until the component is
	 * stopped or the thread is interrupted.
	 */
	private void drainPipeline() {
		while (running) {
			try {
				FlowRelgulatorEndPoint due =
						flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS);
				if (due != null) {
					due.flush();
				}
			} catch (InterruptedException e) {
				// Restore the flag and leave: interruption is the stop signal.
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	public void setMessageAggregatorFactory(MessageAggregatorFactory messageAggregatorFactory) {
		this.messageAggregatorFactory = messageAggregatorFactory;
	}
}




/**
 * Endpoint for the flow regulator.
 *
 * <p>The producer side aggregates every incoming message. The aggregate is
 * handed to the consumer side either immediately (first message, or period
 * already elapsed) or later, when the component's time pipeline releases this
 * endpoint. The endpoint implements {@link Delayed} so that it can itself be
 * queued in that pipeline.
 *
 * @see FlowRelgulatorComponent
 * @author bernard LAURANT
 */
public class FlowRelgulatorEndPoint extends DefaultEndpoint implements Delayed {

	private MessageAggregator messageAggregator;
	private ReguledConsumer reguledConsumer;

	private String remaining;
	private long period;

	/** Epoch millis at which the pending aggregate becomes due. */
	private volatile long timeOut;

	/** Epoch millis of the last delivery; 0 until something has been sent. */
	private volatile long lastMessSent;

	/** Makes "is something pending? then take it" atomic between flushing threads. */
	private final Object flushLock = new Object();

	public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent flowRelgulatorComponent,
			String remaining) {
		super(uri, flowRelgulatorComponent);
		this.remaining = remaining;
	}

	@Override
	public String toString() {
		return createEndpointUri();
	}

	@Override
	protected String createEndpointUri() {
		return new StringBuilder()
				.append("flowregulator://")
				.append(remaining)
				.append("?period=")
				.append(period)
				.toString();
	}

	/**
	 * Returns the time left before the pending aggregate is due.
	 *
	 * @param unit the unit of the returned value
	 * @return the remaining delay; zero or negative once the deadline has passed
	 */
	@Override
	public long getDelay(TimeUnit unit) {
		long millis2PipeOut = timeOut - System.currentTimeMillis();
		return unit.convert(millis2PipeOut, TimeUnit.MILLISECONDS);
	}

	/**
	 * Orders endpoints by deadline. Two regulator endpoints are compared on
	 * their absolute deadlines, so the result does not depend on when the
	 * clock is sampled; other {@link Delayed} objects are compared on their
	 * remaining delay.
	 */
	@Override
	public int compareTo(Delayed o) {
		long thisKey;
		long otherKey;
		if (o instanceof FlowRelgulatorEndPoint) {
			thisKey = timeOut;
			otherKey = ((FlowRelgulatorEndPoint) o).timeOut;
		} else {
			thisKey = getDelay(TimeUnit.MILLISECONDS);
			otherKey = o.getDelay(TimeUnit.MILLISECONDS);
		}
		return (thisKey < otherKey ? -1 : (thisKey == otherKey ? 0 : 1));
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = super.hashCode();
		result = prime * result + (int) (period ^ (period >>> 32));
		result = prime * result + ((remaining == null) ? 0 : remaining.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!super.equals(obj)) {
			return false;
		}
		if (getClass() != obj.getClass()) {
			return false;
		}
		FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint) obj;
		if (remaining == null) {
			if (other.remaining != null) {
				return false;
			}
		} else if (!remaining.equals(other.remaining)) {
			return false;
		}
		return period == other.period;
	}

	/**
	 * Creates the producer that aggregates incoming messages and decides
	 * whether to deliver now or to wait in the time pipeline.
	 */
	@Override
	public Producer createProducer() throws Exception {
		return new DefaultProducer(this) {
			@Override
			public void process(Exchange exchange) throws Exception {
				messageAggregator.agregateMessage(exchange.getIn());
				if (lastMessSent == 0) {
					flush(); // first msg: send it immediately
				} else {
					timeOut = lastMessSent + period; // deadline for the pending aggregate
					if (System.currentTimeMillis() >= timeOut) {
						flush(); // send it immediately, because the period has elapsed
					} else {
						// put it in the time pipeline
						((FlowRelgulatorComponent) FlowRelgulatorEndPoint.this.getComponent())
								.pipeIn(FlowRelgulatorEndPoint.this);
					}
				}
			}
		};
	}

	/**
	 * Returns the single consumer of this endpoint, creating it on first call.
	 * Later calls return the same consumer and ignore their processor.
	 */
	@Override
	public Consumer createConsumer(Processor processor) throws Exception {
		if (reguledConsumer == null) {
			reguledConsumer = new ReguledConsumer(this, processor);
		}
		return reguledConsumer;
	}

	/**
	 * Delivers the pending aggregate to the consumer, if there is one.
	 *
	 * <p>Does nothing when no consumer has been created yet (messages keep
	 * aggregating) or when nothing is pending. The latter happens when the
	 * producer has already flushed while this endpoint was still queued in
	 * the time pipeline. The delivery time is recorded only when something
	 * was actually sent.
	 */
	public void flush() {
		if (reguledConsumer == null) {
			return;
		}
		if (reguledConsumer.send()) {
			lastMessSent = System.currentTimeMillis();
		}
	}

	class ReguledConsumer extends DefaultConsumer {

		public ReguledConsumer(Endpoint endPoint, Processor processor) {
			super(endPoint, processor);
		}

		/**
		 * Wraps the pending aggregate in a new exchange and processes it.
		 *
		 * @return {@code true} if an aggregate was pending and has been
		 *         processed, {@code false} if there was nothing to send
		 */
		public boolean send() {
			Exchange exchange;
			synchronized (flushLock) {
				if (messageAggregator.getMessage() == null) {
					return false; // never deliver an exchange without a message
				}
				exchange = getEndpoint().createExchange();
				exchange.setIn(messageAggregator.getAndClearMessage());
			}
			try {
				getProcessor().process(exchange);
			} catch (Exception e) {
				handleException(e);
			}
			return true;
		}
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

	public void setPeriod(long period) {
		this.period = period;
	}

	public void setAggregator(MessageAggregator messageAggregator) {
		this.messageAggregator = messageAggregator;
	}
}


The MessageAggregatorFactory and MessageAggregator interfaces:

/**
 * Factory of {@link MessageAggregator} instances.
 */
public interface MessageAggregatorFactory {

	/**
	 * Creates a message aggregator.
	 *
	 * @return the aggregator
	 */
	MessageAggregator createMessageAggregator();
}

/**
 * Strategy that accumulates incoming messages into a single pending message.
 */
public interface MessageAggregator {
	/**
	 * Merges the given message into the pending one.
	 *
	 * @param message the incoming message
	 */
	void agregateMessage(Message message);

	/**
	 * Discards the pending content. NOTE(review): the two sample
	 * implementations differ here (one drops the pending message, the other
	 * blanks its body) -- confirm the intended contract.
	 */
	void clearMessage();

	/**
	 * Returns the pending message without resetting it.
	 *
	 * @return the pending message
	 */
	Message getMessage();

	/**
	 * Returns the pending message and resets the aggregator.
	 *
	 * @return the message that was pending
	 */
	Message getAndClearMessage();
}

Example for a Map:

/**
 * Factory of aggregators for messages whose body is a {@link Map}: the
 * entries of each new message are merged into the pending one with
 * {@code putAll}, so later values overwrite earlier ones for the same key.
 */
public class MapMessageAggregatorFactory implements MessageAggregatorFactory {

	@Override
	public MessageAggregator createMessageAggregator() {
		return new MessageAggregator() {

			/** The pending message; {@code null} when nothing is pending. */
			private Message data;

			@Override
			public synchronized Message getMessage() {
				return data;
			}

			@Override
			public synchronized void clearMessage() {
				data = null;
			}

			@Override
			public synchronized Message getAndClearMessage() {
				Message res = getMessage();
				clearMessage();
				return res;
			}

			/**
			 * Merges the body of {@code message} into the pending message.
			 * A {@code null} message or a {@code null} body contributes
			 * nothing. The first message received becomes the pending one.
			 */
			@SuppressWarnings("unchecked")
			@Override
			public synchronized void agregateMessage(Message message) {
				if (message == null) {
					return;
				}
				if (data == null) {
					data = message;
					return;
				}
				Map newData = (Map) message.getBody();
				if (newData == null) {
					return; // nothing to merge
				}
				Map pending = (Map) data.getBody();
				if (pending == null) {
					// The pending message had no body yet: adopt the new one.
					data.setBody(newData);
				} else {
					pending.putAll(newData);
				}
			}
		};
	}
}

Example for a String:

/**
 * Factory of aggregators for messages whose body is a {@link String}: bodies
 * are concatenated in arrival order and headers are merged, later values
 * overwriting earlier ones.
 */
public class StringMessageAggregatorFactory implements MessageAggregatorFactory {

	@Override
	public MessageAggregator createMessageAggregator() {
		return new MessageAggregator() {

			/** The pending message; {@code null} when nothing is pending. */
			private Message data;

			@Override
			public synchronized Message getMessage() {
				return data;
			}

			/**
			 * Blanks the body of the pending message; does nothing when no
			 * message is pending.
			 */
			@Override
			public synchronized void clearMessage() {
				if (data != null) {
					data.setBody("");
				}
			}

			/**
			 * Appends the body of {@code message} to the pending body and
			 * merges its headers. A {@code null} message is ignored and a
			 * {@code null} body counts as an empty string. The first message
			 * received becomes the pending one.
			 */
			@Override
			public synchronized void agregateMessage(Message message) {
				if (message == null) {
					return;
				}
				if (data == null) {
					data = message;
					return;
				}
				String pending = (String) data.getBody();
				Object incoming = message.getBody();
				// Avoid concatenating the literal text "null" for missing bodies.
				data.setBody((pending == null ? "" : pending) + (incoming == null ? "" : incoming));
				data.getHeaders().putAll(message.getHeaders());
			}

			@Override
			public synchronized Message getAndClearMessage() {
				Message res = data;
				data = null;
				return res;
			}
		};
	}
}

And a simple test :

/**
 * Manual demo: a timer fires every second, each tick gets a running counter
 * as its body, and the flow regulator releases the aggregated bodies with a
 * 5000 ms period. Both sides print what they see to standard output.
 */
public class TestFlow {

	static CamelContext camelContext;
	static int counter = 0;

	public static void main(String[] args) throws Exception {
		camelContext = new DefaultCamelContext();
		FlowRelgulatorComponent regulator =
				new FlowRelgulatorComponent(new StringMessageAggregatorFactory());
		camelContext.addComponent("flowregulator", regulator);

		// Stamps each timer tick with the next counter value.
		final Processor stampTick = new Processor() {
			@Override
			public void process(Exchange exchange) throws Exception {
				Message tick = exchange.getIn();
				System.out.println("timer : " + tick.getHeaders());
				tick.setBody(String.valueOf(counter++));
				exchange.setOut(tick);
			}
		};

		// Prints whatever the regulator lets through.
		final Processor printRegulated = new Processor() {
			@Override
			public void process(Exchange exchange) throws Exception {
				StringBuilder line = new StringBuilder("flowReg: ");
				line.append(exchange.getIn());
				line.append(" ");
				line.append(exchange.getIn().getHeaders());
				System.out.println(line.toString());
			}
		};

		camelContext.addRoutes(new RouteBuilder() {
			@Override
			public void configure() throws Exception {
				from("timer://test?period=1000")
					.process(stampTick)
					.to("flowregulator://test?period=5000");

				from("flowregulator://test?period=5000")
					.process(printRegulated);
			}
		});
		camelContext.start();
	}

}



-- 
View this message in context: http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
Sent from the Camel Development mailing list archive at Nabble.com.


Re: a Flow Relgulator Component

Posted by blau <be...@laurant.fr>.
Hello,

According to my use case,
- I have to intake the message because I need the data inside it
(aggregation is done with map.putAll())
- I have to delay endpoint processing message (or aggregated message) only
if current time < time of the last message + period. (regulation)
 
For this purpose, RoutePolicy looks lighter than Component. But I can’t see
how I can avoid the use of a time pipeline (like DelayedQueue) ?
Suspending/resuming consumer is not enough; I need to resume the consumer at
a specific time with a specific message.


Details about use case :

Message 1
BID: 12.2
ASK:  12.3

-> send this first message now
The next message (if any) will be sent one period later

Message 2
TRADEPRICE = 12.3

Message 3
BID: 12.3
ASK: 12.4

Message 4
BID: 12.3
ASK: 12.5

-> 3 messages during the period: send message 2+3+4 (aggregation is done
with map.putAll())
TRADEPRICE = 12.3
BID: 12.3
ASK: 12.5



Claus Ibsen-2 wrote:
> 
> Hi
> 
> Thanks for your contribution. And the idea of using DelayedQueue is good.
> Its something we will add to Camel error handler in the future to
> offer non blocked waiting while waiting to do next redelivery.
> 
> In terms of dynamically throttle a route I believe Camel's RoutePolicy
> is maybe more powerful
> http://camel.apache.org/routepolicy.html
> 
> It allows you to avoid intaking the messages which then wont risk of
> loosing messages if they are temporary stored in an in memory
> DelayedQueue.
> 
> 
> 
> On Tue, Apr 13, 2010 at 3:45 PM, blau  wrote:
>>
>>
>> Hello, this is a little contribution to camel projet.
>>
>> Sometime you need to regulate (slow down) the flow of a stream.
>> For example, imagine a marketdata flow where body of message is a
>> Map<String, Object>. Update rate is far too important for your need
>> (sometime more thant 20 mess/sec), so you want to regulate it at
>> 1mess/sec.
>>
>> As messages are not delayed but aggregated, this flowregulator need a
>> MessageAggregator strategy (implementation given by a factory).
>>
>> This implementation is based on the DelayQueue from java.util.concurrent.
>>
>> ex:
>> from("timer://test?period=1000").to("flowregulator://test?period=5000");
>>
>> /**
>>  * This camel component is able to regulate the flow of a route.
>>  * 
FlowRegulatorComponent uses a period parameter to trigger updates.
>>  * 
First input message is send immediately to the output.
>>  * Others messages (if period is not over) are inserted into a time
>> pipeline implemented with a DelayQueue, waiting for timeout to be send
>> to output
>>  * 
If period is over, an incoming message will be send immediately to
>> output.
>>  *
>>  * As messages are not delayed but aggregated, this flowregulator need a
>> MessageAggregator implementation given by a factory.
>>  *
>>  * 
example:
>>  * 
>>  * 
camelContext.addComponent("flowregulator", new
>> FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
>>  *
>> 
from("timer://test?period=1000").to("flowregulator://test?period=5000");
>>  * 
from("flowregulator://test?period=5000").to...
>>  * 
>>  * @author bernard LAURANT
>>  */
>> public class FlowRelgulatorComponent extends DefaultComponent {
>>
>>        /**
>>         * the time pipeline
>>         */
>>        private DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints = new
>> DelayQueue<FlowRelgulatorEndPoint>();
>>
>>        /**
>>         * default period is set to 1s
>>         */
>>        private long defaultPeriod = 1000;
>>
>>        private MessageAggregatorFactory messageAggregatorFactory;
>>
>>        public FlowRelgulatorComponent() {
>>                super();
>>        }
>>
>>        public FlowRelgulatorComponent(CamelContext camelContext) {
>>                super(camelContext);
>>        }
>>
>>        public FlowRelgulatorComponent(MessageAggregatorFactory
>> messageAggregatorFactory) {
>>                super();
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>>
>>        public FlowRelgulatorComponent(CamelContext camelContext,
>> MessageAggregatorFactory messageAggregatorFactory) {
>>                super(camelContext);
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>>
>>        @SuppressWarnings("unchecked")
>>        @Override
>>    protected Endpoint createEndpoint(String uri, String remaining, Map
>> parameters) throws Exception {
>>                Long period = (Long)getAndRemoveParameter(parameters,
>> "period",
>> Long.class);
>>                if (period == null) {
>>                        period = defaultPeriod;
>>                }
>>                FlowRelgulatorEndPoint flowRelgulatorEndPoint = new
>> FlowRelgulatorEndPoint(uri, this, remaining);
>>                flowRelgulatorEndPoint.setPeriod(period);
>>
>> flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator());
>>                return flowRelgulatorEndPoint;
>>    }
>>
>>        public void setDefaultPeriod(long defaultPeriod) {
>>                this.defaultPeriod = defaultPeriod;
>>        }
>>
>>        public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint)
>> {
>>                synchronized (flowRelgulatorEndPoints) {
>>                        if
>> (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) {
>>                              
>>  flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
>>                        }
>>                }
>>        }
>>
>>        @Override
>>        public void start() throws Exception {
>>                super.start();
>>                Thread pipeOut = new Thread(new Runnable() {
>>                        @Override
>>                        public void run() {
>>                                while (true) {
>>                                        try {
>>                                                FlowRelgulatorEndPoint
>> flowRelgulatorEndPoint =
>> flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS);
>>                                                if (flowRelgulatorEndPoint
>> != null) {
>>                                                      
>>  flowRelgulatorEndPoint.flush();
>>                                                }
>>                                        } catch (InterruptedException e) {
>>                                        }
>>                                }
>>                        }
>>                });
>>                pipeOut.start();
>>        }
>>
>>        public void setMessageAggregatorFactory(MessageAggregatorFactory
>> messageAggregatorFactory) {
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>> }
>>
>>
>>
>>
>> /**
>>  * Endpoint for FlowRegulator.
>>  *
>>  * @see FlowRegulatorComponent
>>  * @author bernard LAURANT
>>  */
>> public class FlowRelgulatorEndPoint extends DefaultEndpoint implements
>> Delayed {
>>
>>        private MessageAggregator messageAggregator;
>>        private ReguledConsumer reguledConsumer;
>>
>>        private String remaining;
>>        private long period;
>>        private long timeOut;
>>        private long lastMessSent;
>>
>>        public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent
>> flowRelgulatorComponent, String remaining) {
>>                super(uri, flowRelgulatorComponent);
>>                this.remaining = remaining;
>>        }
>>
>>        @Override
>>        public String toString() {
>>                return createEndpointUri();
>>        }
>>
>>        @Override
>>        protected String createEndpointUri() {
>>                return new
>> StringBuilder().append("flowregulator://").append(remaining).append("?period=").append(period).toString();
>>        }
>>
>>        @Override
>>        public long getDelay(TimeUnit unit) {
>>                long millis2PipeOut = -(System.currentTimeMillis() -
>> timeOut);
>>                return unit.convert(millis2PipeOut,
>> TimeUnit.MILLISECONDS);
>>        }
>>
>>        @Override
>>        public int compareTo(Delayed o) {
>>                long thisDelay = getDelay(TimeUnit.MILLISECONDS);
>>                long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS);
>>                return (thisDelay<anotherDelay ? -1 : (thisDelay==anotherDelay ? 0 : 1));
>>        }
>>
>>        @Override
>>        public int hashCode() {
>>                final int prime = 31;
>>                int result = super.hashCode();
>>                result = prime * result + (int) (period ^ (period >>>
>> 32));
>>                result = prime * result + ((remaining == null) ? 0 :
>> remaining.hashCode());
>>                return result;
>>        }
>>
>>        @Override
>>        public boolean equals(Object obj) {
>>                if (this == obj)
>>                        return true;
>>                if (!super.equals(obj))
>>                        return false;
>>                if (getClass() != obj.getClass())
>>                        return false;
>>                FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint)
>> obj;
>>                if (remaining == null) {
>>                        if (other.remaining != null)
>>                                return false;
>>                } else if (!remaining.equals(other.remaining))
>>                        return false;
>>                if (period != other.period)
>>                        return false;
>>                return true;
>>        }
>>
>>
>>        @Override
>>        public Producer createProducer() throws Exception {
>>                return new DefaultProducer(this) {
>>                        @Override
>>                        public void process(Exchange exchange) throws
>> Exception {
>>                              
>>  messageAggregator.agregateMessage(exchange.getIn());
>>                                if (lastMessSent == 0) {
>>                                        flush();// first msg: send it
>> immediately
>>                                } else {
>>                                        timeOut = lastMessSent + period;
>> // timeout to pipeout the msg
>>                                        if (System.currentTimeMillis() >=
>> timeOut) {
>>                                                flush(); // send it
>> immediately, because timeout has expired
>>                                        } else {
>>                                                // put it in the time
>> pipeline
>>
>> ((FlowRelgulatorComponent)FlowRelgulatorEndPoint.this.getComponent()).pipeIn(FlowRelgulatorEndPoint.this);
>>                                        }
>>                                }
>>                        }
>>                };
>>        }
>>
>>        @Override
>>        public Consumer createConsumer(Processor processor) throws
>> Exception {
>>                if (reguledConsumer == null) {
>>                        reguledConsumer = new ReguledConsumer(this,
>> processor);
>>                }
>>                return reguledConsumer;
>>        }
>>
>>        public void flush() {
>>                reguledConsumer.send();
>>                lastMessSent = System.currentTimeMillis();
>>        }
>>
>>        class ReguledConsumer extends DefaultConsumer {
>>
>>                public ReguledConsumer(Endpoint endPoint, Processor
>> processor) {
>>                        super(endPoint, processor);
>>                }
>>
>>                public void send() {
>>                        Exchange exchange =
>> getEndpoint().createExchange();
>>                      
>>  exchange.setIn(messageAggregator.getAndClearMessage());
>>                        try {
>>                                getProcessor().process(exchange);
>>                        } catch (Exception e) {
>>                                handleException(e);
>>                        }
>>                }
>>        }
>>
>>        @Override
>>        public boolean isSingleton() {
>>                return true;
>>        }
>>
>>        public void setPeriod(long period) {
>>                this.period = period;
>>        }
>>
>>        public void setAggregator(MessageAggregator messageAggregator) {
>>                this.messageAggregator = messageAggregator;
>>        }
>> }
>>
>>
>> The MessageAgregator interface :
>>
>> public interface MessageAggregatorFactory {
>>
>>        MessageAggregator createMessageAggregator();
>> }
>>
>> public interface MessageAggregator {
>>        void agregateMessage(Message message);
>>        void clearMessage();
>>        Message getMessage();
>>        Message getAndClearMessage();
>> }
>>
>> Exemple for a Map :
>>
>> public class MapMessageAggregatorFactory implements
>> MessageAggregatorFactory
>> {
>>
>>        @Override
>>        public MessageAggregator createMessageAggregator() {
>>                return new MessageAggregator() {
>>
>>                        Message data;
>>
>>                        @Override
>>                        public synchronized Message getMessage() {
>>                                return data;
>>                        }
>>
>>                        @Override
>>                        public synchronized void clearMessage() {
>>                                data = null;
>>                        }
>>
>>                        @Override
>>                        public synchronized Message getAndClearMessage() {
>>                                Message res = getMessage();
>>                                clearMessage();
>>                                return res;
>>                        }
>>
>>                        @SuppressWarnings("unchecked")
>>                        @Override
>>                        public synchronized void agregateMessage(Message
>> message) {
>>                                if (message == null)
>>                                        return;
>>                                if (data == null) {
>>                                        data = message;
>>                                } else {
>>                                        Map newData = (Map)
>> message.getBody();
>>                                      
>>  ((Map)data.getBody()).putAll(newData);
>>                                }
>>                        }
>>                };
>>        }
>> }
>>
>> exemple for a string :
>>
>> public class StringMessageAggregatorFactory implements
>> MessageAggregatorFactory {
>>
>>        @Override
>>        public MessageAggregator createMessageAggregator() {
>>                return new MessageAggregator() {
>>
>>                        Message data;
>>
>>                        @Override
>>                        public synchronized Message getMessage() {
>>                                return data;
>>                        }
>>
>>                        @Override
>>                        public synchronized void clearMessage() {
>>                                data.setBody("");
>>                        }
>>
>>                        @Override
>>                        public synchronized void agregateMessage(Message
>> message) {
>>                                if (data == null) {
>>                                        data = message;
>>                                } else {
>>                                      
>>  data.setBody((String)data.getBody() + message.getBody());
>>                                      
>>  data.getHeaders().putAll(message.getHeaders());
>>                                }
>>                        }
>>
>>                        @Override
>>                        public synchronized Message getAndClearMessage() {
>>                                Message res = data;
>>                                data = null;
>>                                return res;
>>                        }
>>                };
>>        }
>> }
>>
>> And a simple test :
>>
>> public class TestFlow {
>>
>>        static CamelContext camelContext;
>>        static int counter = 0;
>>
>>        public static void main(String[] args) throws Exception {
>>                camelContext = new DefaultCamelContext();
>>                camelContext.addComponent("flowregulator", new
>> FlowRelgulatorComponent(new
>> StringMessageAggregatorFactory()));
>>
>>                camelContext.addRoutes(new RouteBuilder() {
>>                        @Override
>>                        public void configure() throws Exception {
>>                                from("timer://test?period=1000")
>>                                .process(new Processor() {
>>                                        @Override
>>                                        public void process(Exchange
>> exchange) throws Exception {
>>                                                System.out.println("timer
>> : " + exchange.getIn().getHeaders());
>>                                                Message msg =
>> exchange.getIn();
>>                                              
>>  msg.setBody(Integer.toString(counter++));
>>                                                exchange.setOut(msg);
>>                                        }
>>                                })
>>                                .to("flowregulator://test?period=5000");
>>
>>                              
>>  from("flowregulator://test?period=5000").process(new Processor() {
>>                                        @Override
>>                                        public void process(Exchange
>> exchange) throws Exception {
>>                                              
>>  System.out.println("flowReg: " + exchange.getIn() + " " +
>> exchange.getIn().getHeaders());
>>                                        }
>>                                });
>>                        }
>>                });
>>                camelContext.start();
>>        }
>>
>> }
>>
>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
>> Sent from the Camel Development mailing list archive at Nabble.com.
>>
>>
> 
> 
> 
> -- 
> Claus Ibsen
> Apache Camel Committer
> 
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
> 
> 

-- 
View this message in context: http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28243300.html
Sent from the Camel Development mailing list archive at Nabble.com.

Re: a Flow Relgulator Component

Posted by blau <be...@laurant.fr>.
According to my use case,
- I have to intake the message because I need the data inside it
(aggregation is done with map.putAll())
- I have to delay endpoint processing message (or aggregated message) only
if current time < time of the last message + period. (regulation)
 
For this purpose, RoutePolicy looks lighter than Component. But I can’t see
how I can avoid the use of a time pipeline (like DelayedQueue) ?
Suspending/resuming consumer is not enough; I need to resume the consumer at
a specific time (with a specific message).

In fact, here is my use case :

Message 1
BID: 12.2
ASK:  12.3

-> send this first message now
The next message (if any) will be sent one period later

Message 2
TRADEPRICE = 12.3

Message 3
BID: 12.3
ASK: 12.4

Message 4
BID: 12.3
ASK: 12.5

-> 3 messages during the period: send message 2+3+4 (aggregation is done
with map.putAll())
TRADEPRICE = 12.3
BID: 12.3
ASK: 12.5



Claus Ibsen-2 wrote:
> 
> Hi
> 
> Thanks for your contribution. And the idea of using DelayedQueue is good.
> Its something we will add to Camel error handler in the future to
> offer non blocked waiting while waiting to do next redelivery.
> 
> In terms of dynamically throttle a route I believe Camel's RoutePolicy
> is maybe more powerful
> http://camel.apache.org/routepolicy.html
> 
> It allows you to avoid intaking the messages which then wont risk of
> loosing messages if they are temporary stored in an in memory
> DelayedQueue.
> 
> 
> 
> On Tue, Apr 13, 2010 at 3:45 PM, blau <be...@laurant.fr> wrote:
>>
>>
>> Hello, this is a little contribution to camel projet.
>>
>> Sometime you need to regulate (slow down) the flow of a stream.
>> For example, imagine a marketdata flow where body of message is a
>> Map<String, Object>. Update rate is far too important for your need
>> (sometime more thant 20 mess/sec), so you want to regulate it at
>> 1mess/sec.
>>
>> As messages are not delayed but aggregated, this flowregulator need a
>> MessageAggregator strategy (implementation given by a factory).
>>
>> This implementation is based on the DelayQueue from java.util.concurrent.
>>
>> ex:
>> from("timer://test?period=1000").to("flowregulator://test?period=5000");
>>
>> /**
>>  * This camel component is able to regulate the flow of a route.
>>  * <p>FlowRegulatorComponent uses a period parameter to trigger updates.
>>  * <p>First input message is send immediately to the output.
>>  * Others messages (if period is not over) are inserted into a <em>time
>> pipeline</em> implemented with a DelayQueue, waiting for timeout to be
>> send
>> to output
>>  * <p>If period is over, an incoming message will be send immediately to
>> output.
>>  *
>>  * As messages are not delayed but aggregated, this flowregulator need a
>> MessageAggregator implementation given by a factory.
>>  *
>>  * <p>example:
>>  * <code>
>>  * <p>camelContext.addComponent("flowregulator", new
>> FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
>>  *
>> <p>from("timer://test?period=1000").to("flowregulator://test?period=5000");
>>  * <p>from("flowregulator://test?period=5000").to...
>>  * </code>
>>  * @author bernard LAURANT
>>  */
>> public class FlowRelgulatorComponent extends DefaultComponent {
>>
>>        /**
>>         * the time pipeline
>>         */
>>        private DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints
>> = new
>> DelayQueue<FlowRelgulatorEndPoint>();
>>
>>        /**
>>         * default period is set to 1s
>>         */
>>        private long defaultPeriod = 1000;
>>
>>        private MessageAggregatorFactory messageAggregatorFactory;
>>
>>        public FlowRelgulatorComponent() {
>>                super();
>>        }
>>
>>        public FlowRelgulatorComponent(CamelContext camelContext) {
>>                super(camelContext);
>>        }
>>
>>        public FlowRelgulatorComponent(MessageAggregatorFactory
>> messageAggregatorFactory) {
>>                super();
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>>
>>        public FlowRelgulatorComponent(CamelContext camelContext,
>> MessageAggregatorFactory messageAggregatorFactory) {
>>                super(camelContext);
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>>
>>        @SuppressWarnings("unchecked")
>>        @Override
>>    protected Endpoint createEndpoint(String uri, String remaining, Map
>> parameters) throws Exception {
>>                Long period = (Long)getAndRemoveParameter(parameters,
>> "period",
>> Long.class);
>>                if (period == null) {
>>                        period = defaultPeriod;
>>                }
>>                FlowRelgulatorEndPoint flowRelgulatorEndPoint = new
>> FlowRelgulatorEndPoint(uri, this, remaining);
>>                flowRelgulatorEndPoint.setPeriod(period);
>>
>> flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator());
>>                return flowRelgulatorEndPoint;
>>    }
>>
>>        public void setDefaultPeriod(long defaultPeriod) {
>>                this.defaultPeriod = defaultPeriod;
>>        }
>>
>>        public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint)
>> {
>>                synchronized (flowRelgulatorEndPoints) {
>>                        if
>> (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) {
>>                              
>>  flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
>>                        }
>>                }
>>        }
>>
>>        @Override
>>        public void start() throws Exception {
>>                super.start();
>>                Thread pipeOut = new Thread(new Runnable() {
>>                        @Override
>>                        public void run() {
>>                                while (true) {
>>                                        try {
>>                                                FlowRelgulatorEndPoint
>> flowRelgulatorEndPoint =
>> flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS);
>>                                                if (flowRelgulatorEndPoint
>> != null) {
>>                                                      
>>  flowRelgulatorEndPoint.flush();
>>                                                }
>>                                        } catch (InterruptedException e) {
>>                                        }
>>                                }
>>                        }
>>                });
>>                pipeOut.start();
>>        }
>>
>>        public void setMessageAggregatorFactory(MessageAggregatorFactory
>> messageAggregatorFactory) {
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>> }
>>
>>
>>
>>
>> /**
>>  * Endpoint for FlowRegulator.
>>  *
>>  * @see FlowRegulatorComponent
>>  * @author bernard LAURANT
>>  */
>> public class FlowRelgulatorEndPoint extends DefaultEndpoint implements
>> Delayed {
>>
>>        private MessageAggregator messageAggregator;
>>        private ReguledConsumer reguledConsumer;
>>
>>        private String remaining;
>>        private long period;
>>        private long timeOut;
>>        private long lastMessSent;
>>
>>        public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent
>> flowRelgulatorComponent, String remaining) {
>>                super(uri, flowRelgulatorComponent);
>>                this.remaining = remaining;
>>        }
>>
>>        @Override
>>        public String toString() {
>>                return createEndpointUri();
>>        }
>>
>>        @Override
>>        protected String createEndpointUri() {
>>                return new
>> StringBuilder().append("flowregulator://").append(remaining).append("?period=").append(period).toString();
>>        }
>>
>>        @Override
>>        public long getDelay(TimeUnit unit) {
>>                long millis2PipeOut = -(System.currentTimeMillis() -
>> timeOut);
>>                return unit.convert(millis2PipeOut,
>> TimeUnit.MILLISECONDS);
>>        }
>>
>>        @Override
>>        public int compareTo(Delayed o) {
>>                long thisDelay = getDelay(TimeUnit.MILLISECONDS);
>>                long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS);
>>                return (thisDelay<anotherDelay ? -1 :
>> (thisDelay==anotherDelay ? 0 : 1));
>>        }
>>
>>        @Override
>>        public int hashCode() {
>>                final int prime = 31;
>>                int result = super.hashCode();
>>                result = prime * result + (int) (period ^ (period >>>
>> 32));
>>                result = prime * result + ((remaining == null) ? 0 :
>> remaining.hashCode());
>>                return result;
>>        }
>>
>>        @Override
>>        public boolean equals(Object obj) {
>>                if (this == obj)
>>                        return true;
>>                if (!super.equals(obj))
>>                        return false;
>>                if (getClass() != obj.getClass())
>>                        return false;
>>                FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint)
>> obj;
>>                if (remaining == null) {
>>                        if (other.remaining != null)
>>                                return false;
>>                } else if (!remaining.equals(other.remaining))
>>                        return false;
>>                if (period != other.period)
>>                        return false;
>>                return true;
>>        }
>>
>>
>>        @Override
>>        public Producer createProducer() throws Exception {
>>                return new DefaultProducer(this) {
>>                        @Override
>>                        public void process(Exchange exchange) throws
>> Exception {
>>                              
>>  messageAggregator.agregateMessage(exchange.getIn());
>>                                if (lastMessSent == 0) {
>>                                        flush();// first msg: send it
>> immediately
>>                                } else {
>>                                        timeOut = lastMessSent + period;
>> // timeout to pipeout the msg
>>                                        if (System.currentTimeMillis() >=
>> timeOut) {
>>                                                flush(); // send it
>> immediately, because timeout has expired
>>                                        } else {
>>                                                // put it in the time
>> pipeline
>>
>> ((FlowRelgulatorComponent)FlowRelgulatorEndPoint.this.getComponent()).pipeIn(FlowRelgulatorEndPoint.this);
>>                                        }
>>                                }
>>                        }
>>                };
>>        }
>>
>>        @Override
>>        public Consumer createConsumer(Processor processor) throws
>> Exception {
>>                if (reguledConsumer == null) {
>>                        reguledConsumer = new ReguledConsumer(this,
>> processor);
>>                }
>>                return reguledConsumer;
>>        }
>>
>>        public void flush() {
>>                reguledConsumer.send();
>>                lastMessSent = System.currentTimeMillis();
>>        }
>>
>>        class ReguledConsumer extends DefaultConsumer {
>>
>>                public ReguledConsumer(Endpoint endPoint, Processor
>> processor) {
>>                        super(endPoint, processor);
>>                }
>>
>>                public void send() {
>>                        Exchange exchange =
>> getEndpoint().createExchange();
>>                      
>>  exchange.setIn(messageAggregator.getAndClearMessage());
>>                        try {
>>                                getProcessor().process(exchange);
>>                        } catch (Exception e) {
>>                                handleException(e);
>>                        }
>>                }
>>        }
>>
>>        @Override
>>        public boolean isSingleton() {
>>                return true;
>>        }
>>
>>        public void setPeriod(long period) {
>>                this.period = period;
>>        }
>>
>>        public void setAggregator(MessageAggregator messageAggregator) {
>>                this.messageAggregator = messageAggregator;
>>        }
>> }
>>
>>
>> The MessageAgregator interface :
>>
>> public interface MessageAggregatorFactory {
>>
>>        MessageAggregator createMessageAggregator();
>> }
>>
>> public interface MessageAggregator {
>>        void agregateMessage(Message message);
>>        void clearMessage();
>>        Message getMessage();
>>        Message getAndClearMessage();
>> }
>>
>> Exemple for a Map :
>>
>> public class MapMessageAggregatorFactory implements
>> MessageAggregatorFactory
>> {
>>
>>        @Override
>>        public MessageAggregator createMessageAggregator() {
>>                return new MessageAggregator() {
>>
>>                        Message data;
>>
>>                        @Override
>>                        public synchronized Message getMessage() {
>>                                return data;
>>                        }
>>
>>                        @Override
>>                        public synchronized void clearMessage() {
>>                                data = null;
>>                        }
>>
>>                        @Override
>>                        public synchronized Message getAndClearMessage() {
>>                                Message res = getMessage();
>>                                clearMessage();
>>                                return res;
>>                        }
>>
>>                        @SuppressWarnings("unchecked")
>>                        @Override
>>                        public synchronized void agregateMessage(Message
>> message) {
>>                                if (message == null)
>>                                        return;
>>                                if (data == null) {
>>                                        data = message;
>>                                } else {
>>                                        Map newData = (Map)
>> message.getBody();
>>                                      
>>  ((Map)data.getBody()).putAll(newData);
>>                                }
>>                        }
>>                };
>>        }
>> }
>>
>> exemple for a string :
>>
>> public class StringMessageAggregatorFactory implements
>> MessageAggregatorFactory {
>>
>>        @Override
>>        public MessageAggregator createMessageAggregator() {
>>                return new MessageAggregator() {
>>
>>                        Message data;
>>
>>                        @Override
>>                        public synchronized Message getMessage() {
>>                                return data;
>>                        }
>>
>>                        @Override
>>                        public synchronized void clearMessage() {
>>                                data.setBody("");
>>                        }
>>
>>                        @Override
>>                        public synchronized void agregateMessage(Message
>> message) {
>>                                if (data == null) {
>>                                        data = message;
>>                                } else {
>>                                      
>>  data.setBody((String)data.getBody() + message.getBody());
>>                                      
>>  data.getHeaders().putAll(message.getHeaders());
>>                                }
>>                        }
>>
>>                        @Override
>>                        public synchronized Message getAndClearMessage() {
>>                                Message res = data;
>>                                data = null;
>>                                return res;
>>                        }
>>                };
>>        }
>> }
>>
>> And a simple test :
>>
>> public class TestFlow {
>>
>>        static CamelContext camelContext;
>>        static int counter = 0;
>>
>>        public static void main(String[] args) throws Exception {
>>                camelContext = new DefaultCamelContext();
>>                camelContext.addComponent("flowregulator", new
>> FlowRelgulatorComponent(new
>> StringMessageAggregatorFactory()));
>>
>>                camelContext.addRoutes(new RouteBuilder() {
>>                        @Override
>>                        public void configure() throws Exception {
>>                                from("timer://test?period=1000")
>>                                .process(new Processor() {
>>                                        @Override
>>                                        public void process(Exchange
>> exchange) throws Exception {
>>                                                System.out.println("timer
>> : " + exchange.getIn().getHeaders());
>>                                                Message msg =
>> exchange.getIn();
>>                                              
>>  msg.setBody(Integer.toString(counter++));
>>                                                exchange.setOut(msg);
>>                                        }
>>                                })
>>                                .to("flowregulator://test?period=5000");
>>
>>                              
>>  from("flowregulator://test?period=5000").process(new Processor() {
>>                                        @Override
>>                                        public void process(Exchange
>> exchange) throws Exception {
>>                                              
>>  System.out.println("flowReg: " + exchange.getIn() + " " +
>> exchange.getIn().getHeaders());
>>                                        }
>>                                });
>>                        }
>>                });
>>                camelContext.start();
>>        }
>>
>> }
>>
>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
>> Sent from the Camel Development mailing list archive at Nabble.com.
>>
>>
> 
> 
> 
> -- 
> Claus Ibsen
> Apache Camel Committer
> 
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
> 
> 

-- 
View this message in context: http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28243412.html
Sent from the Camel Development mailing list archive at Nabble.com.


Re: a Flow Relgulator Component

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Thanks for your contribution. The idea of using a DelayQueue is good.
It's something we will add to the Camel error handler in the future to
offer non-blocking waiting until the next redelivery.

In terms of dynamically throttling a route, I believe Camel's RoutePolicy
may be more powerful:
http://camel.apache.org/routepolicy.html

It allows you to avoid taking in the messages in the first place, so you
don't risk losing messages that are only temporarily stored in an
in-memory DelayQueue.



On Tue, Apr 13, 2010 at 3:45 PM, blau <be...@laurant.fr> wrote:
>
>
> Hello, this is a little contribution to camel projet.
>
> Sometime you need to regulate (slow down) the flow of a stream.
> For example, imagine a marketdata flow where body of message is a
> Map<String, Object>. Update rate is far too important for your need
> (sometime more thant 20 mess/sec), so you want to regulate it at 1mess/sec.
>
> As messages are not delayed but aggregated, this flowregulator need a
> MessageAggregator strategy (implementation given by a factory).
>
> This implementation is based on the DelayQueue from java.util.concurrent.
>
> ex: from("timer://test?period=1000").to("flowregulator://test?period=5000");
>
> /**
>  * This camel component is able to regulate the flow of a route.
>  * <p>FlowRegulatorComponent uses a period parameter to trigger updates.
>  * <p>First input message is send immediately to the output.
>  * Others messages (if period is not over) are inserted into a <em>time
> pipeline</em> implemented with a DelayQueue, waiting for timeout to be send
> to output
>  * <p>If period is over, an incoming message will be send immediately to
> output.
>  *
>  * As messages are not delayed but aggregated, this flowregulator need a
> MessageAggregator implementation given by a factory.
>  *
>  * <p>example:
>  * <code>
>  * <p>camelContext.addComponent("flowregulator", new
> FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
>  *
> <p>from("timer://test?period=1000").to("flowregulator://test?period=5000");
>  * <p>from("flowregulator://test?period=5000").to...
>  * </code>
>  * @author bernard LAURANT
>  */
> public class FlowRelgulatorComponent extends DefaultComponent {
>
>        /**
>         * the time pipeline
>         */
>        private DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints = new
> DelayQueue<FlowRelgulatorEndPoint>();
>
>        /**
>         * default period is set to 1s
>         */
>        private long defaultPeriod = 1000;
>
>        private MessageAggregatorFactory messageAggregatorFactory;
>
>        public FlowRelgulatorComponent() {
>                super();
>        }
>
>        public FlowRelgulatorComponent(CamelContext camelContext) {
>                super(camelContext);
>        }
>
>        public FlowRelgulatorComponent(MessageAggregatorFactory
> messageAggregatorFactory) {
>                super();
>                this.messageAggregatorFactory = messageAggregatorFactory;
>        }
>
>        public FlowRelgulatorComponent(CamelContext camelContext,
> MessageAggregatorFactory messageAggregatorFactory) {
>                super(camelContext);
>                this.messageAggregatorFactory = messageAggregatorFactory;
>        }
>
>        @SuppressWarnings("unchecked")
>        @Override
>    protected Endpoint createEndpoint(String uri, String remaining, Map
> parameters) throws Exception {
>                Long period = (Long)getAndRemoveParameter(parameters, "period",
> Long.class);
>                if (period == null) {
>                        period = defaultPeriod;
>                }
>                FlowRelgulatorEndPoint flowRelgulatorEndPoint = new
> FlowRelgulatorEndPoint(uri, this, remaining);
>                flowRelgulatorEndPoint.setPeriod(period);
>
> flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator());
>                return flowRelgulatorEndPoint;
>    }
>
>        public void setDefaultPeriod(long defaultPeriod) {
>                this.defaultPeriod = defaultPeriod;
>        }
>
>        public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint) {
>                synchronized (flowRelgulatorEndPoints) {
>                        if (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) {
>                                flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
>                        }
>                }
>        }
>
>        @Override
>        public void start() throws Exception {
>                super.start();
>                Thread pipeOut = new Thread(new Runnable() {
>                        @Override
>                        public void run() {
>                                while (true) {
>                                        try {
>                                                FlowRelgulatorEndPoint flowRelgulatorEndPoint =
> flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS);
>                                                if (flowRelgulatorEndPoint != null) {
>                                                        flowRelgulatorEndPoint.flush();
>                                                }
>                                        } catch (InterruptedException e) {
>                                        }
>                                }
>                        }
>                });
>                pipeOut.start();
>        }
>
>        public void setMessageAggregatorFactory(MessageAggregatorFactory
> messageAggregatorFactory) {
>                this.messageAggregatorFactory = messageAggregatorFactory;
>        }
> }
>
>
>
>
> /**
>  * Endpoint for FlowRegulator.
>  *
>  * @see FlowRegulatorComponent
>  * @author bernard LAURANT
>  */
> public class FlowRelgulatorEndPoint extends DefaultEndpoint implements
> Delayed {
>
>        private MessageAggregator messageAggregator;
>        private ReguledConsumer reguledConsumer;
>
>        private String remaining;
>        private long period;
>        private long timeOut;
>        private long lastMessSent;
>
>        public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent
> flowRelgulatorComponent, String remaining) {
>                super(uri, flowRelgulatorComponent);
>                this.remaining = remaining;
>        }
>
>        @Override
>        public String toString() {
>                return createEndpointUri();
>        }
>
>        @Override
>        protected String createEndpointUri() {
>                return new
> StringBuilder().append("flowregulator://").append(remaining).append("?period=").append(period).toString();
>        }
>
>        @Override
>        public long getDelay(TimeUnit unit) {
>                long millis2PipeOut = -(System.currentTimeMillis() - timeOut);
>                return unit.convert(millis2PipeOut, TimeUnit.MILLISECONDS);
>        }
>
>        @Override
>        public int compareTo(Delayed o) {
>                long thisDelay = getDelay(TimeUnit.MILLISECONDS);
>                long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS);
>                return (thisDelay<anotherDelay ? -1 : (thisDelay==anotherDelay ? 0 : 1));
>        }
>
>        @Override
>        public int hashCode() {
>                final int prime = 31;
>                int result = super.hashCode();
>                result = prime * result + (int) (period ^ (period >>> 32));
>                result = prime * result + ((remaining == null) ? 0 :
> remaining.hashCode());
>                return result;
>        }
>
>        @Override
>        public boolean equals(Object obj) {
>                if (this == obj)
>                        return true;
>                if (!super.equals(obj))
>                        return false;
>                if (getClass() != obj.getClass())
>                        return false;
>                FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint) obj;
>                if (remaining == null) {
>                        if (other.remaining != null)
>                                return false;
>                } else if (!remaining.equals(other.remaining))
>                        return false;
>                if (period != other.period)
>                        return false;
>                return true;
>        }
>
>
>        @Override
>        public Producer createProducer() throws Exception {
>                return new DefaultProducer(this) {
>                        @Override
>                        public void process(Exchange exchange) throws Exception {
>                                messageAggregator.agregateMessage(exchange.getIn());
>                                if (lastMessSent == 0) {
>                                        flush();// first msg: send it immediately
>                                } else {
>                                        timeOut = lastMessSent + period; // timeout to pipeout the msg
>                                        if (System.currentTimeMillis() >= timeOut) {
>                                                flush(); // send it immediately, because timeout has expired
>                                        } else {
>                                                // put it in the time pipeline
>
> ((FlowRelgulatorComponent)FlowRelgulatorEndPoint.this.getComponent()).pipeIn(FlowRelgulatorEndPoint.this);
>                                        }
>                                }
>                        }
>                };
>        }
>
>        @Override
>        public Consumer createConsumer(Processor processor) throws Exception {
>                if (reguledConsumer == null) {
>                        reguledConsumer = new ReguledConsumer(this, processor);
>                }
>                return reguledConsumer;
>        }
>
>        public void flush() {
>                reguledConsumer.send();
>                lastMessSent = System.currentTimeMillis();
>        }
>
>        class ReguledConsumer extends DefaultConsumer {
>
>                public ReguledConsumer(Endpoint endPoint, Processor processor) {
>                        super(endPoint, processor);
>                }
>
>                public void send() {
>                        Exchange exchange = getEndpoint().createExchange();
>                        exchange.setIn(messageAggregator.getAndClearMessage());
>                        try {
>                                getProcessor().process(exchange);
>                        } catch (Exception e) {
>                                handleException(e);
>                        }
>                }
>        }
>
>        @Override
>        public boolean isSingleton() {
>                return true;
>        }
>
>        public void setPeriod(long period) {
>                this.period = period;
>        }
>
>        public void setAggregator(MessageAggregator messageAggregator) {
>                this.messageAggregator = messageAggregator;
>        }
> }
>
>
> The MessageAgregator interface :
>
> public interface MessageAggregatorFactory {
>
>        MessageAggregator createMessageAggregator();
> }
>
> public interface MessageAggregator {
>        void agregateMessage(Message message);
>        void clearMessage();
>        Message getMessage();
>        Message getAndClearMessage();
> }
>
> Exemple for a Map :
>
> public class MapMessageAggregatorFactory implements MessageAggregatorFactory
> {
>
>        @Override
>        public MessageAggregator createMessageAggregator() {
>                return new MessageAggregator() {
>
>                        Message data;
>
>                        @Override
>                        public synchronized Message getMessage() {
>                                return data;
>                        }
>
>                        @Override
>                        public synchronized void clearMessage() {
>                                data = null;
>                        }
>
>                        @Override
>                        public synchronized Message getAndClearMessage() {
>                                Message res = getMessage();
>                                clearMessage();
>                                return res;
>                        }
>
>                        @SuppressWarnings("unchecked")
>                        @Override
>                        public synchronized void agregateMessage(Message message) {
>                                if (message == null)
>                                        return;
>                                if (data == null) {
>                                        data = message;
>                                } else {
>                                        Map newData = (Map) message.getBody();
>                                        ((Map)data.getBody()).putAll(newData);
>                                }
>                        }
>                };
>        }
> }
>
> exemple for a string :
>
> public class StringMessageAggregatorFactory implements
> MessageAggregatorFactory {
>
>        @Override
>        public MessageAggregator createMessageAggregator() {
>                return new MessageAggregator() {
>
>                        Message data;
>
>                        @Override
>                        public synchronized Message getMessage() {
>                                return data;
>                        }
>
>                        @Override
>                        public synchronized void clearMessage() {
>                                data.setBody("");
>                        }
>
>                        @Override
>                        public synchronized void agregateMessage(Message message) {
>                                if (data == null) {
>                                        data = message;
>                                } else {
>                                        data.setBody((String)data.getBody() + message.getBody());
>                                        data.getHeaders().putAll(message.getHeaders());
>                                }
>                        }
>
>                        @Override
>                        public synchronized Message getAndClearMessage() {
>                                Message res = data;
>                                data = null;
>                                return res;
>                        }
>                };
>        }
> }
>
> And a simple test :
>
> public class TestFlow {
>
>        static CamelContext camelContext;
>        static int counter = 0;
>
>        public static void main(String[] args) throws Exception {
>                camelContext = new DefaultCamelContext();
>                camelContext.addComponent("flowregulator", new FlowRelgulatorComponent(new
> StringMessageAggregatorFactory()));
>
>                camelContext.addRoutes(new RouteBuilder() {
>                        @Override
>                        public void configure() throws Exception {
>                                from("timer://test?period=1000")
>                                .process(new Processor() {
>                                        @Override
>                                        public void process(Exchange exchange) throws Exception {
>                                                System.out.println("timer : " + exchange.getIn().getHeaders());
>                                                Message msg = exchange.getIn();
>                                                msg.setBody(Integer.toString(counter++));
>                                                exchange.setOut(msg);
>                                        }
>                                })
>                                .to("flowregulator://test?period=5000");
>
>                                from("flowregulator://test?period=5000").process(new Processor() {
>                                        @Override
>                                        public void process(Exchange exchange) throws Exception {
>                                                System.out.println("flowReg: " + exchange.getIn() + " " +
> exchange.getIn().getHeaders());
>                                        }
>                                });
>                        }
>                });
>                camelContext.start();
>        }
>
> }
>
>
>
> --
> View this message in context: http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus